2016-08-16 20 views
1

私自身のユーザー定義関数を使用してSparkデータフレームをフィルタリングする必要があります。私のデータフレームは、jdbc接続を使用してデータベースから読み込まれ、フィルタリングされる前にsparkで自己結合操作を行います。このエラーは、フィルタの後にデータフレームcollectを入力しようとしたときに発生します。自己結合後にUDFを使用するSpark 2.0フィルタ

私はスパーク1.6でこれをうまく使っています。しかし、昨日、2.0にアップグレードした後、それはエラーで失敗します。ここでは

py4j.protocol.Py4JJavaError: An error occurred while calling o400.collectToPython. 
: java.lang.UnsupportedOperationException: Cannot evaluate expression: 
<lambda>(input[0, string, true]) 

は(私の環境では)エラーが発生し、最小限の例です:

from pyspark.sql.functions import udf, col 
from pyspark.sql.types import BooleanType 

spark = SparkSession.builder.master('local').appName('test').getOrCreate() 

# this works successfully 
df = spark.createDataFrame([('Alice', 1), ('Bob', 2), ('Dan', None)], 
          ['name', 'age']) 
df.filter(udf(lambda x: 'i' in x, BooleanType())(df.name)).collect() 
>>> [Row(name=u'Alice', age=1)] 

# this produces the error 
df_emp = spark.createDataFrame([(1, 'Alice', None), (2, 'Bob', 1), 
           (3, 'Dan', 2), (4, 'Joe', 2)], 
           ['id', 'name', 'manager_id']) 
df1 = df_emp.alias('df1') 
df2 = df_emp.alias('df2') 
cols = df1.columns 
# the self-join 
result = df1.join(df2, col('df1.id') == col('df2.manager_id'), 'left_outer') 
result.collect() 
>>> [Row(id=1, name=u'Alice', manager_id=None), 
    Row(id=3, name=u'Dan', manager_id=2), Row(id=2, name=u'Bob', manager_id=1), 
    Row(id=2, name=u'Bob', manager_id=1), Row(id=4, name=u'Joe', manager_id=2)] 

# simple udf filter 
filtered = result.filter(udf(lambda x: 'i' in x, BooleanType())(result.name)) 
filtered.collect() 
# the above error is produced... 

私はこのケースでは何も悪いことをしています?これは2.0のバグですか?あるいは、2つのバージョン間の動作の変化を考慮する必要がありますか?

答えて

2

これはpysparkのバグです。

私はこの問題はleft_outer、right_outerで発生し、外部結合ではなく、内部結合のためにここにhttps://issues.apache.org/jira/browse/SPARK-17100

それのためにバグを提出しました。

回避策の1つは、フィルターの前に結合結果をキャッシュすることです。

例:

result = df1.join(df2, col('df1.id') == col('df2.manager_id'), 'left_outer').select(df2.name).cache()

+0

前のセッションで働いていたUDFは失敗していたので、私は壁に頭を叩きました。これは私を救った!ありがとうTim! –