2017-01-13 7 views
0

2つのRDDの2つのキーがそれぞれ異なるタイプの2つのキーと値のペアのRDDがあるとします。

RDD1(ベクトル[文字列]、文字列)のようになり:スパーク:異なるキータイプを持つRDDのキーの交差

(Vector("A", "B", "E"), "bla bla bla"), 
(Vector("W"), "bla bla bla bla"), 
(Vector("C", "M"), "bla bla bla bla bla"), 
(Vector("A", "V"), "bla bla bla") 
... 


RDD2 [(文字列、文字列)]のようになり:

("A", 12), 
("B", 434), 
("C", 8023), 
("D", 3454), 
... 
("N", 251) 

注:のキーそのRDD2はANを含む。

所望の出力は、これはRDDSでは不可能であればベクトルキー内のすべての文字列もRDD2

(Vector("A", "E", "B"), "bla bla bla"), 
(Vector("C", "M"), "bla bla bla bla bla") 


のキーのセット全体のサブセットであるように、第1 RDD1の対です私は、データフレームやデータセットのような他の抽象化は、この結果に

答えて

0
def myFilter(rdd1: RDD[(Vector[String],String)], rdd2: RDD[(String,String)]): RDD[(Vector[String],String)] = { 

    val keys = rdd2.map(_._1).collect() 

    val filtered = rdd1.filter{ entry => 
     entry._1.forall(str => keys.contains(str)) 
    } 
    filtered 
} 

を達成できるか。これは、それが行わ得るための最も効率的な方法ではありませんが、仕事を取得します知っているように思います。

+0

あなたの答えに感謝します。参考までに、val keys = rdd2.keys.collect()を使用すると読みやすくなりました。しかし、私はまだRDD transformatinosでこの結果を得る方法を探しています。他の抽象化を使用しています。これはデータフレームやデータセットのようなものに慣れていません。 – 7kemZmani

関連する問題