0
Sparkでは、カフカからのストリームを5秒のバッチタイムで作成します。その時間に多くのメッセージが入ってくる可能性があり、それぞれを個別に処理したいのですが、現在のロジックでは、各バッチの最初のメッセージだけが処理されているようです。カフカストリームの最初のメッセージのみが処理されます
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topics)
val messages = stream.map((x$2) => x$2._2)
messages.foreachRDD { rdd =>
if(!rdd.isEmpty) {
val message = rdd.map(parse)
println(message.collect())
}
}
parse
関数は、Jsonメッセージから関連するフィールドを単純にタプルに抽出します。
私はパーティションにドリルダウンして、個別にそのように各メッセージを処理することができます。
messages.foreachRDD { rdd =>
if(!rdd.isEmpty) {
rdd.foreachPartition { partition =>
partition.foreach{msg =>
val message = parse(msg)
println(message)
}
}
}
}
しかし、私はRDDレベルで滞在する方法がある確信しています。最初の例で何が間違っていますか?
私はspark 2.0.0、scala 2.11.8、spark streaming kafka 0.8を使用しています。
これは私の例で示したロジックと同じように見えますが、その周囲にはもっとコンテキストがあります。構造が問題ではない場合、このコードが各バッチの最初のメッセージだけを処理するケースが考えられますか? – p3zo
collect()メソッドは配列を返します。 "println(message.collect())"の代わりに "message.collect()。foreach(println)"を試してみてください。それでもすべてのメッセージが表示されない場合は、「解析」メソッドを再確認してください。このサンプルプログラムを実行し、Kafkaコマンドラインプロデューサからメッセージを送信することで確認できます。 – abaghel
私は今参照してください。それは欠けていた 'foreach'です - 私はそれを追加し、すべてのメッセージが処理されます。 – p3zo