私はpysparkにはかなり新しく、csvファイルとして保存される大きなデータセットを処理するためにこのファイルを使用しようとしています。 私はCSVファイルをsparkデータフレームに読み込み、いくつかのカラムを削除して、新しいカラムを追加したいと思います。 どうすればいいですか?PySpark CSVをDataframeに読み込んで操作する方法
このデータをデータフレームに取り込む際に問題があります。これは、削減ステップでエラーTypeError: 'JavaPackage' object is not callable
を生成
def make_dataframe(data_portion, schema, sql):
fields = data_portion.split(",")
return sql.createDateFrame([(fields[0], fields[1])], schema=schema)
if __name__ == "__main__":
sc = SparkContext(appName="Test")
sql = SQLContext(sc)
...
big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql))
.reduce(lambda a, b: a.union(b))
big_frame.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://<...>") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3n://path/for/temp/data") \
.mode("append") \
.save()
sc.stop()
:これは私がこれまで持っているもののストリップダウンバージョンです。
これは可能ですか?データフレームを減らすというアイデアは、得られたデータをデータベース(Redshift、spark-redshiftパッケージを使用して)に書き込むことができるようにすることです。
unionAll()
とmap()
をpartial()
と一緒に使ってみましたが、動作させることはできません。
私はAmazonのEMRで、spark-redshift_2.10:2.0.0
、AmazonのJDBCドライバRedshiftJDBC41-1.1.17.1017.jar
を使っています。
入力(CSVフィールド)とは何ですか、出力は何ですか?コードを再設計する必要があるかもしれません。私は、データフレームを作成してそれらを結合する必要性を避けることができます.... – Yaron
@ Yaron csvは、データフレームのスキーマに配置できる単なる数字です。私はこれをRDDとして簡単かつ効率的に保存できることを知っていますが、それを行うと、私はそれが究極の目標である(私が知る限り)赤方偏移データベースに書き込むことができません。 –
私は何をヒントしようとしましたか?あなたは、いくつかのデータフレーム+それらの結合を必要とせずに、1つのスパークデータフレームを使用して解決できると思います。もう一度 - どのアルゴリズムを使用しようとしていますか?期待される成果は? – Yaron