2016-04-01 29 views
3

Sparkを使用してHDFSにファイルを書き込むとき、パーティショニングを使用しないときは非常に高速です。その代わりに、ファイルを書き込むためにパーティショニングを使用すると、書き込み遅延が約24倍に増加します。ファイル書き込みのためのスパークパーティショニングが非常に遅い

同じファイルの場合、パーティションなしでの書き込みには約600msかかります。 Idでパーティションを作成すると(ファイルに1.000のIDがあるため、正確に1.000のパーティションが生成されます)、約14秒かかります。

パーティション化されたファイルの作成に非常に時間がかかる人もいますか?これの根本的な原因は何か、おそらくSparkは各パーティションに1.000のフォルダとファイルを作成する必要がありますか? これがどのようにスピードアップできるか考えていますか?

val myRdd = streamedRdd.map { case ((id, metric, time), value) => Record(id, metric, getEpoch(time), time, value) } 

val df = myRdd.toDF 

df.write.mode(SaveMode.Append) 
.partitionBy("id") 
.parquet(path) 
+0

使用するコードを含めることができますか? – zero323

答えて

0

スパークエグゼキュータは、HDFSと通信してデータを書き出します。パーティション化後にデータがクラスタ全体にどのように拡散されるかによって異なります。

データのサイズが小さい場合は、複数のエグゼキュータ・ノードからHDFSへの接続を確立して書き込みを行う時間が、ファイル全体を連続して書き込む場合と比較してより多くなります。これを回避する方法

:レンジパーティショナを指定してみてください(キーをハッシュと同じハッシュを持つキーが同じノードに行く)、デフォルトのスパークパーティションにより、ハッシュパーティショナを使用してデータを

を、サンプルスニペットを見つけてください以下のスニペットでは、Hashパーティショナー yourRdd.groupByKey()を使用しています。saveAsTextFile( "HDFS PATH");

次のスニペットでは、カスタムレンジパーティションを使用しています RangePartitioner(8, yourRdd)で説明したように8つのパーティションが作成され、8つの接続を介して書き込むと、1000接続を書き込む方が適しています。

val tunedPartitioner = new RangePartitioner(8, yourRdd) 
val partitioned = yourRdd.partitionBy(tunedPartitioner).saveAsTextFile("HDFS PATH"); 

再びこれは、書き込むデータの間のトレードオフとあなたが作成したパーティションの数です。

+1

アイデアは良いですが、データフレームでは機能しません。データを再分割して寄木細工に保存する例を表示できますか? – alexeipab

+0

@alexeipab現在、カスタムパーティションを使用してパーティションを作成することはできません。あなたが行うことができる唯一のことは、再パーティションを使用して列ごとに分割することです。代わりに、myDF.rdd.partitionBy()を使用して、データフレームの基礎となるRDDをパーティション分割することができます。 – Vektor88

関連する問題