7

Spark DataFrameには、配列[Double]の列が含まれています。私はmap()関数でそれを取得しようとすると、ClassCastException例外がスローされます。以下のスカラコードは例外を生成します。Sparkのアクセスアレイの列

case class Dummy(x:Array[Double]) 
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3)))) 
val s = df.map(r => { 
    val arr:Array[Double] = r.getAs[Array[Double]]("x") 
    arr.sum 
}) 
s.foreach(println) 

例外それが動作しない理由を

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

カム誰かが私に説明していますか?代わりに私は何をすべきですか? 私は、Spark 1.5.1とScalaの2.10.6

おかげ

答えて

19

ArrayTypescala.collection.mutable.WrappedArrayとしてで表現されたを使用しています。

import scala.collection.mutable.WrappedArray 

val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x") 

DataFrameが比較的薄い場合、パターンマッチングをより良いアプローチすることができます:あなたは、例えば

val arr: Seq[Double] = r.getAs[Seq[Double]]("x") 

または

val i: Int = ??? 
val arr = r.getSeq[Double](i) 

あるいはを使用して、それを抽出することができます

import org.apache.spark.sql.Row 

df.rdd.map{case Row(x: Seq[Double]) => (x.toArray, x.sum)} 

シーケンスのタイプはチェックされていないことに注意してください。次のようにスパークで

> = 1.6あなたもDatasetを使用することができます。

df.select("x").as[Seq[Double]].rdd 
関連する問題