4

私はここでSpark SQLはフィルタリングされた等結合に対して述語プッシュダウンを実行しますか?

A inner join B where A.group_id = B.group_id and pair_filter_udf(A[cols], B[cols]) 

形式のgroup_id「フィルタ、等は、参加する」を実行するためにスパークSQL(1.6)を使用してに興味が粗いです:group_idの単一の値は、たとえば、1万件のレコードを関連付けることができAとBの両方で使用することができます。

pair_filter_udfを除いて、等結合が単独で実行された場合、group_idの粗さは計算上の問題を引き起こします。たとえば、AとBの両方で10,000レコードのgroup_idの場合、結合には1億エントリがあります。このような大規模なグループが何千もある場合、膨大なテーブルが生成され、非常に簡単にメモリが不足する可能性があります。

したがって、pair_filter_udfを結合にプッシュし、すべてのペアが生成されるまで待つのではなく、生成されたペアをフィルタリングすることが不可欠です。私の質問は、Spark SQLがこれを行うかどうかです。

は、私は、そのクエリプランが何であったか、単純なフィルタ等結合と尋ねたスパークを設定します。

# run in PySpark Shell 
import pyspark.sql.functions as F 

sq = sqlContext 
n=100 
g=10 
a = sq.range(n) 
a = a.withColumn('grp',F.floor(a['id']/g)*g) 
a = a.withColumnRenamed('id','id_a') 

b = sq.range(n) 
b = b.withColumn('grp',F.floor(b['id']/g)*g) 
b = b.withColumnRenamed('id','id_b') 

c = a.join(b,(a.grp == b.grp) & (F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp']) 
c = c.sort('id_a') 
c = c[['grp','id_a','id_b']] 
c.explain() 

結果:

== Physical Plan == 
Sort [id_a#21L ASC], true, 0 
+- ConvertToUnsafe 
    +- Exchange rangepartitioning(id_a#21L ASC,200), None 
     +- ConvertToSafe 
     +- Project [grp#20L,id_a#21L,id_b#24L] 
      +- Filter (abs((id_a#21L - id_b#24L)) < 2) 
       +- SortMergeJoin [grp#20L], [grp#23L] 
        :- Sort [grp#20L ASC], false, 0 
        : +- TungstenExchange hashpartitioning(grp#20L,200), None 
        :  +- Project [id#19L AS id_a#21L,(FLOOR((cast(id#19L as double)/10.0)) * 10) AS grp#20L] 
        :  +- Scan ExistingRDD[id#19L] 
        +- Sort [grp#23L ASC], false, 0 
        +- TungstenExchange hashpartitioning(grp#23L,200), None 
         +- Project [id#22L AS id_b#24L,(FLOOR((cast(id#22L as double)/10.0)) * 10) AS grp#23L] 
          +- Scan ExistingRDD[id#22L] 

これらは、計画からキー線です:

+- Filter (abs((id_a#21L - id_b#24L)) < 2) 
    +- SortMergeJoin [grp#20L], [grp#23L] 

これらの行は、フィルタが結合後の別の段階で実行されるという印象を与えます。所望の挙動。しかし、それは暗黙のうちに結合に押し込まれている可能性があり、クエリプランにはそのレベルの詳細が欠けているだけです。

この場合、スパークが何をしているのかはどうすればわかりますか?

更新:スパークは、プッシュダウンをやっていない場合は私のラップトップがクラッシュするのに十分でなければならないのn = 1E6とg = 1E5、と

私が実行している実験。それは衝突していないので、プッシュダウンをしていると思います。しかし、それがどのように機能し、Spark SQLソースのどの部分がこのすばらしい最適化を担当しているか知ることは面白いでしょう。

答えて

3

かなり多くは、あなたが意味するものによって異なります。プッシュダウン|a.id_a - b.id_b| < 2a.grp = b.grpの隣のjoinロジックの一部として実行されるかどうかを尋ねると、答えは否定的です。平等に基づいていない述語は、joinの条件に直接含まれません。

enter image description here

あなたがfilterを見ることができるようにSortMergeJoinとは別の変換として実行されます:あなたはそれを説明することができます

一つの方法は、それがこのような多かれ少なかれになります代わりに、実行計画のDAGを使用することです。別の方法は、a.grp = b.grpをドロップするときに実行計画を分析することです。あなたはそれがありません追加の最適化とfilter続い直積にjoinを展開していることがわかります:

d = a.join(b,(F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp']) 

## == Physical Plan == 
## Project [id_a#2L,grp#1L,id_b#5L] 
## +- Filter (abs((id_a#2L - id_b#5L)) < 2) 
## +- CartesianProduct 
##  :- ConvertToSafe 
##  : +- Project [id#0L AS id_a#2L,(FLOOR((cast(id#0L as double)/10.0)) * 10) AS grp#1L] 
##  :  +- Scan ExistingRDD[id#0L] 
##  +- ConvertToSafe 
##   +- Project [id#3L AS id_b#5L] 
##    +- Scan ExistingRDD[id#3L] 

それはあなたのコード意味しています(ないデカルトのものを - あなたが本当に実際には、これを避けたい)を生成します巨大な中間テーブル?

いいえ、そうではありません。 SortMergeJoinfilterは、1つのステージとして実行されます(DAGを参照)。 DataFrame操作のいくつかの詳細はわずかに低いレベルで適用できますが、基本的にScala Iteratorsas shown in a very illustrative way by Justin Pihonyの変換の連鎖にすぎません。Spark固有のロジックを追加することなく異なる操作を一緒に押しつぶすことができます。一方向または他の両方のフィルタが単一のタスクに適用されます。

+0

イテレータのスクラッチはかなり意味がありますが、それは私のために別の質問を提起します。数ヶ月前、Spark 1.2のRDDの[類似の実験](http://stackoverflow.com/questions/34092211/how-does-spark-execute-a-join-filter-is-it-scalable)を行いました。 1と反対の結果を得た。あなたの同じロジックがその例に当てはまるようです:join + filterは、Spark RDD joinがちょうどそのように動作しない限り、イテレータを一緒に押しつぶすべきです。 – Paul

+0

ああ、申し訳ありません:) 'python_join.dispatch'は実際にジェネレータ式を返します。 – zero323

+1

ちょうど明白であるために - [this](https://github.com/apache/spark/blob/master/python/pyspark/join.py#L46-L51)が困惑していますか?あなたがリンクした質問/回答は<= 1.2の行動を記述するためです。 – zero323

関連する問題