2016-07-28 13 views
5

sparkのtemptablの時間フィールドから値を変換または抽出するための簡単なUDFを作成しました。関数を登録しますが、関数sqlを使って関数を呼び出すと、NullPointerExceptionがスローされます。以下は私の機能とそれを実行するプロセスです。私はツェッペリンを使用しています。奇妙なことに、これは昨日働いていましたが、今朝働いていませんでした。ScalaとSpark UDF関数

def convert(time:String) : String = { 
    val sdf = new java.text.SimpleDateFormat("HH:mm") 
    val time1 = sdf.parse(time) 
    return sdf.format(time1) 
} 

機能は、SQLずに機能

sqlContext.udf.register("convert",convert _) 

テストに関数を登録する - これが

convert(12:12:12) -> returns 12:12 

テストこれが失敗したツェッペリンでSQLの関数を動作します。 TEMPTABLE

root 
|-- date: string (nullable = true) 
|-- time: string (nullable = true) 
|-- serverip: string (nullable = true) 
|-- request: string (nullable = true) 
|-- resource: string (nullable = true) 
|-- protocol: integer (nullable = true) 
|-- sourceip: string (nullable = true) 

私は取得していますスタックトレースの一部の

%sql 
select convert(time) from temptable limit 10 

構造。代わりに、関数を定義するの

java.lang.NullPointerException 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643) 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652) 
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54) 
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44) 

答えて

7

使用UDF直接

import org.apache.spark.sql.functions._ 

val convert = udf[String, String](time => { 
     val sdf = new java.text.SimpleDateFormat("HH:mm") 
     val time1 = sdf.parse(time) 
     sdf.format(time1) 
    } 
) 

UDFの入力パラメータは、列(または列)です。戻り値の型はColumnです。

case class UserDefinedFunction protected[sql] (
    f: AnyRef, 
    dataType: DataType, 
    inputTypes: Option[Seq[DataType]]) { 

    def apply(exprs: Column*): Column = { 
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil))) 
    } 
}