2016-11-09 7 views
2

抽象クラスAがあるとします。クラスAを継承したクラスBCもあります。これらのクラスを考えるとSpark Scala:親タイプを受け入れる関数にサブタイプを渡す

abstract class A { 
    def x: Int 
} 
case class B(i: Int) extends A { 
    override def x = -i 
} 
case class C(i: Int) extends A { 
    override def x = i 
} 

、私は以下のRDDを構築:

def f(data: RDD[(Long, Set[A])]) = { 
    data.flatMap({ 
    case (k, v) => v map { af => 
     (af, 1) 
    } 
    }).reduceByKey(_ + _) 
} 

val data = sc.parallelize(Seq(
     Set(B(1), B(2)), 
     Set(B(1), B(3)), 
     Set(B(1), B(5)) 
    )).cache 
     .zipWithIndex 
     .map {case(k, v) => (v, k)} 

私はまた、入力としてRDDを取得し、各要素の数を返し、次の機能を持っています

RDDは受理タイプAです。私はf(data: RDD[(Long, Set[B])])に関数のシグネチャを変更した場合

type mismatch; 
found : org.apache.spark.rdd.RDD[(Long, scala.collection.immutable.Set[B])] 
required: org.apache.spark.rdd.RDD[(Long, Set[A])] 
    val x = f(data) 

このエラーは消える;:BAのサブタイプであるが、私は以下のコンパイルエラーが出るようになりました、私は、予想通りval x = f(data)がカウントを返すことを期待しますしかし、私はRDDで他のサブクラス(例えばCのようなもの)を使用したいので、これを行うことはできません。私はまた、次のアプローチを試してみました

def f[T <: A](data: RDD[(Long, Set[T])]) = { 
    data.flatMap({ 
    case (k, v) => v map { af => 
     (af, 1) 
    } 
    }) reduceByKey(_ + _) 
} 

をしかし、これも私の次の実行時エラーを与える:

value reduceByKey is not a member of org.apache.spark.rdd.RDD[(T, Int)] 
possible cause: maybe a semicolon is missing before `value reduceByKey'? 
     }) reduceByKey(_ + _) 

私はこの上の任意の助けに感謝します。

+2

BがAのサブタイプであるという理由だけで、[A]セット[B]は設定のサブタイプであることを意味するものではありません。これは、 'Set'が不変であるためです。あなたのセットがセット[A] – puhlen

答えて

2

Set[T]は共変Collection[+T]が使用されている場合でも、のでBAサブタイプ与えられ、Set[A]はサブタイプもSet[B] RDD[T]のスーパータイプではないことを意味することは、さらにオプションを制限することもTに不変である、Tに不変である(たとえば、 a List[+T])、同じ状況が発生します。

代わりの方法の多態型に頼ることができます。 上記のバージョンでは、消去後にクラス情報を保存するためにSparkが必要とするのはClassTagです。

これは動作するはずです:

import scala.reflect.{ClassTag} 
def f[T:ClassTag](data: RDD[(Long, Set[T])]) = { 
    data.flatMap({ 
    case (k, v) => v map { af => 
     (af, 1) 
    } 
    }) reduceByKey(_ + _) 
} 

は見てみましょう:

val intRdd = sparkContext.parallelize(Seq((1l, Set(1,2,3)), (2L, Set(4,5,6)))) 
val res1= f(intRdd).collect 
// Array[(Int, Int)] = Array((4,1), (1,1), (5,1), (6,1), (2,1), (3,1)) 

val strRdd = sparkContext.parallelize(Seq((1l, Set("a","b","c")), (2L, Set("d","e","f")))) 
val res2 = f(strRdd).collect 
// Array[(String, Int)] = Array((d,1), (e,1), (a,1), (b,1), (f,1), (c,1)) 
+0

であることを確認する必要があります。これは、RDDが1つのオブジェクトのインスタンス(例ではintとstring)しか含んでいない場合に完全に機能します。しかし、次のようなRDDがあるとします。val mixRdd = sc.parallelize(Seq((1l、Set(1,2,3))、(2L、Set(4,5,6))、(3L、Set "a"、 "b")))) '。このシナリオでは、コードは失敗します。 – Ashkan

+0

これはb/cです。Scalaは、そのミックスの製品タイプを推測します。 'Set [_>:String with Int] 'とそのクラスタグは見つかりません。型を具体的な型にcoherceしたい場合、 'val mixRdd:RDD [(Long、Set [Any])] = sc.parallelize(Seq((1l、Set(1,2,3)) (2L、Set(4,5,6))、(3L、Set( "a"、 "b")))) – maasg

関連する問題