2016-04-12 11 views
1

を印刷していない私は、この単純なカフカストリームカフカdirectstream DSTREAMマップは

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

// Each Kafka message is a flight 
val flights = messages.map(_._2) 

flights.foreachRDD(rdd => { 
    println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records"); 
    rdd.map { flight => {   
    val flightRows = FlightParser.parse(flight) 
    println ("Parsed num rows: " + flightRows) 
    } 
    }   
}) 

ssc.start() 
ssc.awaitTermination() 

カフカは、メッセージ、スパークRDDSとしてそれらを取得することができ、それをストリーミングを持っています。しかし、私のコードの2番目のprintlnは何も印刷しません。私はローカルの[2]モードで実行されたときにドライバのコンソールログを見て、糸クライアントモードで実行されたときに糸ログを確認しました。

私には何が欠けていますか?代わりにrdd.mapの

、うまくスパーク・ドライバ・コンソールに次のコードを印刷:

for(flight <- rdd.collect().toArray) { 
    val flightRows = FlightParser.parse(flight) 
    println ("Parsed num rows: " + flightRows) 
} 

しかし、私はこの飛行物体上の処理ではなく、エグゼキュータの、スパークドライバプロジェクトに起こるかもしれないことを恐れています。私が間違っている場合は私を修正してください。

おかげ

+1

*ワーカー*エグゼキュータログを確認しましたか?おそらく、あなたの 'FlightParser'クラスが見つからないのでしょうか? –

答えて

1

rdd.mapは怠惰な変換です。そのRDDでアクションが呼び出されない限り、マテリアライズされません。
この具体的なケースでは、RDDの最も一般的なアクションの1つであるrdd.foreachを使用して、RDDの各要素にアクセスできます。このRDDアクションが執行で実行されていることを、私たちはエグゼキュータのSTDOUTにprintlnの出力を見つける考える

flights.foreachRDD{ rdd => 
    rdd.foreach { flight =>   
     val flightRows = FlightParser.parse(flight) 
     println ("Parsed num rows: " + flightRows) // prints on the stdout of each executor independently 
    } 
} 

ドライバでデータを印刷する場合は、DStream.foreachRDDクロージャ内のRDDのデータをcollectにすることができます。

flights.foreachRDD{ rdd => 
    val allFlights = rdd.collect() 
    println(allFlights.mkString("\n")) // prints to the stdout of the driver 
} 
+0

あなたの提案に感謝して@massg。 org.apache.spark.SparkException:java.io.NotSerializableException:によって発生するタスク直列化可能ではない org.apache.spark.streaming.StreamingContext 私はあなたの最初のアプローチをしようとすると 、私は次の例外を取得しています私はこれが起こっていると思います。なぜなら、飛行変数はスパーク・ドライバーでのみ利用可能であり、エグゼクターには利用できないからです。 私は何が欠けていますか? –

関連する問題