2016-12-19 11 views
1

SparkのUDFを使用して、タイムスタンプ、整数、別のデータフレームを取り込み、3つの値のタプルを返す必要があります。Spark ScalaでUDFを定義する

私はエラーの後でエラーを打ち続けると、もう私はそれをもう一度修正しようとしているのか分かりません。ここで

は関数である。

def determine_price (view_date: org.apache.spark.sql.types.TimestampType , product_id: Int, price_df: org.apache.spark.sql.DataFrame) : (Double, java.sql.Timestamp, Double) = { 
    var price_df_filtered = price_df.filter($"mkt_product_id" === product_id && $"created"<= view_date) 
    var price_df_joined = price_df_filtered.groupBy("mkt_product_id").agg("view_price" -> "min", "created" -> "max").withColumn("last_view_price_change", lit(1)) 
    var price_df_final = price_df_joined.join(price_df_filtered, price_df_joined("max(created)") === price_df_filtered("created")).filter($"last_view_price_change" === 1) 
    var result = (price_df_final.select("view_price").head().getDouble(0), price_df_final.select("created").head().getTimestamp(0), price_df_final.select("min(view_price)").head().getDouble(0)) 
    return result 
} 
val det_price_udf = udf(determine_price) 

それは私を与えるエラーは次のとおりです。

error: missing argument list for method determine_price 
Unapplied methods are only converted to functions when a function type is expected. 
You can make this conversion explicit by writing `determine_price _` or `determine_price(_,_,_)` instead of `determine_price`. 

私はINTがInt.typeが見つかっ予想通り、このような他のエラーで実行し続けるの引数の追加を開始した場合またはオブジェクトDataFrameはorg.apache.spark.sqlパッケージのメンバーではありません

コンテキストを提供する:

アイデアは、私は価格のデータフレーム、製品ID、作成日、製品IDとビュー日付を含む別のデータフレームを持っているということです。

ビューの日付より古いものが最後に作成された価格エントリに基づいて価格を決定する必要があります。

各製品IDには、2番目のデータフレームに複数のビュー日付があるためです。私は、UDFがクロス結合よりも速いと考えました。誰かが違う考えを持っているなら、私は感謝しています。

答えて

0

データフレームをUDFに渡すことはできません。特定のパーティションでは、UDFがワーカー上で実行されるためです。また、RDDをワーカーで使用することはできません(Is it possible to create nested RDDs in Apache Spark?)ので、同様にWorkerでDataFrameを使用することはできません。

あなたはこれを回避する必要があります。

+0

私はUDF引数からデータフレームを削除しました。データフレームはキャッシュされ、ブロードキャストされ、関数内からアクセス可能でなければなりません。私はまだエラーを受け取ります: 'error:type mismatch; が見つかりました:Int_type 必須:Int val det_price_udf = udf(determine_price(org.apache.spark.sql.types.TimestampType、Int)) ' – UrVal

+0

データフレームがUDFにない場合は、中古。 Pythonで慣れていたような "グローバル変数"ではありません。これを回避する方法がわかりません。 – UrVal

+0

あなたのユースケースは何ですか? –