2016-02-05 13 views
5

以下は実行しているサンプルコードです。このスパークジョブが実行されると、broadcastjoinではなくsortmergejoinを使用してDataframe結合が行われています。Spark 1.6でデータフレームを結合している間にブロードキャストが発生しない

def joinedDf (sqlContext: SQLContext, 
       txnTable: DataFrame, 
       countriesDfBroadcast: Broadcast[DataFrame]): 
       DataFrame = { 
        txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"), 
        $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner") 
       } 
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp") 

joinステートメントでbroadcast()ヒントを指定しても、broadcastjoinは実行されません。

オプティマイザはデータフレームをハッシュパーティション化しており、データのスキューが発生しています。

誰でもこの動作を見ましたか?

私はこれをSpark 1.6とHiveContextをSQLContextとして使用しています。スパークジョブは200人のエグゼキュータで実行されます。 txnTableのデータサイズは240GB、countriesDfのデータサイズは5MBです。

答えて

7

両方の方法でブロードキャストDataFrameとそのアクセス方法が正しくありません。

  • 標準のブロードキャストは、分散データ構造の処理には使用できません。内部的に意志collecttmp内部およびその後の放送から変換せず

    import org.apache.spark.sql.functions.broadcast 
    
    val countriesDf: DataFrame = ??? 
    val tmp: DataFrame = broadcast(
        countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries") 
    ) 
    
    txnTable.as("df1").join(
        broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner") 
    

    :あなたはDataFrameに参加ブロードキャストを実行したい場合は、マークが放送用DataFrameを与えbroadcast機能を使用する必要があります。

  • 参加引数は熱心に評価されます。 SparkContext.broadcastをブロードキャストデータ構造のブロードキャスト値と共に使用することも可能であったが、joinが呼び出される前にローカルで評価される。それはなぜあなたの機能が全く機能するのですが、ブロードキャストに参加しないのです。

+0

ここで、BroadcastHashJoinが1回実行され、SortMergeJoinが別の実行で表示されます。 (同じコード、異なるデータセット)。 –

+0

ブロードキャスト・ジョインのサイズのしきい値を超えていると思います。 – zero323

+0

私は非常に高いspark.sql.autoBroadcastJoinThresholdを持っています。約1GB。また、ブロードキャストされるファイルは約5 MBです。しかし、他の実行では、上記の推奨事項は素晴らしいです。 –

関連する問題