  • Spark 2.0.0
  • Apache Kafka 0.10.1.0
  • Scala 2.11.8

I am using Spark Streaming and Kafka integration with Kafka broker version 0.10.1.0 and the following Scala code, and it fails with the exception below. How do I fix "java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord" in the Spark Streaming Kafka consumer?

16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 

Why does this happen, and how can I fix it?


Code:

import org.apache.kafka.clients.consumer.ConsumerRecord 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.streaming.kafka010._ 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark._ 
import org.apache.commons.codec.StringDecoder 
import org.apache.spark.streaming._ 

object KafkaConsumer_spark_test { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("KafkaConsumer_spark_test").setMaster("local[4]") 
    val ssc = new StreamingContext(conf, Seconds(1)) 
    ssc.checkpoint("./checkpoint") 
    val kafkaParams = Map[String, Object](
     "bootstrap.servers" -> "localhost:9092", 
     "key.deserializer" -> classOf[StringDeserializer], 
     "value.deserializer" -> classOf[StringDeserializer], 
     "group.id" -> "example", 
     "auto.offset.reset" -> "latest", 
     "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 

    val topics = Array("local1") 
    val stream = KafkaUtils.createDirectStream[String, String](
     ssc, 
     PreferConsistent, 
     Subscribe[String, String](topics, kafkaParams) 
    ) 
    stream.map(record => (record.key, record.value)) 
    stream.print() 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

The exception:

16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) 
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
16/11/13 12:55:20 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11); not retrying 
16/11/13 12:55:20 ERROR JobScheduler: Error running job streaming job 1479012920000 ms.0 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)) 
    - element of array (index: 0) 
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122) 
    at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) 
    at scala.util.Try$.apply(Try.scala:192) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

@Chenghao Lv: By the way, why are you importing `ConsumerRecord`? ConsumerRecord is the Kafka type whose value() method you use to read messages from a Kafka topic; where are you actually using it in this code? – Shankar


@Shankar `ConsumerRecord` is not used in this application. I just copied the imports from the [demo](http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html). –


@LostInOverflow It means `ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a)`. –

Answers


The ConsumerRecord objects are what you receive from the DStream. When you try to print them, the error occurs because those objects are not serializable. Instead, you should take the value out of each ConsumerRecord object and print that.

Instead of stream.print(), do:

stream.map(record => record.value().toString).print() 

This should solve your problem.
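
For context (this is visible in the stack trace above, where DStream.print ends up calling KafkaRDD.take): print() collects a small sample of each batch back to the driver, and that step serializes the task result with Spark's default Java serializer, which fails because ConsumerRecord does not implement java.io.Serializable. A minimal sketch of the same fix that also keeps the key, assuming the stream defined in the question:

// Map each ConsumerRecord to a plain (key, value) tuple, which is serializable, 
// and call print() on the mapped stream rather than on the original one. 
val keyedValues = stream.map(record => (record.key, record.value)) 
keyedValues.print() 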


**Thank you**! I also found my mistake here. What I actually want is `stream.map(record => (record.key, record.value)).print` –


KafkaUtils.createDirectStream creates an org.apache.spark.streaming.dstream.DStream, not an RDD. Spark Streaming creates RDDs on the fly while it runs. To get at them, use stream.foreachRDD() to obtain each RDD, and then RDD.foreach to get each record inside that RDD:

stream.foreachRDD { rdd => 
    rdd.foreach { record => 
        // record is a ConsumerRecord; extract its value before using it 
        val value = record.value() 
        println(value) 
    } 
} 
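
A note on this approach (a general Spark Streaming observation, not part of the original answer): the println inside rdd.foreach runs on the executors, so on a cluster the output lands in the executor logs; with local[4] as in the question it appears in the same JVM. If the goal is just to inspect a sample of each batch on the driver, a sketch of the alternative is to map the records to serializable strings first and use print():

// Driver-side alternative: convert each record to a plain String, 
// then let DStream.print() show the first few records of every batch. 
stream.map(record => s"${record.key}: ${record.value}").print() 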

Do you mean: `stream.foreachRDD { rdd => rdd.foreach { record => val value = record.value(); val key = record.key(); println(key + ":" + value) } }`? – HansHarhoff
