2015-11-02 26 views
14

を空にRDDを初期化する今、私は私が実際RDD年代を得るとき、私はこのexistingRDDで労働組合を行うことができるように空にこのexistingRDDを初期化する必要があります。 nullに初期化する以外は、existingRDDを空のRDDに初期化するにはどうすればよいですか?私はまだあなたがやろうとしているかについてはよく分からないは、私はRDDが</p> <pre><code>JavaPairRDD<String, List<String>> existingRDD; </code></pre> <p>と呼ばれてい

JavaPairRDD<String, List<String>> existingRDD; 
if(ai.get()%10==0) 
{ 
    existingRDD.saveAsNewAPIHadoopFile("s3://manthan-impala-test/kinesis-dump/" + startTime + "/" + k + "/" + System.currentTimeMillis() + "/", 
    NullWritable.class, Text.class, TextOutputFormat.class); //on worker failure this will get overwritten         
} 
else 
{ 
    existingRDD.union(rdd); 
} 
+0

私はあなたがしようとしていることをよく理解していません。空のRDD上に共用体を作成したいですか?何のために? – eliasah

+0

はい。私はeachRDDをループするとき私のコードでforeachRDDを意味する私は、このexistingRDDのrundの和集合を行う必要があるので、私はこのexistingRDDを私のs3に保存することができます –

+0

なぜあなたは単に 'existingRDD'を' rdd'それをs3に書き込んだ後の最初の反復? –

答えて

21

が、以下のように、あなたは空のRDDを作成することができます: はここに私のコードです

// Get an RDD that has no partitions or elements. 
JavaRDD<T> emptyRDD = sc.emptyRDD() 

は、私はあなたがどのように知っている信頼します

JavaRDD<Tuple2<String,List<String>>> emptyRDD = sc.emptyRDD(); 
JavaPairRDD<String,List<String>> emptyPairRDD = JavaPairRDD.fromJavaRDD(
    existingRDD 
); 

あなたはまた、あなたのJavaRDDがに変換しmapToPairメソッドを使用することができます。それ以外の場合は、ここにある、ジェネリックを使用します。

Scalaの溶液:Scalaで

scala> val emptyRDD = sc.emptyRDD 
// emptyRDD: org.apache.spark.rdd.EmptyRDD[Nothing] = EmptyRDD[1] at ... 
0

、私はコマンドを "並列化" を使用しました。

val emptyRDD = sc.parallelize(Seq("")) 
+1

私は、RDD [String]を1つのエントリ、つまり空の文字列で作成することは確かです。 –

0

@eliasah答えは非常に便利です。私は空のペアRDDを作成するコードを提供しています。空ペアRDD(キー、値)を作成する必要があるシナリオを考えてみましょう。以下のスカラーコードは、キーをString、値をIntとして空ペアRDDを作成する方法を示しています。

type pairRDD = (String,Int) 
var resultRDD = sparkContext.emptyRDD[pairRDD] 

次のようにRDDが作成されます:Javaでは

resultRDD: org.apache.spark.rdd.EmptyRDD[(String, Int)] = EmptyRDD[0] at emptyRDD at <console>:29 
0

、空のRDDを作成するには、少し複雑でした。私はscala.reflect.classTagを使ってみましたが、うまくいきませんでした。多くのテストの結果、動作したコードはさらに簡単になりました。

private JavaRDD<Foo> getEmptyJavaRdd() { 

/* this code does not compile because require <T> as parameter into emptyRDD */ 
//  JavaRDD<Foo> emptyRDD = sparkContext.emptyRDD(); 
//  return emptyRDD; 

/* this should be the solution that try to emulate the scala <T> */ 
/* but i could not make it work too */ 
//  ClassTag<Foo> tag = scala.reflect.ClassTag$.MODULE$.apply(Foo.class); 
//  return sparkContext.emptyRDD(tag); 

/* this alternative worked into java 8 */ 
    return SparkContext.parallelize(
      java.util.Arrays.asList() 
    ); 

} 
関連する問題