0

カフカを設定して、最古のメッセージや最新のメッセージを読むことができます。 以前のオフセットから読み取る必要がある場合に備えて、追加オプションを含めるにはどうすればよいですか? これを行う必要があるのは、先に処理ロジックが間違っているため、読み取られた以前のメッセージを再度処理する必要があるからです。kafkaを設定して、最も早い、最新の、そして任意のオフセットから読み取るオプションがあるようにするには?

答えて

0

java kafkaクライアントでは、次の消費位置を指定するために使用できるカフカ消費者に関するいくつかの方法があります。

ます。public void 消費者は、次のポーリング(タイムアウト)に使用するオフセットをフェッチ(TopicPartitionパーティション、 が長いオフセット)

オーバーライドをを求めています。このAPIを同じパーティションに対して複数回呼び出すと、次のpoll()で最新のオフセットが使用されます。このAPIが消費の途中で任意に使用され、フェッチオフセットをリセットする場合、データが失われる可能性があることに注意してください。

これで十分です。また、seekToBeginningとseekToEndもあります。

+0

パーティションが3つあり、最新のオフセットが12,13,15の場合、特定のタイムスタンプからすべてのメッセージを読みたい場合は、どのようにしてそれを覚えますか? –

+0

タイムスタンプからメッセージを読み取ることができません。ちょうどオフセットがあります。すべてのメッセージを読み取ってから、メッセージにタイムスタンプ値が含まれている場合に処理します。 – GuangshengZuo

+0

あなたは、各メッセージを読んで、自分のスクリプト内でそれを私が探しているタイムスタンプと比較すると言っていますか? –

0

私は同様の回答ですが、全く同じ質問ではありませんので、私の情報があなたに役立つかどうかを見てみましょう。

まず、要するにI have been working from this other SO question/answer

は、あなたのオフセットをコミットしたいし、そのための最も一般的な解決策は、ZooKeeperのです。したがって、消費者がエラーに遭遇したり、シャットダウンする必要がある場合、消費者は中断したところから再開することができます。

私は非常に大容量の大量のストリームを扱っています。私の消費者(テスト用)は毎回末尾から開始する必要があります。私の出発点を宣言するためには、KafkaConsumer seekを使用しなければならないことが書類に示されています。

成功して信頼できるものになったら、ここで私の発見を更新しようとします。確かに、これは解決された問題です。

+0

0.9がKafka自体(__consumer_offsetsトピック内)にあるので、オフセットを格納する最も一般的な場所です。 Zookeeperは、古いコンシューマAPIのオフセットにのみ使用しました。 –

関連する問題