2016-05-30 8 views
1

Sparkストリームライブラリを使用してkafkaからJSON文字列を読み取ろうとしています。コードはkafkaブローカーに接続できますが、メッセージのデコード中に失敗します。 。; ILkafka /メッセージ/メッセージ; JLkafka /シリアライザ/デコーダ; Lkafka/kafka.message.MessageAndMetadata(Ljava /ラング/文字列:コードはKafkaストリームからJSON RDDを解析中のJavaの実行

https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/KafkaStreamingJson.scala

val kStream = KafkaUtils.createDirectStream[String, String, StringDecoder, 
     StringDecoder](ssc, kParams, kTopic).map(_._2) 
    println("Starting to read from kafka topic:" + topicStr) 
kStream.foreachRDD { rdd => 

    if (rdd.toLocalIterator.nonEmpty) { 

      val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
      sqlContext.read.json(rdd).registerTempTable("mytable") 
      if (firstTime) { 
       sqlContext.sql("SELECT * FROM mytable").printSchema() 
      } 
      val df = sqlContext.sql(selectStr) 
      df.collect.foreach(println) 
      df.rdd.saveAsTextFile(fileName) 
      mergeFiles(fileName, firstTime) 
      firstTime = false 
      println(rdd.name) 
     } 

java.lang.NoSuchMethodErrorのからインスピレーションを得ていますシリアライザ/デコーダ;)V at org.apache.spark.streaming.kafka.KafkaRDD $ KafkaRDDIterator.getNext(KafkaRDD.scala:222) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) をscala.collection.Iterator($ Iterator.scala:727)に配置します。 (scalable.collection.mutable.ArrayBufferのscala.collection.AbstractIterator.foreach(Iterator.scala:1157) の3210をscala.collection.generic.Growable $ class。$ plus $ plus $ eq(Growable.scala:48) に配置します。 $ plus $ plus $ eq(ArrayBuffer.scala:103) 、scala.collection.mutable.ArrayBuffer。$ plus $ plus $ eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce $ class.to(TraversableOnce .scala:273) scala.collection.AbstractIterator.to(Iterator.scalaで:1157) scala.collection.TraversableOnce $ class.toBuffer(TraversableOnce.scala時:265)

+0

あなたはどのようにジョブを実行しましたか?実行時にkafkaが利用できないようです。 –

+0

Kafkaが利用可能で、接続しています。ランダムなカフカブローカーに変更して、これを否定的にテストしました。例外は行から来ていますif(rdd.toLocalIterator.nonEmpty){ –

答えて

0

問題はバージョンとありましたカフカのジャーを使用し、0.9.0.0を使用して問題を修正しました。クラスkafka.message.MessageAndMetadataは0.8.2.0で導入されました。

関連する問題