2016-04-12 14 views
13

、I列と「DF」「COL1、COL2」データフレームがあると私は、各列に関数を適用した後行方向最大値を計算する:PySpark行単位簡単な例として機能する組成物

def f(x): 
    return (x+1) 

max_udf=udf(lambda x,y: max(x,y), IntegerType()) 
f_udf=udf(f, IntegerType()) 

df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2))) 

もしそうならDF:

col1 col2 
1  2 
3  0 

その後

DF2:

col1 col2 result 
1  2  3 
3  0  4 

上記働くように見えるし、生成しません:「f_udfは」私のテーブルの上だけで正常に動作し、主な問題はである私は絶対に肯定的だ「という表現を評価することはできません... PythonUDFの#Fを」

をmax_udf。

追加の列を作成せずに、または基本的なマップ/リダクションを使用しないで、データフレームとudfsを使って上記を行う方法はありますか? 「max_udf」はどのように変更すればよいですか?

私も試してみた:

max_udf=udf(max, IntegerType()) 

同じエラーが発生しています。

私はまた、次の作品いることが確認されました:

df2=(df.withColumn("temp1", f_udf(df.col1)) 
     .withColumn("temp2", f_udf(df.col2)) 

df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2)) 

なぜそれは私が一度にこれらの操作を行うことができないということですか?

"f_udf"と "max_udf"の関数に一般化された答えを見たいと思います。

答えて

21

を、私は同様の問題を抱えていたし、複数の列または全体を渡すにはthis stackoverflow question

への回答で解決策を見つけました

from pyspark.sql.functions import udf, struct 
from pyspark.sql.types import IntegerType 

df = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b")) 

count_empty_columns = udf(lambda row: len([x for x in row if x == None]), IntegerType()) 

new_df = df.withColumn("null_count", count_empty_columns(struct([df[x] for x in df.columns]))) 

new_df.show() 

戻り:

UDFの行は structを使用します
+----+----+----------+ 
| a| b|null_count| 
+----+----+----------+ 
|null|null|   2| 
| 1|null|   1| 
|null| 2|   1| 
+----+----+----------+ 
+0

ありがとう、これはこの質問に対する最初の本当の答えです! –

+0

@AlexR。あなたがこの答えに満足しているなら、それを受け入れてください! – proinsias

7

UserDefinedFunctionは、引数としてUDFを受け入れる際にエラーをスローしています。

以下のようにmax_udfを修正して動作させることができます。

df = sc.parallelize([(1, 2), (3, 0)]).toDF(["col1", "col2"]) 

max_udf = udf(lambda x, y: max(x + 1, y + 1), IntegerType()) 

df2 = df.withColumn("result", max_udf(df.col1, df.col2)) 

それとも

def f_udf(x): 
    return (x + 1) 

max_udf = udf(lambda x, y: max(x, y), IntegerType()) 
## f_udf=udf(f, IntegerType()) 

df2 = df.withColumn("result", max_udf(f_udf(df.col1), f_udf(df.col2))) 

第二のアプローチが有効であれば、内部の機能(ここではf_udf)は、有効なSQL式を生成する場合にのみ。

f_udf(df.col1)f_udf(df.col2)max_udfに渡される前に、それぞれColumn<b'(col1 + 1)'>Column<b'(col2 + 1)'>として評価されているので、ここで動作します。任意の機能ではうまくいかないでしょう。

私たちは、たとえば、このような何かしようとした場合に機能しません。

from math import exp 

df.withColumn("result", max_udf(exp(df.col1), exp(df.col2))) 
+0

返信いただきありがとうございます!第2のアプローチを明確にすることはできますか?私はデータフレームの列に適用するためにfafudfをbonafide UDFにする必要はありません。 –

+0

また、2番目の答えは、データフレーム列が "+"操作に応答するという事実を利用しているようです。これを他の "f_udf"に一般化するものはありますか?一般的に、私はいくつかの異なる "f_udf"関数を持っていれば、それぞれのために別々のmax_udf関数のセットを書く必要がありますか? –

+0

私は申し訳ありませんが、私はまた、スパークするために新しいです。私は、UDFに変換することなく、通常の関数でカラムに対する操作を行うことができることに気付きました。あなたは別の質問としてそれを上げることができますか?私もansを知る必要があります – Mohan

関連する問題