2017-12-16 12 views
0

非常に大きなgzipファイルがあります。私はPySparkを使用して、後で使用するためにParquet形式のS3にあるすべてのファイルを再保存しようとしています。PySpark:パーティション分割せずに出力ファイルを別々のファイルに書き込む

単一ファイルの場合は(例では、2012年6月1日)は私が行います

dataframe = spark.read.csv('s3://mybucket/input/20120601.gz', schema=my_schema, header=True) 
dataframe.write.parquet('s3://mybucket/output/20120601') 

それは動作しますが、GZIPが分割されていないので、それは単一のホスト上で動作し、私は何の利益を得るんクラスタを使用する

私は(この例では、月に読んで)一度にファイルのチャンクで読み取り、このような日常のファイルに出力を書き込むためにpartitionByを使用してみました:

dataframe = spark.read.csv('s3://mybucket/input/201206*.gz', schema=my_schema, header=True) 
dataframe.write.partitionBy('dayColumn').parquet('s3://mybucket/output/') 

この時間は、個々のファイルがあります私が望むように異なるエグゼクターで読むことができますが、エグゼキュータは後で死んでプロセスは失敗します。私はファイルが非常に大きいので、パーティションビーは何とか不必要なリソース(シャッフル?)を使用しているので、タスクがクラッシュすると思います。

これは単なる1:1マッピングなので、データフレームの再パーティションは実際には必要ありません。とにかく、個々のタスクを明示的に名前を付けられた別個の寄木細工の出力ファイルに書き込むようにしていますか?

クラスタに火花セッションを放送することができないので、これは動作しませんを除いて、私は

def write_file(date): 
    # get input/output locations from date 
    dataframe = spark.read.csv(input_location, schema=my_schema, header=True) 
    dataframe.write.parquet(output_location) 
spark.sparkContext.parallelize(my_dates).for_each(write_file) 

のようなものを考えていました。助言がありますか?

答えて

1

TLを再分割することなく、出力ファイルを分離するために、入力ファイルを書き込む; DRこれはあなたのコードが既にやっていることです。

partitionByは不要シャッフル

全くシャッフルない号DataFrameWriter.partitionByを引き起こしています。

それは動作しますが、gzipではないので分割

することはでき:完全

  • ドロップ圧縮 - 寄木張り、内部圧縮を使用しています。
  • 分割可能な圧縮をbzip2のように使用します。
  • ジョブをサブミットする前に、ファイルを一時ストレージに解凍します。 DataFrame partitionBy to a single Parquet file (per partition) - あなたはpartitionByが使用するリソースについての懸念がある場合

(それは各エグゼキュータスレッドのファイルの大きな数を開くことがあります)あなたは、実際にパフォーマンスを向上させるためにシャッフルすることができます。単一ファイルはずっとに、おそらくですが、

someOtherColumnが合理的なカーディナリティを取得するように選択することができる
dataframe \ 
    .repartition(n, 'dayColumn', 'someOtherColumn') \ 
    .write.partitionBy('dayColumn') \ 
    .save(...) 

、物事を改善する必要があります。

+0

gzipが分割可能でないということは、タイプミスです。私はテストしましたが、問題は間違いなくパーティションです。私が同じコマンドを実行しても、出力ファイルが正しく指定されていないことを除いて、それは動作します。 partitionByを使用すると、エグゼキュータ・ノードは繰り返し繰り返します。私の前提は、これを引き起こすために起こっていなければならないシャッフルだったが、何かがpartitonByで不必要なリソースを使っていることは間違いない。 – DMulligan

関連する問題