2016-04-11 26 views
7

私はScalaを使用しており、独自のDataFrame関数を構築したいと考えています。たとえば、配列のような列を扱い、各要素を繰り返し処理して計算したいとします。Sparkビルドカスタム列関数、ユーザ定義関数

まず、私自身のgetMaxメソッドを実装しようとしています。だから、列xが値[3,8,2,5,9]を持っているだろう、と予想される方法の出力は、ここでは9

だろうこれは、Scalaの

def getMax(inputArray: Array[Int]): Int = { 
    var maxValue = inputArray(0) 
    for (i <- 1 until inputArray.length if inputArray(i) > maxValue) { 
    maxValue = inputArray(i) 
    } 
    maxValue 
} 

で次のようになります私がこれまで持っているものである、と

"value length is not a member of org.apache.spark.sql.column", 

をこのエラーを取得し、私は列を反復処理する方法を他に知りません。

def getMax(col: Column): Column = { 
var maxValue = col(0) 
for (i <- 1 until col.length if col(i) > maxValue){ 
    maxValue = col(i) 
} 
maxValue 

}

私は自分自身のメソッドを実装することができていたら、私は

val value_max:org.apache.spark.sql.Column=getMax(df.col(“value”)).as(“value_max”) 

列関数を作成します。そして私は、SQL文でこれを使用することができるように願って、例えば

val sample = sqlContext.sql("SELECT value_max(x) FROM table") 

とに期待される出力は、入力列[3,8,2,5,9]与えられ、9なり

別のスレッドSpark Scala - How do I iterate rows in dataframe, and add calculated values as new columns of the data frameからの回答に続いて、標準偏差のプライベートメソッドを作成します。 私が行う計算はこれよりも複雑になります(たとえば、列の各要素を比較しています)、正しい方向に進むのか、ユーザー定義関数を詳しく調べるべきですか?

+0

あなたの入力と出力/期待されるデータフレームを表示してください。 'show'を使います。 –

+0

こんにちは@JacekLaskowskiはコメントをありがとう、私はそれを明確に私が達成したいと思うように編集しました。 – other15

答えて

13

Spark DataFrameでは、列が反復可能なオブジェクトではないため、考えたアプローチを使用して列の要素を反復処理することはできません。

しかし、列の値を処理するために、あなたには、いくつかのオプションがあり、右のいずれかがあなたのタスクによって異なります。

1)

スパークSQLが既に持っている既存の組み込み関数を使用します集約関数や変換関数を含む列の処理に役立つ豊富な関数それらのほとんどはfunctionsパッケージ(documentation here)にあります。いくつかの他のもの(一般的なバイナリ関数)は、Columnオブジェクト(documentation here)で直接見つけることができます。だから、あなたがそれらを使うことができるなら、それは通常最も良い選択肢です。 注:Window Functionsを忘れないでください。

2)あなたは組み込み関数を使用してタスクを完了できない場合は、あなたが)UDF(ユーザー定義関数を定義することを考慮してUDF

を作成します。これらは、列の各項目を独立して処理でき、元の行と同じ行数(集約された列ではない)の新しい列を作成することが期待される場合に便利です。このアプローチは非常に簡単です。まず、単純な関数を定義し、それをUDFとして登録してから使用します。例:

def myFunc: (String => String) = { s => s.toLowerCase } 

import org.apache.spark.sql.functions.udf 
val myUDF = udf(myFun) 

val newDF = df.withColumn("newCol", myUDF(df("oldCol"))) 

詳細については、here'sをご覧ください。使用

3)UDAF

あなたのタスクは、集計データを作成することであるならば、あなたはUDAF(ユーザー定義集計関数を定義することができます)。私はこれで多くの経験を持っていないが、私は素敵なチュートリアルにあなたを指すことができます:

https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/

4)あなたが本当にできない場合RDD処理

にフォールバック上記のオプションを使用するか、処理タスクが異なる行に依存して処理する場合は集計ではないので、必要な列を選択して対応するRDDを使用して処理する必要があります。例:だから

val singleColumnDF = df("column") 

val myRDD = singleColumnDF.rdd 

// process myRDD 

、私は考えることができたオプションがありました。私はそれが助けて欲しい

+0

ダニエルありがとう、非常に有益。したがって、UDFとUDAFの主な違いは、UDAFが列の計算に基づいて1つの値を返すことです。私は組み込み関数が私がやりたいことに十分であることを望んでいますが、私自身の関数を実装する方法を知っておくと良いでしょう。 – other15

+0

@ other15 UDAFは通常 'groupBy'と一緒に適用されるので、' groupBy'に渡された列の別個の値ごとに集計された値を返すことができます(単純な 'df.groupBy(" key " avg( "value")) 'が動作します)。ただし、groupByを使用しない場合、UDAFは1つの値のみを返します。 –

0

簡単な例は、全セクションがUDFのに専用されているexcellent documentation、で与えられる:

import org.apache.spark.sql._ 

val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value") 
val spark = df.sparkSession 
spark.udf.register("simpleUDF", (v: Int) => v * v) 
df.select($"id", callUDF("simpleUDF", $"value"))