2017-01-14 4 views
0

私はSparkを初めて使用しています。 私はカフカからストリームを受け取り、RDDに変換される次のスクリプトを書いています。各スパークストリーミング反復データを1つのRDDに保存する方法は?

私の目標は、各ストリーム反復からのデータを1つのRDDにメモリに格納することです。各ループのリストに要素を追加する。

conf = SparkConf().setAppName("Application") 
sc = SparkContext(conf=conf) 

def joinRDDs(rdd): 
    elements = rdd.collect() 
    rdds = sc.parallelize(elements) 
    transformed = rdds.map(lambda x: ('key', {u'name': x[1]})) 

if __name__ == '__main__': 
    ssc = StreamingContext(sc, 2) 
    stream = KafkaUtils.createDirectStream(ssc, [topic],{"metadata.broker.list": host}) 
    stream.foreachRDD(joinRDDs) 

どのようにすればいいですか?

答えて

0

使用updateStatebyKey(ご清聴ありがとうございました)とneeded.Functionは、新しいデータのthatsは、すべてのバッチで来ても、あなたがメモリ内に保持されている歴史的データ2 arguementsを取るよう機能に渡します。

デフcountPurchasers(newValues、lastSum): lastSumている場合、なし: lastSum = 0 戻り和(newValues、lastSum)

updateStatebBykey(countPurchasers)

関連する問題