0
私はSparkを初めて使用しています。 私はカフカからストリームを受け取り、RDDに変換される次のスクリプトを書いています。各スパークストリーミング反復データを1つのRDDに保存する方法は?
私の目標は、各ストリーム反復からのデータを1つのRDDにメモリに格納することです。各ループのリストに要素を追加する。
conf = SparkConf().setAppName("Application")
sc = SparkContext(conf=conf)
def joinRDDs(rdd):
elements = rdd.collect()
rdds = sc.parallelize(elements)
transformed = rdds.map(lambda x: ('key', {u'name': x[1]}))
if __name__ == '__main__':
ssc = StreamingContext(sc, 2)
stream = KafkaUtils.createDirectStream(ssc, [topic],{"metadata.broker.list": host})
stream.foreachRDD(joinRDDs)
どのようにすればいいですか?
は