2016-04-26 1 views
0

現在、私はSparkとScalacheckで作業していますが、RDD [(A、Long)] をフィルタリングしようとしています(AはAvroファイルから読み込んだレジスタ、LongはzipWithUniqueId()関数から得られる)をバッファに格納された同じRDDからのサンプルから抽出します。フィルタタプルの別のリストからのタプルのRDD

私が意図しているのは、そのサンプルのいくつかのプロパティをテストすることです。失敗した場合は、以前にサンプリングされた値を含まないRDDのサンプルでそのプロパティを再度テストします。 私はvarにrddを格納していますので、フィルターをかけたらそれを再割り当てすることができます。 私のコードは次のようになります:

val samplingSeed = new Random(System.currentTimeMillis()).nextLong() 
val sampled = rdd.takeSample(withReplacement = false, bufferSize, samplingSeed) 
val buffer: JQueue[(A, Long)] = new JConcurrentLinkedQueue[(A, Long)] 

//Sampled as Array converts to queue 
for (i <- 0 to sampled.length - 1) 
buffer.add(sampled(i).asInstanceOf[(A, Long)]) 

//rdd is assigned to a var for persistence 
//filter here and leave out all the tuples in buffer based in the 
//Long value in each tuple 
rdd= rdd.filter{foo} 

私はこれをどのように実現するだろうか?

+1

あなたがサンプリングされたIDの設定ブロードキャストすることができ、およびidは、この設定されている場合、フィルタにチェック: 'valのIDS = sc.broadcast( buffer.toSet.map(_._ 2)); rdd.filter(v =>!ids.value.contains(v._2)) ' –

+0

チャームのように働いてくれてありがとう – mtelloz

+0

あなたは歓迎です、私は答えを作りました。それを受け入れてください –

答えて

0

一般的には、セットによるフィルタリングが放送変数を使用して行うことができます。

val rdd = sc.parallelize((1 to 10).toSeq) 
val ids = sc.broadcast(Set(1, 2, 3)) 
rdd.filter(v => !ids.value.contains(v)).collect() 
res1: Array[Int] = Array(4, 5, 6, 7, 8, 9, 10) 
関連する問題