2017-02-07 13 views
1

私はこのカスタムScalaのオブジェクト(基本的にはJavaのPOJO)持っている:私のメインクラスではフィルタリングRDD、型の不一致

object CustomObject { 

    implicit object Mapper extends JavaBeanColumnMapper[CustomObject] 

} 


class CustomObject extends Serializable { 


    @BeanProperty 
    var amount: Option[java.lang.Double] = _ 

    ... 
} 

を、私はこれらのCustomObjectsが含まれているRDDをロードしました。予想::(CustomObject)= 私は量が> 5000

val customObjectRDD = sc.objectFile[CustomObject]("objectFiles") 
val filteredRdd = customObjectRDD.filter(x => x.amount > 5000) 
println(filteredRdd.count()) 

は、しかし、私のエディタは

型の不一致と言う持つオブジェクトのみを含む新しいRDDそれらをフィルタリングして作成しようとしています>ブール値、実際値: (カスタムオブジェクト)=>すべて

これを動作させるには何が必要ですか?

答えて

3

>オペレータがOption[Double]で定義されていない、あなたのフィルタ述語がOptionを処理する必要があります。

scala> case class A(amount: Option[Double]) 
defined class A 

scala> val myRDD = sc.parallelize(Seq(A(Some(10000d)), A(None), A(Some(5001d)), A(Some(5000d)))) 
myRDD: org.apache.spark.rdd.RDD[A] = ParallelCollectionRDD[12] at parallelize at <console>:29 

scala> myRDD.filter(_.amount.exists(_ > 5000)).foreach{println} 
A(Some(10000.0)) 
A(Some(5001.0)) 

これはamount = Noneを持つ任意のオブジェクトは、フィルタ述語を失敗することを想定しています。 Option.existsの定義については、the docsを参照してください。