2017-02-10 8 views
0

用ませ現行の割り当ては、スパークカフカのストリームを作成するために、私のScalaのコードではありません:カフカスパークストリームが例外をスローします:パーティション以下

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "server110:2181,server110:9092", 
"zookeeper" -> "server110:2181", 
"key.deserializer" -> classOf[StringDeserializer], 
"value.deserializer" -> classOf[StringDeserializer], 
"group.id" -> "example", 
"auto.offset.reset" -> "latest", 
"enable.auto.commit" -> (false: java.lang.Boolean) 
) 
val topics = Array("ABTest") 
val stream = KafkaUtils.createDirectStream[String, String](
ssc, 
PreferConsistent, 
Subscribe[String, String](topics, kafkaParams) 
) 

しかし、10時間のために実行した後、それが例外をスローします:

2017-02-10 10:56:20,000 INFO [JobGenerator] internals.ConsumerCoordinator: **Revoking previously assigned partitions** [ABTest-0, ABTest-1] for group example 
2017-02-10 10:56:20,000 INFO [JobGenerator] internals.AbstractCoordinator: (Re-)joining group example 
2017-02-10 10:56:20,011 INFO [JobGenerator] internals.AbstractCoordinator: (Re-)joining group example 
2017-02-10 10:56:40,057 INFO [JobGenerator] internals.AbstractCoordinator: Successfully joined group example with generation 5 
2017-02-10 10:56:40,058 INFO [JobGenerator] internals.ConsumerCoordinator: **Setting newly assigned partitions** [ABTest-1] for group example 
2017-02-10 10:56:40,080 ERROR [JobScheduler] scheduler.JobScheduler: Error generating jobs for time 1486695380000 ms 
java.lang.IllegalStateException: No current assignment for partition ABTest-0 
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231) 
at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:295) 
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1169) 
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179) 
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) 
at scala.Option.orElse(Option.scala:289) 
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330) 
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48) 
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117) 
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) 
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) 
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248) 
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246) 
at scala.util.Try$.apply(Try.scala:192) 
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246) 
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182) 
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) 
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

明らかに、パーティションABTestMsg-0はすでにこのコンシューマに対して取り消されていますが、スパークストリーミングのコンシューマはそれを認識せず、取り消されたこのトピックパーティションのデータを消費し続けるため、例外が発生し、スパークジョブ全体が中止されます。 kafka rebalanceイベントは非常に正常だと思いますが、Sparkストリーミングのパーティションリボークイベントを正しく処理するためにコードを変更するにはどうすればよいですか?

+0

私の場合は、ZKClient org.I0Itec.zkclient.ZkClientクライアント=新しいZkClient( "サーバー名:2181")を使用して、トピックとそのパーティション取得しています 一覧 topicsList = JavaConversions.asJavaList(ZkUtils.getAllTopics(クライアントを)); ありがとう –

答えて

0

これを理解するにはしばらく時間がかかりました。それはバランスをとっているために起こります。購読ConsumerStrategyの代わりにAssignを使用します。

関連する問題