2016-05-18 2 views
2

私はSpark 1.5/1.6を使用しています。私はDataFrameでreduceByKey操作を行いたいので、dfをrddに変換したくありません。スパークデータフレームreduceByKey

各行は同じように見え、id1には複数の行があります。

id1, [ (id21, score21, time21) , ((id22, score22, time22)) , ((id23, score23, time23)) ] 

ので、それぞれ「ID1」のために、私はなぜリストところで

、理由のすべてのレコードを望んでいない:

id1, id2, score, time 

は、私のようなものを持ちたいですdfをrddに変換したいのは、この(縮小した)データフレームを別のデータフレームに結合する必要があるためです。結合キーを再分割しているので、速くなります。同じことをrddで行うことはできません。

助けていただければ幸いです。次のようにないにした場合

val rdd = df.toRdd 
val parentRdd = rdd.dependencies(0) // Assuming first parent has the 
            // desired partitioning: adjust as needed 
val parentPartitioner = parentRdd.partitioner 
val optimizedReducedRdd = rdd.reduceByKey(parentPartitioner, reduceFn) 

をパーティを指定:

答えて

3

は単にreduceByKey呼び出しで親RDDパーティショナを再利用すでに、その後達成パーティションを保持するには

df.toRdd.reduceByKey(reduceFn) // This is non-optimized: uses full shuffle 

あなたが書き留めた振る舞いが起こります。つまり、フルシャッフルが起こります。代わりにHashPartitionerが使用されるためです。

関連する問題