2016-10-14 8 views
1

KafkaUtilsを使用してKafkaからデータを受信するスパークストリーミングアプリケーションを作成します。私がKafkaから受け取ったデータを出力することです。ここに私のコードです(私が使用して私のスパークストリーミングジョブを実行するために火花を提出):私はこれを実行すると、それはかなりうまく動作しますスパークストリーミングでコンソールにRDD出力

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
messages.print() 

。入力はカフカのプロデューサーで、A、B、Cであれば、私は以下のようにスパークストリーミングから結果を得ることができます。

Time: 1476481700000 ms 

------------------------------------------- 
(null,a) 
(null,b) 
(null,c) 

をしかし、私は行数をカウントする1行を追加した場合、messages.print()が動作することはできません。コードは以下の通りである:私は以下の結果を得ています

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
messages.print() 
messages.count().print() 

------------------------------------------- 
Time: 1476481800000 ms 
------------------------------------------- 
4 

数のみがプリントアウト取得し、データをプリントアウトすることができません数えます。 messages.print()は私がmessages.count.print()を追加した後に実行されない理由です。
もう1つの疑問は、nullがタプル(null, a)(null, b)(null, c)の中にあることです。

答えて

0

print()に問題はなく、両方のメッセージが表示され、以下のようにカウントされます。スクロールしてログを確認します。

------------------------------------------- 
Time: 1476481700000 ms 
------------------------------------------- 
(null,a) 
(null,b) 
(null,c) 

------------------------------------------- 
Time: 1476481800000 ms 
------------------------------------------- 
4 

KafkaUtils.createDirectStream方法<Kafka topic, Kafka message>のDSTREAMを返します。トピックに関連するthisthisの投稿がnullであることを確認してください。

+0

と最終目標を達成することができます。彼らはプリントアウトされていますが、私は前にそれらを見ませんでした。どうも – Frankie

0

あなたのコードは動作するはずですが、あなたに代替手段を与えるべきです。しかし、このアプローチはテストや学習のためのものです。代わりに2 actionsを実行するには、はい、あなたは正しいですちょうど単一action

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
    //Cache your RDD before you perform any heavyweight operations. 
    messages.cache() 
    val result = messages.collect(); 
    println(result.size + " size") 
    result.foreach { input => println(input) } 
関連する問題