2015-11-20 8 views
6

特定のパーティションに従ってS3に書き込む必要があるDataFrameがあります。コードは次のようになります。partitionByを使用して生成された寄木細工ファイルの数を制御する方法

dataframe 
    .write 
    .mode(SaveMode.Append) 
    .partitionBy("year", "month", "date", "country", "predicate") 
    .parquet(outputPath) 

partitionByは、それぞれのデータのほんの少し(〜1ギガバイト)でフォルダのかなり大きな数(〜400)にデータを分割します。 spark.sql.shuffle.partitionsのデフォルト値が200であるため、各フォルダ内の1GBのデータは200個の小さな寄木張りファイルに分割され、合計で80000個の寄木張りファイルが作成されます。これはいくつかの理由で最適ではないので、私はこれを避けたいと思います。

もちろん、spark.sql.shuffle.partitionsのほうがはるかに小さい数字に設定することもできますが、この設定ではジョインと集約のシャッフル用のパーティション数も制御されるため、これを変更したくありません。

書き込まれるファイルの数を制御する別の方法があるかどうか知りませんか?

+1

'.write'の前にデータフレームを再分割しましたか?一見、 'spark.sql.shuffle.partitions'はシャッフルとジョインでのみ使用されているようですが、それ以外はどこにもありません。それ以外の場合は、partitionByの追加の 'numParameter'パラメータのチケットを開く必要があります。 –

+0

@MariusSoutier Hmmm ... 'repartition'を' write before'と呼ぶと元の 'dataframe'が' partitionBy'関数で再分割される前に再分割されると思います。オリジナルのデータフレームをわずか10個のパーティションに再分割すると、間違いなくOOM例外が発生します。しかし、私は今それをテストする仕事を始めました。私は更新が完了したらすぐに取り返します。 –

+0

@MariusSoutier作品です!素晴らしい。ありがとうございました!あなたは返信としてそれを投稿したいですか?それから私はそれを答えたようにマークします:-) –

答えて

6

正しく指定したとおり、spark.sql.shuffle.partitionsは、SharkとSparkSQLの結合にのみ適用されます。単にパーティションの前の数で動作DataFrameWriter

partitionByは(あなたがwriteを呼び出すと、あなたはすぐにDateFrameDateFrameWriterから を移動します)。 (ライターのパーティションは、書き出されるテーブル/パーケットファイルに列を割り当てるだけなので、パーティションの数とは関係ありません。少し混乱します。)これはちょっと混乱します。それを作家に変える前に。

関連する問題