2016-11-14 7 views
0

私はApache Zeppelinノートブックを使用しています。ですから、スパークは基本的にインタラクティブモードで動作しています。私はzeppelinがをスローするので、ここではクロージャー変数を使用できません。これは段落全体を直列化しようとしているからです(より大きなクロージャー)。スパークUDFマップを変換する方法

私はマップをUDFに渡すことしかできません。

私はparied RDDから収集し、次のマップを持っている:

ここスパーク変換の1で使用されている
final val idxMap = idxMapRdd.collectAsMap 

def labelStr(predictions: WrappedArray[Double], idxMap: Map[Double, String]): Array[String] = { 

    predictions.array.map(idxMap.getOrElse(_, "Other")) 
} 
@transient val predictionStrUDF = udf { (predictions: WrappedArray[Double], idxMap: Map[Double, String]) => labelStr(predictions)} 

val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), lit(idxMap))) 

しかし、私はエラーを次しまっlit(idxMap)のステートメントで:

java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.HashMap$HashTrieMap 

私は以下を使用して列を作成しようとしました:

val colmap = map(idxMapArr.map(lit _): _*)

しかし、次のエラー取得:(完全性について)

<console>:139: error: type mismatch; 
found : Iterable[org.apache.spark.sql.Column] 
required: Seq[org.apache.spark.sql.Column] 
     val colmap = map(idxMapArr.map(lit _): _*) 

閉鎖アプローチ:

def predictionStrUDF2(idxMapArr: scala.collection.Map[Double,String]) = { 
    udf((predictions: WrappedArray[Double]) => labelStr(predictions, idxMapArr)) 
} 
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF2(idxMapArr)(col("predictions"))) 

コンパイルが、私はcvmlPredictionsStr.showを行うときに、私は、次の取得。私は、これはツェッペリン

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2037) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) 
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924) 
    at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:1924) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2139) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:239) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:526) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:486) 
    at org.apache.spark.sql.Dataset.show(Dataset.scala:495) 
    ... 62 elided 
Caused by: java.io.NotSerializableException: com.github.fommil.netlib.F2jBLAS 
Serialization stack: 
    - object not serializable (class: com.github.fommil.netlib.F2jBLAS, value: [email protected]) 
    - field (class: org.apache.spark.ml.tuning.CrossValidator, name: f2jBLAS, type: class com.github.fommil.netlib.F2jBLAS) 
    - object (class org.apache.spark.ml.tuning.CrossValidator, cv_891fd6b7d95f) 
    - field (class: $iw, name: crossValidator, type: class org.apache.spark.ml.tuning.CrossValidator) 
    - object (class $iw, [email protected]) 
    - field (class: $iw, name: $iw, type: class $iw) 

答えて

1

質問のタイトルは、スパークのUDFについてですが、インタラクティブな性質によるものだと思うが、それはここで本当の問題だと思わ閉鎖シリアル化の問題がいくつかのインタラクティブな環境での展示を回避する方法です。問題のあなたの説明から、

は、それはあなたのノートブックのセルの一つで直接実行された場合、次は動作しないように聞こえる:Xは、クラスのメンバーであるため

val x = 5 
sc.parallelize(1 to 10).filter(_ > x).collect() 

これがそうですセルオブジェクト。ラムダがxをキャプチャすると、セルオブジェクト全体をシリアル化しようとします。セルオブジェクトは直列化可能ではないため、結果が乱雑な例外になります。この問題は、ラッパーオブジェクトで回避できます。このラッパーを宣言するためには、より巧妙な方法である可能性が高いことに注意してください(おそらく、中括弧で囲まれていれば十分です)。

object Wrapper { 
    def f() { 
     val x = 5 
     sc.parallelize(1 to 10).filter(_ > x).collect() 
    } 
} 
Wrapper.f() 

この問題を解決した後でもまだ質問が残っていることがありますが、現在は多すぎるサブトピックに触れています。閉包シリアル化問題の別の説明はhereです。

+0

あなたの最初の例は、私のゼップリンノートブックで問題なく動作しています。そこにWrapperを使う必要はありませんでした。 – nir

関連する問題