2016-12-27 3 views
3

私は大きな2次元配列で読み込むためのpysparkスクリプトを書いていますので、最初にインデックス配列を生成し、対応する配列で読み取るreadメソッドでマップしようとしました。たとえば、10行の配列がある場合、各パーティションに2行があるため、これらの10行が均等に分割されます。最初のパーティションは3を持っているので、私は予想通りSORTBYを()を示しているなぜsortBy()はSparkでデータを均等にソートできないのですか?

[[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]] 

動作しませんでした。しかし、結果として示した

rdd = sc.range(0, 10, 1).sortBy(lambda x: x, numPartitions = 5) 
rdd.glom().collect() 

:私はSORTBY()でこの方法を試してみました最後のパーティションには1つの番号しかありません。私が別のreadメソッドで各パーティションをマップすると、パーティションのサイズが異なり、ときどきstragglerが発生します。

そして私はRDDS世代の別の方法を試みた:

rdd = sc.parallelize(range(0, 10, 1), 5) 
rdd.glom().collect() 

をそして、それは私が望む結果を返します。

[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]] 

誰かが、sortBy()を使用する最初のメソッドが均等にソートされた結果を返すことができない理由を説明できますか?

答えて

8

これは設計されていないためです。一般的に、同じサイズのパーティションを実現するためには、データをパーティション化する(レンジ・パーティション化を含む)ことはできません。パーティショナーの契約によって、特定の値に対するすべてのレコードが単一のパーティションに存在しなければならないことに注意してください。均一な分布を達成することが可能な場合であっても、厳密な区画境界を決定することは非常に高価になる。

スパークサンプルデータの目的はの均一サイズの範囲を取得することで、この動作は一般的なSparkアプリケーションでは十分です。

SparkContext.parallelizeパーティショナーはまったく使用しません。代わりに、特定の入力のセマンティクスに基づいて分割を計算するため、同じサイズの分割を作成することができます。

データの配信に関する事前知識があれば、いつでも希望の出力になるカスタムパーティショニング機能を設計できます。例:あなたはハッシュ・パーティション化を使用することができCPythonの中の整数の

比較的短いため
import bisect 
from functools import partial 

partition_func = partial(bisect.bisect, [2, 4, 6, 8]) 

(sc.range(0, 10) 
    .map(lambda x: (x, None)) 
    .repartitionAndSortWithinPartitions(5, partition_func) 
    .keys()) 

(1 < < 60かそこらまで)シリーズ:

(sc.range(0, 10, 1) 
    .map(lambda x: (x, None)) 
    .partitionBy(10) 
    .keys() 
    .glom() 
    .collect()) 
[[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]] 

ちょうど実装の詳細です(hash(x)ここで、isinstance(x, int)xに等しい)。

関連する問題