2016-03-21 24 views
1

私はRDDを持っており、それにRDDを追加したいと思っています。スパークでどうすればいいですか? 私は以下のようなコードを持っています。私が持っているdStreamからRDDを返したい。Sparkで既存のRDDにRDDを追加するには?

JavaDStream<Object> newDStream = dStream.map(this); 
JavaRDD<Object> rdd = context.sparkContext().emptyRDD(); 
return newDStream.wrapRDD(context.sparkContext().emptyRDD()); 

私は、Apacheのスパークによって提供さJavaDStreamクラスのwrapRDD方法について多くのドキュメントを見つけることができません。

答えて

1

あなたはJavaStreamingContext.queueStreamを使用してQueue<RDD<YourType>>でそれを埋めることができます。

public JavaInputDStream<Object> FillDStream() { 
    LinkedList<RDD<Object>> rdds = new LinkedList<RDD<Object>>(); 
    rdds.add(context.sparkContext.emptyRDD()); 
    rdds.add(context.sparkContext.emptyRDD()); 

    JavaInputDStream<Object> filledDStream = context.queueStream(rdds); 
    return filledStream; 
} 
+0

JavaRDDのリストを1つのJavaRDDに変換できますか? –

+0

はい。 'JavaRDD.union'を使うことができます。 –

+0

ユニオンは私にDstreamを与えますが、私はJavaRDDを私のメソッドの戻り値の型として使いたいと思います。 –

1

RDDが不変あるので、何を行うことができ、新しいRDDを作成するためにsparkContext.parallizeを使用して、新しいものを返すです。

List<Object> objectList = new ArrayList<Object>; 
objectList.add("your content"); 

JavaRDD<Object> objectRDD = sparkContext.parallize(objectList); 
JavaRDD<Object> newRDD = oldRDD.union(objectRDD); 
関連する問題