2016-11-21 9 views
6

私はthis tutorialと他の同様のチュートリアルをタスクのシリアライズに従いますが、コードはTask serializationエラーで失敗します。私はなぜそれが起こるのか分からない。私は変数topicOutputMessagesforeachRDDの外に設定しています。それから私はforeachPartitionの中でそれを読んでいます。また、私はKafkaProducer内部foreachPartitionを作成します。では、ここで何が問題なのですか?本当にポイントを得ることはできません。シリアライズ可能オブジェクトの使用:原因:java.io.NotSerializableException

al topicsSet = topicInputMessages.split(",").toSet 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> metadataBrokerList_InputQueue) 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2) 


messages.foreachRDD(rdd => { 
    rdd.foreachPartition{iter => 
     UtilsDM.setMetadataBrokerList(metadataBrokerList) 
     UtilsDM.setOutputTopic(topicOutputMessages) 
     val producer = UtilsDM.createProducer 
     iter.foreach { msg => 
       val record = new ProducerRecord[String, String](UtilsDM.getOutputTopic(), msg) 
       producer.send(record) 
     } 
     producer.close() 
    } 
}) 

EDIT:

object UtilsDM extends Serializable { 

    var topicOutputMessages: String = "" 
    var metadataBrokerList: String = "" 
    var producer: KafkaProducer[String, String] = null 

    def setOutputTopic(t: String): Unit = { 
    topicOutputMessages = t 
    } 

    def setMetadataBrokerList(m: String): Unit = { 
    metadataBrokerList = m 
    } 

def createProducer: KafkaProducer[String, String] = { 

    val kafkaProps = new Properties() 

    kafkaProps.put("bootstrap.servers", metadataBrokerList) 

    // This is mandatory, even though we don't send key 
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    kafkaProps.put("acks", "1") 

    // how many times to retry when produce request fails? 
    kafkaProps.put("retries", "3") 
    // This is an upper limit of how many messages Kafka Producer will attempt to batch before sending (bytes) 
    kafkaProps.put("batch.size", "5") 
    // How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch 
    kafkaProps.put("linger.ms", "5") 

    new KafkaProducer[String, String](kafkaProps) 
    } 

} 

のフルスタックトレース:

16/11/21 13:47:30 ERROR JobScheduler: Error running job streaming job 1479732450000 ms.0 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) 
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103) 
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer 
Serialization stack: 
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: [email protected]) 
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer) 
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>) 
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1) 
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
    ... 30 more 
16/11/21 13:47:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) 
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:103) 
    at org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1.apply(KafkaDecisionsConsumer.scala:93) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.test.consumer.kafka.KafkaDecisionsConsumer 
Serialization stack: 
    - object not serializable (class: org.test.consumer.kafka.KafkaDecisionsConsumer, value: [email protected]) 
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer) 
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1, <function1>) 
    - field (class: org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, name: $outer, type: class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1) 
    - object (class org.test.consumer.kafka.KafkaDecisionsConsumer$$anonfun$run$1$$anonfun$apply$1, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
    ... 30 more 
+1

上記のコードでは、 'KafkaDecisionsConsumer'は何:私はこの回答で説明するように、例えば工場のアプローチを使用することを好む(非直列化可能な接続、あるいはローカル・キャッシュのような)エグゼキュータ・バインドされたオブジェクトに対処するために

?私は原因がコードサンプルで提供されていないと思う。この問題の原因となっているいくつかの関数 'run'があるはずです。 – maasg

+0

@maasg:私の答えを見てください。関数 'run'があります(' KafkaDecisionsConsumer'は 'TestRunner'に改名されています)。基本的に 'run'はコード全体が置かれる場所です。しかし、私は実際に 'broadcast'変数と' foreachPartition'を使ってこの問題を解決することができました。私はすぐに私の答えを更新します。 – duckertito

答えて

1

私は問題があなたのUtilsDMクラスにあると思います。それはクロージャーによって捕らえられ、Sparkはコードをシリアライズしてエグゼクターに発送しようとします。

utilsDMをシリアル化するか、foreachRDD関数内で作成するようにしてください。

+0

ご意見ありがとうございます。私はちょうど 'UtilsDM'のコードを投稿しました。実際にはそれは 'Serializable'を拡張していますので、シリアル化できると期待しています... – duckertito

+0

' foreachRDD'で 'UtilsDM'を作成するにはどうしたらいいですか?私はそれを試すことができるように、与えてください。どうもありがとう。 – duckertito

+0

完全なスタックトレースも提供できますか?どのバージョンのSparkを使用していますか? – Niemand

0

これは私の質問に対する答えではありませんが、それはうまくいく選択肢です。誰かが最終的な回答でそれを詳しく説明するかもしれませんか?この解決策の欠点は、metadataBrokerListtopicOutputMessages@transient lazy val topicOutputMessages@transient lazy val metadataBrokerListを使用してUtilsTestのコードから固定しなければならないということですが、理想的に私は、入力パラメータとして、これらのパラメータを渡すことができるようにしたいと思います:

object TestRunner { 

    var zkQuorum: String = "" 
    var metadataBrokerList: String = "" 
    var group: String = "" 
    val topicInputMessages: String = "" 

    def main(args: Array[String]) { 

    if (args.length < 14) { 
     System.err.println("Usage: TestRunner <zkQuorum> <metadataBrokerList> " + 
          "<group> <topicInputMessages>") 
     System.exit(1) 
    } 

    val Array(zkQuorum,metadataBrokerList,group,topicInputMessages) = args 

    setParameters(zkQuorum,metadataBrokerList,group,topicInputMessages) 

    run(kafka_num_threads.toInt) 

    } 


    def setParameters(mi: String, 
        mo: String, 
        g: String,t: String) { 
    zkQuorum = mi 
    metadataBrokerList = mo 
    group = g 
    topicInputMessages = t 
    } 


def run(kafkaNumThreads: Int) = { 
    val conf = new SparkConf() 
     .setAppName("TEST") 
    val sc = new SparkContext(conf) 
    sc.setCheckpointDir("~/checkpointDir") 

val ssc = new StreamingContext(sc, Seconds(5)) 

val topicMessagesMap = topicInputMessages.split(",").map((_, 1)).toMap 
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2) 

    messages.foreachRDD(rdd => { 
     rdd.foreachPartition{iter => 
     val producer = UtilsTest.createProducer 
     iter.foreach { msg => 
      val record = new ProducerRecord[String, String](UtilsTest.getOutputTopic(), msg) 
      producer.send(record) 
     } 
     producer.close() 
     } 

    }) 

    ssc.start() 
    ssc.awaitTermination() 

    } 

} 



