15

私はpysparkを使用しています。大きなcsvファイルをspark-csvでデータフレームにロードしています。前処理ステップとして、さまざまな操作を適用する必要があります列の1つ(json文字列を含む)で使用可能なデータに変換します。これは、X値を返します。それぞれの値は、別々の列に格納する必要があります。Apache Spark - UDFの結果を複数のデータフレーム列に割り当てる

この機能はUDFで実装されます。しかし、私はそのUDFから値のリストを返す方法と、個々の列にこれらを与える方法がわかりません。

(...) 
from pyspark.sql.functions import udf 
def udf_test(n): 
    return [n/2, n%2] 

test_udf=udf(udf_test) 


df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4) 

次を生成します:

+------+----------+--------------------+ 
|amount|trans_date|    test| 
+------+----------+--------------------+ 
| 28.0|2016-02-07|   [14.0, 0.0]| 
| 31.01|2016-02-07|[15.5050001144409...| 
| 13.41|2016-02-04|[6.70499992370605...| 
| 307.7|2015-02-17|[153.850006103515...| 
| 22.09|2016-02-05|[11.0450000762939...| 
+------+----------+--------------------+ 
only showing top 5 rows 

別々の列に、UDFによって返される2つ(この例では)値を格納するための最良の方法だろう何以下は簡単な例ですか?今、彼らは文字列として入力されています:

df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema() 

root 
|-- amount: float (nullable = true) 
|-- trans_date: string (nullable = true) 
|-- test: string (nullable = true) 

答えて

25

単一のUDFの呼び出しから複数のトップレベルの列を作成することはできませんが、あなたは新しいstructを作成することができます。これは、指定されたreturnTypeでUDFを必要とします。

from pyspark.sql.functions import udf 
from pyspark.sql.types import * 

schema = StructType([ 
    StructField("foo", FloatType(), False), 
    StructField("bar", FloatType(), False) 
]) 

def udf_test(n): 
    return (n/2, n % 2) if n and n != 0.0 else (float('nan'), float('nan')) 

test_udf = udf(udf_test, schema) 
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"]) 

foobars = df.select(test_udf("y").alias("foobar")) 
foobars.printSchema() 
## root 
## |-- foobar: struct (nullable = true) 
## | |-- foo: float (nullable = false) 
## | |-- bar: float (nullable = false) 

あなたは、さらに簡単なselectでスキーマをフラット化:

foobars.select("foobar.foo", "foobar.bar").show() 
## +---+---+ 
## |foo|bar| 
## +---+---+ 
## |1.0|0.0| 
## |1.5|1.0| 
## +---+---+ 

Derive multiple columns from a single column in a Spark DataFrame

+0

ファンタスティックも参照してください!これは私が必要としていたことに対して非常にうまく機能します私はそこにいる最中でしたが、udfにStructTypeスキーマを誤って渡していました。これは、新しい列をStringTypeとして代わりに使用していました。本当にありがとう! –

+0

ありがとうございます!これはまさに私が探していたものでした。 :) – dksahuji

関連する問題