Sparkストリームライブラリを使用してkafkaからJSON文字列を読み取ろうとしています。コードはkafkaブローカーに接続できますが、メッセージのデコード中に失敗します。 。; ILkafka /メッセージ/メッセージ; JLkafka /シリアライザ/デコーダ; Lkafka/kafka.message.MessageAndMetadata(Ljava /ラング/文字列:コードはKafkaストリームからJSON RDDを解析中のJavaの実行
val kStream = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kParams, kTopic).map(_._2)
println("Starting to read from kafka topic:" + topicStr)
kStream.foreachRDD { rdd =>
if (rdd.toLocalIterator.nonEmpty) {
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.read.json(rdd).registerTempTable("mytable")
if (firstTime) {
sqlContext.sql("SELECT * FROM mytable").printSchema()
}
val df = sqlContext.sql(selectStr)
df.collect.foreach(println)
df.rdd.saveAsTextFile(fileName)
mergeFiles(fileName, firstTime)
firstTime = false
println(rdd.name)
}
java.lang.NoSuchMethodErrorのからインスピレーションを得ていますシリアライザ/デコーダ;)V at org.apache.spark.streaming.kafka.KafkaRDD $ KafkaRDDIterator.getNext(KafkaRDD.scala:222) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) をscala.collection.Iterator($ Iterator.scala:727)に配置します。 (scalable.collection.mutable.ArrayBufferのscala.collection.AbstractIterator.foreach(Iterator.scala:1157) の3210をscala.collection.generic.Growable $ class。$ plus $ plus $ eq(Growable.scala:48) に配置します。 $ plus $ plus $ eq(ArrayBuffer.scala:103) 、scala.collection.mutable.ArrayBuffer。$ plus $ plus $ eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce $ class.to(TraversableOnce .scala:273) scala.collection.AbstractIterator.to(Iterator.scalaで:1157) scala.collection.TraversableOnce $ class.toBuffer(TraversableOnce.scala時:265)
あなたはどのようにジョブを実行しましたか?実行時にkafkaが利用できないようです。 –
Kafkaが利用可能で、接続しています。ランダムなカフカブローカーに変更して、これを否定的にテストしました。例外は行から来ていますif(rdd.toLocalIterator.nonEmpty){ –