2016-12-09 12 views

答えて

0

私は、2つのストリーミングコンテキストオブジェクトを作成し、トピックを変更する必要があるときにストリームを正常に停止して開始できると信じています。テストのために

0

私はqueueStream

data = 'abcdefgh' 
rddQueue1 = map(lambda x: sc.parallelize(x), zip(*[iter(data)] * 2)) 
rddQueue2 = map(lambda x: sc.parallelize(x), zip(*[iter(data.upper())] * 2)) 

s1, s2 = ssc.queueStream(rddQueue1), ssc.queueStream(rddQueue2) 

s3 = s1.transformWith(lambda t, x, y: x if int(str(t)[-1]) % 2 else y, s2) 

使用し、KafkaUtilsも()

をDStream.transformWithをサポートしています
関連する問題