2017-02-24 7 views
1

に含まれている場合、私は、2つのRDD(K、V)、スパークでは、2つのマップのネストを許可されていないことを持っている。RDDは(K、V)Vは、別のRDD(K、V)V

val x = sc.parallelize(List((1,"abc"),(2,"efg"))) 
val y = sc.parallelize(List((1,"ab"),(2,"ef"), (3,"tag")) 

RDDが大きいかどうかチェックしたいのですが、 "abc"に "ab"が入っていますか?

+0

あなたが探している出力で質問を更新できますか。 –

+0

ありがとう、私は知っている "abc"が含まれている "ab"は、(abc、efg)のように入れて – ozil

答えて

0

あなたはそれがサブだRDDのxから値を選択すると仮定すると、このコードは動作するはずですRDD yをに存在しています。

def main(args: Array[String]): Unit = { 
    val x = spark.sparkContext.parallelize(List((1, "abc"), (2, "efg"))) 
    val y = spark.sparkContext.parallelize(List((1, "ab"), (2, "ef"), (3, "tag"))) 

    // This RDD is filtered. That is we are selecting elements from x only if the substring of the value is present in 
    // the RDD y. 
    val filteredRDD = filterRDD(x, y) 
    // Now we map the filteredRDD to our result list 
    val resultArray = filteredRDD.map(x => x._2).collect() 
} 

def filterRDD(x: RDD[(Int, String)], y: RDD[(Int, String)]): RDD[(Int, String)] = { 
    // Broadcasting the y RDD to all spark nodes, since we are collecting this before hand. 
    // The reason we are collecting the y RDD is to avoid call collect in the filter logic 
    val y_bc = spark.sparkContext.broadcast(y.collect.toSet) 
    x.filter(m => { 
    y_bc.value.exists(n => m._2.contains(n._2)) 
    }) 
} 
+1

ありがとう、データが小さいときに放送を使用してください。 – ozil

関連する問題