2016-12-04 2 views
0

Sparkでは、カフカからのストリームを5秒のバッチタイムで作成します。その時間に多くのメッセージが入ってくる可能性があり、それぞれを個別に処理したいのですが、現在のロジックでは、各バッチの最初のメッセージだけが処理されているようです。カフカストリームの最初のメッセージのみが処理されます

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topics) 

val messages = stream.map((x$2) => x$2._2) 

messages.foreachRDD { rdd => 
    if(!rdd.isEmpty) { 
     val message = rdd.map(parse) 
     println(message.collect()) 
    } 
} 

parse関数は、Jsonメッセージから関連するフィールドを単純にタプルに抽出します。

私はパーティションにドリルダウンして、個別にそのように各メッセージを処理することができます。

messages.foreachRDD { rdd => 
    if(!rdd.isEmpty) { 
     rdd.foreachPartition { partition => 
      partition.foreach{msg => 
       val message = parse(msg) 
       println(message) 
      } 
     } 
    } 
} 

しかし、私はRDDレベルで滞在する方法がある確信しています。最初の例で何が間違っていますか?

私はspark 2.0.0、scala 2.11.8、spark streaming kafka 0.8を使用しています。

答えて

1

ここには、各ループのバッチ内の各メッセージを大文字の内部に変換して印刷するサンプルStreamingアプリケーションがあります。このサンプルアプリケーションを試してから、アプリケーションを再チェックしてください。お役に立てれば。

object SparkKafkaStreaming { 

def main(args: Array[String]) { 

//Broker and topic 
val brokers = "localhost:9092" 
val topic = "myTopic" 

//Create context with 5 second batch interval 
val sparkConf = new SparkConf().setAppName("SparkKafkaStreaming").setMaster("local[2]") 
val ssc = new StreamingContext(sparkConf, Seconds(5)) 

//Create direct kafka stream with brokers and topics 
val topicsSet = Set[String](topic) 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
val msgStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

//Message 
val msg = msgStream.map(_._2)  
msg.print() 

//For each 
msg.foreachRDD { rdd => 
    if (!rdd.isEmpty) { 
    println("-----Convert Message to UpperCase-----") 
    //convert messages to upper case 
    rdd.map { x => x.toUpperCase() }.collect().foreach(println) 
    } else { 
    println("No Message Received") 
    } 
} 

//Start the computation 
ssc.start() 
ssc.awaitTermination() 
    } 
} 
+0

これは私の例で示したロジックと同じように見えますが、その周囲にはもっとコンテキストがあります。構造が問題ではない場合、このコードが各バッチの最初のメッセージだけを処理するケースが考えられますか? – p3zo

+0

collect()メソッドは配列を返します。 "println(message.collect())"の代わりに "message.collect()。foreach(println)"を試してみてください。それでもすべてのメッセージが表示されない場合は、「解析」メソッドを再確認してください。このサンプルプログラムを実行し、Kafkaコマンドラインプロデューサからメッセージを送信することで確認できます。 – abaghel

+0

私は今参照してください。それは欠けていた 'foreach'です - 私はそれを追加し、すべてのメッセージが処理されます。 – p3zo

関連する問題