私はpysparkを使用しています。大きなcsvファイルをspark-csvでデータフレームにロードしています。前処理ステップとして、さまざまな操作を適用する必要があります列の1つ(json文字列を含む)で使用可能なデータに変換します。これは、X値を返します。それぞれの値は、別々の列に格納する必要があります。Apache Spark - UDFの結果を複数のデータフレーム列に割り当てる
この機能はUDFで実装されます。しかし、私はそのUDFから値のリストを返す方法と、個々の列にこれらを与える方法がわかりません。
(...)
from pyspark.sql.functions import udf
def udf_test(n):
return [n/2, n%2]
test_udf=udf(udf_test)
df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
次を生成します:
+------+----------+--------------------+
|amount|trans_date| test|
+------+----------+--------------------+
| 28.0|2016-02-07| [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
別々の列に、UDFによって返される2つ(この例では)値を格納するための最良の方法だろう何以下は簡単な例ですか?今、彼らは文字列として入力されています:
df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()
root
|-- amount: float (nullable = true)
|-- trans_date: string (nullable = true)
|-- test: string (nullable = true)
ファンタスティックも参照してください!これは私が必要としていたことに対して非常にうまく機能します私はそこにいる最中でしたが、udfにStructTypeスキーマを誤って渡していました。これは、新しい列をStringTypeとして代わりに使用していました。本当にありがとう! –
ありがとうございます!これはまさに私が探していたものでした。 :) – dksahuji