非常に大きなgzipファイルがあります。私はPySparkを使用して、後で使用するためにParquet形式のS3にあるすべてのファイルを再保存しようとしています。PySpark:パーティション分割せずに出力ファイルを別々のファイルに書き込む
単一ファイルの場合は(例では、2012年6月1日)は私が行います
dataframe = spark.read.csv('s3://mybucket/input/20120601.gz', schema=my_schema, header=True)
dataframe.write.parquet('s3://mybucket/output/20120601')
それは動作しますが、GZIPが分割されていないので、それは単一のホスト上で動作し、私は何の利益を得るんクラスタを使用する
私は(この例では、月に読んで)一度にファイルのチャンクで読み取り、このような日常のファイルに出力を書き込むためにpartitionByを使用してみました:
dataframe = spark.read.csv('s3://mybucket/input/201206*.gz', schema=my_schema, header=True)
dataframe.write.partitionBy('dayColumn').parquet('s3://mybucket/output/')
この時間は、個々のファイルがあります私が望むように異なるエグゼクターで読むことができますが、エグゼキュータは後で死んでプロセスは失敗します。私はファイルが非常に大きいので、パーティションビーは何とか不必要なリソース(シャッフル?)を使用しているので、タスクがクラッシュすると思います。
これは単なる1:1マッピングなので、データフレームの再パーティションは実際には必要ありません。とにかく、個々のタスクを明示的に名前を付けられた別個の寄木細工の出力ファイルに書き込むようにしていますか?
クラスタに火花セッションを放送することができないので、これは動作しませんを除いて、私は
def write_file(date):
# get input/output locations from date
dataframe = spark.read.csv(input_location, schema=my_schema, header=True)
dataframe.write.parquet(output_location)
spark.sparkContext.parallelize(my_dates).for_each(write_file)
のようなものを考えていました。助言がありますか?
gzipが分割可能でないということは、タイプミスです。私はテストしましたが、問題は間違いなくパーティションです。私が同じコマンドを実行しても、出力ファイルが正しく指定されていないことを除いて、それは動作します。 partitionByを使用すると、エグゼキュータ・ノードは繰り返し繰り返します。私の前提は、これを引き起こすために起こっていなければならないシャッフルだったが、何かがpartitonByで不必要なリソースを使っていることは間違いない。 – DMulligan