KafkaConsumer(> = 0.9)に関する私のニーズに対応するソリューションを実装しようとすると、深刻な問題に直面しています。Kafka Consumer - Pollの振る舞い
私は、カフカのトピックからnというメッセージを読み取るだけの機能があるとしましょう。
たとえば、getMsgs(5)
- >は、トピックで次の5カフカメッセージを取得します。
だから、私はこのようになりますループがあります。
for (boolean exit= false;!exit;)
{
Records = consumer.poll(200);
for (Record r:records) {
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
exit=true;
}
}
は、この点を考慮し、問題が世論調査()メソッドが5つの以上のメッセージを得ることができるということです。たとえば、10個のメッセージを取得した場合、私のコードは、他の5個のメッセージを永久に忘れるでしょう。なぜなら、カフカはすでに消費されていると思うからです。
私はオフセットが、動作していないようコミット試してみました:(
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
でも、私は再び消費者を起動するたびオフセット設定、で、それは6日メッセージから起動しません覚えて、私はちょうどの5つのメッセージを望んでいたが、の11thから(最初のポーリングは10のメッセージを消費していたので)
これには解決策がありますか、または多分(確かに)私は何かを逃していますか?
ありがとうございます!
auto.offset.resetは最も早く、消費者group.idがない場合にのみ起動します。グループidなしでは、オフセットを格納することはできません。既に消費者グループidが存在する場合、auto.offset.resetは何もしません。また、デフォルトでは、消費者は最後にコミットされたオフセットからピッキングします。 – user1870400