私はここでSpark SQLはフィルタリングされた等結合に対して述語プッシュダウンを実行しますか?
A inner join B where A.group_id = B.group_id and pair_filter_udf(A[cols], B[cols])
形式のgroup_id
「フィルタ、等は、参加する」を実行するためにスパークSQL(1.6)を使用してに興味が粗いです:group_id
の単一の値は、たとえば、1万件のレコードを関連付けることができAとBの両方で使用することができます。
pair_filter_udf
を除いて、等結合が単独で実行された場合、group_id
の粗さは計算上の問題を引き起こします。たとえば、AとBの両方で10,000レコードのgroup_id
の場合、結合には1億エントリがあります。このような大規模なグループが何千もある場合、膨大なテーブルが生成され、非常に簡単にメモリが不足する可能性があります。
したがって、pair_filter_udf
を結合にプッシュし、すべてのペアが生成されるまで待つのではなく、生成されたペアをフィルタリングすることが不可欠です。私の質問は、Spark SQLがこれを行うかどうかです。
は、私は、そのクエリプランが何であったか、単純なフィルタ等結合と尋ねたスパークを設定します。
# run in PySpark Shell
import pyspark.sql.functions as F
sq = sqlContext
n=100
g=10
a = sq.range(n)
a = a.withColumn('grp',F.floor(a['id']/g)*g)
a = a.withColumnRenamed('id','id_a')
b = sq.range(n)
b = b.withColumn('grp',F.floor(b['id']/g)*g)
b = b.withColumnRenamed('id','id_b')
c = a.join(b,(a.grp == b.grp) & (F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp'])
c = c.sort('id_a')
c = c[['grp','id_a','id_b']]
c.explain()
結果:
== Physical Plan ==
Sort [id_a#21L ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(id_a#21L ASC,200), None
+- ConvertToSafe
+- Project [grp#20L,id_a#21L,id_b#24L]
+- Filter (abs((id_a#21L - id_b#24L)) < 2)
+- SortMergeJoin [grp#20L], [grp#23L]
:- Sort [grp#20L ASC], false, 0
: +- TungstenExchange hashpartitioning(grp#20L,200), None
: +- Project [id#19L AS id_a#21L,(FLOOR((cast(id#19L as double)/10.0)) * 10) AS grp#20L]
: +- Scan ExistingRDD[id#19L]
+- Sort [grp#23L ASC], false, 0
+- TungstenExchange hashpartitioning(grp#23L,200), None
+- Project [id#22L AS id_b#24L,(FLOOR((cast(id#22L as double)/10.0)) * 10) AS grp#23L]
+- Scan ExistingRDD[id#22L]
これらは、計画からキー線です:
+- Filter (abs((id_a#21L - id_b#24L)) < 2)
+- SortMergeJoin [grp#20L], [grp#23L]
これらの行は、フィルタが結合後の別の段階で実行されるという印象を与えます。所望の挙動。しかし、それは暗黙のうちに結合に押し込まれている可能性があり、クエリプランにはそのレベルの詳細が欠けているだけです。
この場合、スパークが何をしているのかはどうすればわかりますか?
更新:スパークは、プッシュダウンをやっていない場合は私のラップトップがクラッシュするのに十分でなければならないのn = 1E6とg = 1E5、と
私が実行している実験。それは衝突していないので、プッシュダウンをしていると思います。しかし、それがどのように機能し、Spark SQLソースのどの部分がこのすばらしい最適化を担当しているか知ることは面白いでしょう。
イテレータのスクラッチはかなり意味がありますが、それは私のために別の質問を提起します。数ヶ月前、Spark 1.2のRDDの[類似の実験](http://stackoverflow.com/questions/34092211/how-does-spark-execute-a-join-filter-is-it-scalable)を行いました。 1と反対の結果を得た。あなたの同じロジックがその例に当てはまるようです:join + filterは、Spark RDD joinがちょうどそのように動作しない限り、イテレータを一緒に押しつぶすべきです。 – Paul
ああ、申し訳ありません:) 'python_join.dispatch'は実際にジェネレータ式を返します。 – zero323
ちょうど明白であるために - [this](https://github.com/apache/spark/blob/master/python/pyspark/join.py#L46-L51)が困惑していますか?あなたがリンクした質問/回答は<= 1.2の行動を記述するためです。 – zero323