2016-03-09 6 views
6

カフカの実装を.9にアップグレードし、新しいコンシューマJava APIを使用してコンシューマを作成しています。私はコンシューマ向けに以下のコードを使用しています。 LINE Bは私たちが受け取ったメッセージを処理する私たちのサービスへの呼び出しです。問題は、メッセージ処理に30秒以上かかる場合、例外が発生することです。kafkaを新しいコンシューマapiで.9にアップグレード

Properties props = new Properties(); 
      props.put("bootstrap.servers", "localhost:9092"); 
      props.put("group.id", "test-group"); 
      props.put("auto.offset.reset", "earliest"); 
      props.put("heartbeat.interval.ms", "1000"); 
      props.put("receive.buffer.bytes", 10485760); 
      props.put("fetch.message.max.bytes", 5242880); 
      props.put("enable.auto.commit", false); 
    //with partition assigned to consumer 


      KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props); 
      // TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0); 
      //consumer.assign(Arrays.asList(partition0)); 
      //assign topic to consumer without partition 
//LINE A 
      consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp()); 
      List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
      while (true) { 
       try { 
        ConsumerRecords<Object, Object> records = consumer.poll(1000); 
        consumeFromQueue(records);//LINE B 
        consumer.commitSync(); 

       } catch (CommitFailedException e) { 
        e.printStackTrace(); 
        System.out.println("CommitFailedException"); 
       } catch (Exception e) { 
        e.printStackTrace(); 
        System.out.println("Exception in while consuming messages"); 
       } 

例外は

2016年3月3日10:47:35.095 INFO 6448 --- [尋ねる-スケジューラ-3]をo.a.k.c.c.internals.AbstractCoordinator:死者コーディネーター2147483647マーキング。 2016-03-03 10:47:35.096 ERROR 6448 --- [ask-scheduler-3] oakccinternals.ConsumerCoordinator:グループTEST-GROUPのオフセットをコミット中にエラーILLEGAL_GENERATIONが発生しました CommitFailedException org.apache.kafka.clients。 consumer.CommitFailedException:グループの再調整によりコミットが完了しない (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552) at org.apache.kafka.clients.consumer) org.apache.kafka.clafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665) (org.apache.kafka.clients) で、コンシューマー・コーディネーター・ハンドルを使用しています。 consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644) at org.apache.kafka.clients.consumer.internals.RequestFuture $ 1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals。 RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient $ RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient。 java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetwo rkClient.java:213)

上記の例外は、オフセットの実行中に発生します。 ありがとうございました

答えて

6

これは、新しいコンシューマがシングルスレッドであり、コンシューマグループでハートビートを維持できる唯一の方法は、30秒後にオフセットをポーリングまたはコミットすることですあなたの消費者を死んだとしてマークし、グループの再調整を呼びかけています。 この状況では、request.timeout.msを増やすか、消費と処理の作業を2つのスレッドに分割できます。

+0

感謝を設定することにより、クライアントに返される、私は70000に「request.timeout.ms」を追加しようとした、それは私に与えていますメッセージ処理に30000がかかっても同じ例外が発生します。 –

+0

なぜ効果がないのかよく分かりません。私はさらに "session.timeout.ms"を70000に設定しようとしましたが、 "org.apache.kafka.common.errors.ApiException:セッションのタイムアウトが許容範囲内にない"という例外が出されました。私はあなたの2番目の提案について考えていましたが、どうすればスレッドを処理するのに間違っているのでしょうか?新しいメッセージとしてトピックに再度追加し、例外の前にメッセージによって引き起こされた変更を元に戻しますか? –

+0

またgroup.max.session.timeout.msを増やしてください。 – Nautilus

0

あなたはポーリングあたり以下のメッセージを取得することを、あなたの最大のメッセージよりも大きく、しかし低いので、いくつかの適切なしきい値に

max.partition.fetch.bytes 

を設定することで、世論調査で返されたメッセージ()の数を制限することができます。

カフカの0.10.xは、明示的メッセージの数を制限するためのサポートが返信用

max.poll.records 
関連する問題