2016-09-23 22 views
7

Enumを含むケースクラスを使用してSpark Datasetを作成しようとしましたが、できません。私はSparkバージョン1.6.0を使用しています。例外は、Enumでエンコーダが見つからないという不満です。これはSparkではデータにenumを持たせることはできませんか?Enumを含むケースクラスからSpark DatasetまたはDataframeを作成する方法

コード:

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

object MyEnum extends Enumeration { 
    type MyEnum = Value 
    val Hello, World = Value 
} 

case class MyData(field: String, other: MyEnum.Value) 

object EnumTest { 

    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 
    val sqlCtx = new SQLContext(sc) 

    import sqlCtx.implicits._ 

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS() 

    println(s"df: ${df.collect().mkString(",")}}") 
    } 

} 

エラー:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.company.MyEnum.Value 
- field (class: "scala.Enumeration.Value", name: "other") 
- root class: "com.company.MyData" 
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597) 
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509) 
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
at scala.collection.immutable.List.foreach(List.scala:318) 
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502) 
at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394) 
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54) 
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41) 
at com.company.EnumTest$.main(EnumTest.scala:22) 
at com.company.EnumTest.main(EnumTest.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) 

答えて

4

独自のエンコーダを作成することができます。

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

object MyEnum extends Enumeration { 
    type MyEnum = Value 
    val Hello, World = Value 
} 

case class MyData(field: String, other: MyEnum.Value) 

object MyDataEncoders { 
    implicit def myDataEncoder: org.apache.spark.sql.Encoder[MyData] = 
    org.apache.spark.sql.Encoders.kryo[MyData] 
} 

object EnumTest { 
    import MyDataEncoders._ 

    def main(args: Array[String]): Unit = { 
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]") 
    val sc = new SparkContext(sparkConf) 
    val sqlCtx = new SQLContext(sc) 

    import sqlCtx.implicits._ 

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS() 

    println(s"df: ${df.collect().mkString(",")}}") 
    } 
} 
+0

感謝を! toDS()の代わりにtoDF()をしたいのですが?次に、次のエラーが表示されます。スレッド "main"の例外java.lang.UnsupportedOperationException:タイプcom.nordea.gpdw.dq.MyEnum.Valueのスキーマがサポートされていません – ErikHabanero

+0

私の回答で使用したコードとまったく同じコードを使用していますか?私は 'toDS'を' toDF'に変更しようとしましたが、うまくいくようです。 –

+0

はい、私はdf:MyData(hello、World)}をstdoutに出力してもよろしいですか?多くのログ出力があるので。 – ErikHabanero

関連する問題