2016-12-03 7 views
1

Sparkデータフレームで連鎖機能を実行する方法は?私のコードでは、最初に大文字で行い、ブール変換を行いたいと思っています。私のコードは機能しませんでした。ありがとうSparkのデータフレームでチェーン関数を実行する方法は?

import org.apache.spark.sql.functions.udf 
val trimStr: String => String = _.trim 
val trimUDF = udf(trimStr) 

import org.apache.spark.sql.functions.udf 
val upperCaseStr: String => String = _.toUpperCase 
val upperCaseUDF = udf(upperCaseStr) 

import org.apache.spark.sql.functions.udf 
    def booleanValueSubstitution = udf[String, String] { 
     case "" => "N" 
     case null => "N"   
     case "TRUE" => "Y"   
    } 
var df= df1.withColumn("xx", booleanValueSubstitution(upperCaseUDF(df1("yy")))) 

答えて

0

最初にすべてを再発明しないでください。多くの一般的なタスクのためには、組み込み関数を見つけることができます:あり、booleanValueSubstitutionを除いて、あなたが定義した

val df = Seq(None, Some(""), Some("true"), Some(" TRUE "), Some("foo")).toDF("x") 

df.select(upper(trim($"x"))) 
​​

機能:

import org.apache.spark.sql.functions.{trim, upper} 

、他の利点の中ではnull安全ですない。あなたのコードがNULLに遭遇するたびに、それはNPEのために失敗するでしょう。あなたがホイールを再発明することに決めたら、それを常にカバーするべきです。あなたはbooleanValueSubstitutionで行ったように

あなたはパターンマッチ、またはあなたが使用することができますすることができTry

import scala.util.Try 

val upperCaseUDF = udf((s: String) => Try(s.toUpperCase).toOption) 
val trimUDF = udf((s: String) => Try(s.trim).toOption) 

あなたは条件が網羅されていることを確認してパターンマッチングを使用する場合:

val booleanValueSubstitution = udf[String, String] { 
    case "" => "N" 
    case null => "N"   
    case "TRUE" => "Y" 
    case _ => "N" 
} 

か簡単:

val booleanValueSubstitution = udf[String, String] {  
    case "TRUE" => "Y" 
    case _ => "N" 
} 

そうでなければscala.MatchError

次に、あなたがudfとパターンマッチングの代わりに使用することができる慣用SQLソリューションが存在します。あなたは、例えばCASE WHENを使用することができます。

import org.apache.spark.sql.functions.{when, coalesce, lit} 

df.select(
    when($"x".isNull, "N").when($"x" === "", "N").when($"x" === "TRUE", "Y") 
) 

か:一般的に

when($"x".isNull, false).when($"x" === "", false).otherwise(true) 

    最後に
    df.select(coalesce(when($"x" === "TRUE", "Y").otherwise("N"), lit("N"))) 
    

    あなたはブール値を使用することをお勧めしブール値を考える場合

  • 組み込み関数は、典型的にはいくつかの理由でUDFよりも優先。チェーンが常に可能ではないので
  • UDFは、すべての位置では許可されません。

    df.withColumn("foo", someUDF("x")).withColumn("bar", someFunc("foo")) 
    
:あなたは別途 udf結果を追加しなければならないときの状況があります。
関連する問題