私はApache Zeppelinノートブックを使用しています。ですから、スパークは基本的にインタラクティブモードで動作しています。私はzeppelinがをスローするので、ここではクロージャー変数を使用できません。これは段落全体を直列化しようとしているからです(より大きなクロージャー)。スパークUDFマップを変換する方法
私はマップをUDFに渡すことしかできません。
私はparied RDDから収集し、次のマップを持っている:
ここスパーク変換の1で使用されているfinal val idxMap = idxMapRdd.collectAsMap
:
def labelStr(predictions: WrappedArray[Double], idxMap: Map[Double, String]): Array[String] = {
predictions.array.map(idxMap.getOrElse(_, "Other"))
}
@transient val predictionStrUDF = udf { (predictions: WrappedArray[Double], idxMap: Map[Double, String]) => labelStr(predictions)}
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF(col("predictions"), lit(idxMap)))
しかし、私はエラーを次しまっlit(idxMap)
のステートメントで:
java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.HashMap$HashTrieMap
私は以下を使用して列を作成しようとしました:
val colmap = map(idxMapArr.map(lit _): _*)
しかし、次のエラー取得:(完全性について)
<console>:139: error: type mismatch;
found : Iterable[org.apache.spark.sql.Column]
required: Seq[org.apache.spark.sql.Column]
val colmap = map(idxMapArr.map(lit _): _*)
閉鎖アプローチ:
def predictionStrUDF2(idxMapArr: scala.collection.Map[Double,String]) = {
udf((predictions: WrappedArray[Double]) => labelStr(predictions, idxMapArr))
}
val cvmlPredictionsStr = cvmlPrediction.withColumn("predictionsStr", predictionStrUDF2(idxMapArr)(col("predictions")))
コンパイルが、私はcvmlPredictionsStr.show
を行うときに、私は、次の取得。私は、これはツェッペリン
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
... 62 elided
Caused by: java.io.NotSerializableException: com.github.fommil.netlib.F2jBLAS
Serialization stack:
- object not serializable (class: com.github.fommil.netlib.F2jBLAS, value: [email protected])
- field (class: org.apache.spark.ml.tuning.CrossValidator, name: f2jBLAS, type: class com.github.fommil.netlib.F2jBLAS)
- object (class org.apache.spark.ml.tuning.CrossValidator, cv_891fd6b7d95f)
- field (class: $iw, name: crossValidator, type: class org.apache.spark.ml.tuning.CrossValidator)
- object (class $iw, [email protected])
- field (class: $iw, name: $iw, type: class $iw)
あなたの最初の例は、私のゼップリンノートブックで問題なく動作しています。そこにWrapperを使う必要はありませんでした。 – nir