6

KafkaConsumer(> = 0.9)に関する私のニーズに対応するソリューションを実装しようとすると、深刻な問題に直面しています。Kafka Consumer - Pollの振る舞い

私は、カフカのトピックからnというメッセージを読み取るだけの機能があるとしましょう。

たとえば、getMsgs(5) - >は、トピックで次の5カフカメッセージを取得します。

だから、私はこのようになりますループがあります。

for (boolean exit= false;!exit;) 
{ 
    Records = consumer.poll(200); 
    for (Record r:records) { 
     processRecord(r); //do my things 
     numMss++; 
     if (numMss==maximum) //maximum=5 
      exit=true; 
    } 
} 

は、この点を考慮し、問題が世論調査()メソッドが5つの以上のメッセージを得ることができるということです。たとえば、10個のメッセージを取得した場合、私のコードは、他の5個のメッセージを永久に忘れるでしょう。なぜなら、カフカはすでに消費されていると思うからです。

私はオフセットが、動作していないようコミット試してみました:

consumer.commitSync(Collections.singletonMap(partition, 
    new OffsetAndMetadata(record.offset() + 1))); 

でも、私は再び消費者を起動するたびオフセット設定、で、それは6日メッセージから起動しません覚えて、私はちょうどの5つのメッセージを望んでいたが、の11thから(最初のポーリングは10のメッセージを消費していたので)

これには解決策がありますか、または多分(確かに)私は何かを逃していますか?

ありがとうございます!

答えて

3

は、あなたがにmax.poll.recordsを設定することができますどんな番号でも好きなように、たいていあなたは各投票で多くのレコードを取得します。

この問題で述べたユースケースでは、自分で明示的にオフセットをコミットする必要はありません。 enable.auto.committrueに設定し、コンシューマがない場合にgroup.id(つまり、初めてパーティションから読み込みを開始するとき)のようにauto.offset.resetearliestに設定することができます。一度あなたはグループを持っています。idといくつかのコンシューマオフセットがKafkaに格納されています。カフカのコンシューマプロセスが終了した場合、コンシューマプロセスが終了すると、それはデフォルトの動作であるため、最後にコミットされたオフセットから継続します。最後のコミットされたオフセットから続きます。auto.offset.resetが入りません

0

"latest"としてauto.offset.resetプロパティを設定します。消費してみると、コミットされたオフセットから消費されたレコードが取得されます。

または、pollの前にconsumer.seek(TopicPartition、offset)apiを使用します。

+0

auto.offset.resetは最も早く、消費者group.idがない場合にのみ起動します。グループidなしでは、オフセットを格納することはできません。既に消費者グループidが存在する場合、auto.offset.resetは何もしません。また、デフォルトでは、消費者は最後にコミットされたオフセットからピッキングします。 – user1870400

0

enable.auto.commitをfalseに設定して自動コミットを無効にする必要があります。手動でオフセットをコミットする場合は、無効にする必要があります。それがなければ、poll()への次の呼び出しは、以前のpoll()から受け取ったメッセージの最新のオフセットを自動的にコミットします。

0

カフカ0.9より、auto.offset.resetパラメータ名が変更されました。

オフセット電流は、サーバー上の任意のより多く存在していない場合(例えば、データが削除されているので)カフカやオフセットなし初期がないときの対処方法:

earliest: automatically reset the offset to the earliest offset 

latest: automatically reset the offset to the latest offset 

none: throw exception to the consumer if no previous offset is found for the consumer's group 

anything else: throw exception to the consumer.