これは私の例です。sparkでパーティションを効率的に配布して使用するには?
val arr = Array((1,2), (1,3), (1,4), (2,3), (4,5))
val data = sc.parallelize(arr, 5)
data.glom.map(_length).collect
Array[Int] = Array(1, 1, 1, 1, 1)
val agg = data.reduceByKey(_+_)
agg.glom.map(_.length).collect
Array[Int] = Array(0, 1, 1, 0, 1)
val fil = agg.filter(_._2 < 4)
fil.glom.map(_.length).collect
Array[Int] = Array(0, 0, 1, 0, 0)
val sub = data.map{case(x,y) => (x, (x,y))}.subtractByKey(fil).map(_._2)
Array[(Int, Int)] = Array((1,4), (1,3), (1,2), (4,5))
sub.glom.map(_.length).collect
Array[Int] = Array(0, 3, 0, 0, 1)
私が疑問に思っているのは、パーティションを均等に分散させることです。
変数data
は、5つのパーティションで構成され、すべてのデータが均等にパーティション化されています。いくつかのtransformation operation
後
ex)par1: (1,2)
par2: (1,3)
par3: (1,4)
par4: (2,3)
par5: (4,5)
は、sub
変数に割り当てられた5つのパーティションの2つだけが使用されています。
変数sub
は5つのパーティションで構成されていますが、すべてのデータが均等にパーティション化されているわけではありません。私はsub
変数に別のtransformation operation
を追加した場合
ex)par1: empty
par2: (1,2),(1,3),(1,4)
par3: empty
par4: empty
par5: (4,5)
は、5つの利用可能なパーティションが存在しますが、唯一の2つのパーティションを操作するために使用されます。
ex)sub.map{case(x,y) => (x, x, (x,y))}
データを操作するときにすべての使用可能なパーティションを使用したいと考えています。
私はrepartition
メソッドを使用しましたが、それは安くはありません。
ex) sub.repartition(5).glom.map(_.length).collect
Array[Int] = Array(0, 1, 1, 2, 0)
私は可能な限り多くのパーティションを利用するための賢明な方法を探しています。
良い方法はありますか?ない5行 -
返信いただきありがとうございます。 –