2017-03-27 1 views
0

これは私の例です。sparkでパーティションを効率的に配布して使用するには?

val arr = Array((1,2), (1,3), (1,4), (2,3), (4,5)) 
val data = sc.parallelize(arr, 5) 

data.glom.map(_length).collect 
Array[Int] = Array(1, 1, 1, 1, 1) 

val agg = data.reduceByKey(_+_) 
agg.glom.map(_.length).collect 
Array[Int] = Array(0, 1, 1, 0, 1) 

val fil = agg.filter(_._2 < 4) 
fil.glom.map(_.length).collect 
Array[Int] = Array(0, 0, 1, 0, 0) 

val sub = data.map{case(x,y) => (x, (x,y))}.subtractByKey(fil).map(_._2) 
Array[(Int, Int)] = Array((1,4), (1,3), (1,2), (4,5)) 

sub.glom.map(_.length).collect 
Array[Int] = Array(0, 3, 0, 0, 1) 

私が疑問に思っているのは、パーティションを均等に分散させることです。

変数dataは、5つのパーティションで構成され、すべてのデータが均等にパーティション化されています。いくつかのtransformation operation

ex)par1: (1,2) 
    par2: (1,3) 
    par3: (1,4) 
    par4: (2,3) 
    par5: (4,5) 

は、sub変数に割り当てられた5つのパーティションの2つだけが使用されています。

変数subは5つのパーティションで構成されていますが、すべてのデータが均等にパーティション化されているわけではありません。私はsub変数に別のtransformation operationを追加した場合

ex)par1: empty 
    par2: (1,2),(1,3),(1,4) 
    par3: empty 
    par4: empty 
    par5: (4,5) 

は、5つの利用可能なパーティションが存在しますが、唯一の2つのパーティションを操作するために使用されます。

ex)sub.map{case(x,y) => (x, x, (x,y))} 

データを操作するときにすべての使用可能なパーティションを使用したいと考えています。

私はrepartitionメソッドを使用しましたが、それは安くはありません。

ex) sub.repartition(5).glom.map(_.length).collect 
Array[Int] = Array(0, 1, 1, 2, 0) 

私は可能な限り多くのパーティションを利用するための賢明な方法を探しています。

良い方法はありますか?ない5行 -

答えて

1

だからrepartitionは間違いなく

あなたの例では、スパークは、数十億行を処理するために構築されて何かを証明するために少し単純すぎる:)移動するための方法です。 repartition正確には各パーティションに同じ数の行を入れませんが、データは均等に分配されます。代わりに1.000.000行を使用してサンプルをやり直すと、repartitionの後にデータが実際に均等に分散されていることがわかります。

大量のデータの変換を扱う場合、データのスキューは大きな問題になることが多く、データの再パーティション化には、データをシャッフルするために追加の時間がかかります。次の変換ステージをより速く実行できるので、ペナルティをとる価値があります。

+0

返信いただきありがとうございます。 –

関連する問題