object UtilsDM extends Serializable { 

    @transient lazy val topicOutputMessages: String = "myTestTopic" 
    @transient lazy val metadataBrokerList: String = "172.12.34.233:9092" 

    var producer: KafkaProducer[String, String] = null 

def createProducer: KafkaProducer[String, String] = { 

    val kafkaProps = new Properties() 

    kafkaProps.put("bootstrap.servers", metadataBrokerList) 

    // This is mandatory, even though we don't send key 
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    kafkaProps.put("acks", "1") 

    // how many times to retry when produce request fails? 
    kafkaProps.put("retries", "3") 
    // This is an upper limit of how many messages Kafka Producer will attempt to batch before sending (bytes) 
    kafkaProps.put("batch.size", "5") 
    // How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch 
    kafkaProps.put("linger.ms", "5") 

    new KafkaProducer[String, String](kafkaProps) 
    } 

} 
2

シリアライズ発行嘘ここmetadataBrokerListtopicOutputMessagesを参照して、失敗のコードでは

:( How spark handles objectあなたがこの回答に詳細に読むことができる)閉鎖シリアライズとの取引をスパークする方法で
rdd.foreachPartition{iter => 
    UtilsDM.setMetadataBrokerList(metadataBrokerList) 
    UtilsDM.setOutputTopic(topicOutputMessages) 

これらの変数が作成された外部オブジェクトへの参照を作成し、Sparkのクロージャークリーナーを強制的に「クリーン」クロージャーに含めます。 outerは、クロージャ内にsparkContextstreamingContextを含みます。これらは直列化できないため、直列化の例外です。

2回目の試み(回答として回避策として掲載)では、変数がヘルプオブジェクトに含まれているため、これらのリンクが壊れており、outerコンテキストからクロージャを「きれいにする」ことができます。

これらの変数に@transientを追加することは、値がシリアル化可能であると仮定すると、UtilsDMオブジェクト内では必要ないと思います。シングルトンオブジェクトは各エグゼキュータで再作成されることに注意してください。したがって、ドライバで変更された可変変数の値はエグゼキュータでは使用できません。正しく処理されないと、NullPointerExceptionが発生することがよくあります。

元のシナリオに役立つだろうシリアライゼーション・トリックがあります:

コピーはクロージャ内の変数を参照しました。例えば

rdd.foreachPartition{iter => 
    val innerMDBL = metadataBrokerList 
    val innerTOM = topicOutputMessages 
    UtilsDM.setMetadataBrokerList(innerMDBL) 
    UtilsDM.setOutputTopic(innerTOM) 

こうして、値はコンパイル時にコピーされ、外側にもリンクがありません。Redis on Spark:Task not serializable

+0

詳しい解答! – eliasah

関連する問題