2016-08-16 2 views
1

私はApache Sparkには新しく、米国の州によってデータフレームを再分割しようとしています。私はそれ自身のRDDに各パーティションに分割し、特定の場所に保存する:Spark Dataframeの再パーティション化を保証する方法

schema = types.StructType([ 
    types.StructField("details", types.StructType([ 
     types.StructField("state", types.StringType(), True) 
    ]), True) 
]) 

raw_rdd = spark_context.parallelize([ 
    '{"details": {"state": "AL"}}', 
    '{"details": {"state": "AK"}}', 
    '{"details": {"state": "AZ"}}', 
    '{"details": {"state": "AR"}}', 
    '{"details": {"state": "CA"}}', 
    '{"details": {"state": "CO"}}', 
    '{"details": {"state": "CT"}}', 
    '{"details": {"state": "DE"}}', 
    '{"details": {"state": "FL"}}', 
    '{"details": {"state": "GA"}}' 
]).map(
    lambda row: json.loads(row) 
) 

rdd = sql_context.createDataFrame(raw_rdd).repartition(10, "details.state").rdd 

for index in range(0, rdd.getNumPartitions()): 
    partition = rdd.mapPartitionsWithIndex(
     lambda partition_index, partition: partition if partition_index == index else [] 
    ).coalesce(1) 

    if partition.count() > 0: 
     df = sql_context.createDataFrame(partition, schema=schema) 

     for event in df.collect(): 
      print "Partition {0}: {1}".format(index, str(event)) 
    else: 
     print "Partition {0}: No rows".format(index) 

テストするために、私は50行(この例では10)、異なる各持つS3からファイルをロードします状態をdetails.state列に入力します。動作を模倣するために、上記の例でデータを並列化しましたが、動作は同じです。私は尋ねた50のパーティションを取得しますが、いくつかは使用されておらず、複数のパーティションには複数の状態のエントリがあります。ここでは10のサンプルセットのための出力です:

Partition 0: Row(details=Row(state=u'AK')) 
Partition 1: Row(details=Row(state=u'AL')) 
Partition 1: Row(details=Row(state=u'CT')) 
Partition 2: Row(details=Row(state=u'CA')) 
Partition 3: No rows 
Partition 4: No rows 
Partition 5: Row(details=Row(state=u'AZ')) 
Partition 6: Row(details=Row(state=u'CO')) 
Partition 6: Row(details=Row(state=u'FL')) 
Partition 6: Row(details=Row(state=u'GA')) 
Partition 7: Row(details=Row(state=u'AR')) 
Partition 7: Row(details=Row(state=u'DE')) 
Partition 8: No rows 
Partition 9: No rows 

私の質問は:再パーティション戦略がスパークするだけの提案であるか、私のコードとは根本的に間違って何かがあるのでしょうか?

答えて

2

ここでは何も起こりません。あなたが書き込みのファイルにそれを分離したい場合

from pyspark.sql.functions import expr 

states = sc.parallelize([ 
    "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA", 
    "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
    "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
    "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
    "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY" 
]) 

states_df = states.map(lambda x: (x,)).toDF(["state"]) 

states_df.select(expr("pmod(hash(state), 50)")).distinct().count() 
# 26 

:スパークは、パーティション間と50個のパーティションを持つ行を配布するために、パーティションの分割キー(正)モジュロ番号のハッシュを使用しているあなたは、重複のかなりの数を取得しますDataFrameWriterpartitionBy節を使用する方が良いです。レベルごとに別々の出力を作成し、シャッフルする必要はありません。

本当に完全再分割を行う場合は、カスタムパーティショナーを使用できるRDD APIを使用できます。

関連する問題