複数のオプションがあります(StormのKafkaSpout
は開始オフセットを定義するAPIを提供していないため)。
- ログの尾から消費者にしたい場合は、削除すべき古いオフセット
- あなたに依存カフカバージョン
- (前0.9)あなたは少しあるZKを(操作することができますトリッキー)
- (0.9+)またはあなた試みるもトリッキーであり、他を削除する可能性があるトピック
__consumer_offsets
(からのオフセットあなたも、保存したいオフセット削除しない)
- 何のオフセットが存在しない場合、あなたは自動であなたの注ぎ口を再起動することができ、あなたは、(私が推薦する)の代替として
- (あなたにカフカのバージョンに依存)ポリシー「最新」または「最大」をリセットするオフセットを使用して必要な方法でオフセットを操作し、
commit()
オフセットを使用する小さなクライアントアプリケーションを作成できます。このクライアントは、KafkaSpout
と同じグループIDを使用する必要があり、同じトピックを購読する必要があります。さらに、このクライアントアプリケーションが単一のコンシューマグループメンバーを実行していることを確認して、すべてのパーティションを割り当てます。このため
- 、あなたのいずれかによって(「最新」または「最大」の設定をリセットするオフセットログの終わりまでシークし、
- をコミットするか、(のような-1)オフセット無効をコミットし、自動に依存していますあなたカフカ版)カフカのストリームの
に、コミットされたオフセットを操作するために同様のことを行う「アプリケーションのリセットツール」があります。
- :あなたには、いくつかの詳細を取得したい場合は、
http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
(それにもかかわらず、根本的なオフセット操作のアイデアは同じです、私はポストの作者だと、それはおよそカフカストリームである免責事項)このブログの記事を読むことができます
あなたのkafkaオフセットはZookeeperのznodeに保存されています。だからあなたは動物園のクリスを介してそれらをリセットすることができます、彼らは/消費者の下にすべきです。あなたはkafkaメタデータAPIを使用してそれらを読むことができます。私は、実際に組み込みオフセットメカニズムをオーバーライドしてHBaseにオフセットを格納し、ストーリーの履歴オフセットにマルチバージョンを使用するプロジェクトに取り組んできました。基本的にオフセットのトランザクション履歴があり、各コンシューマグループ/トピック/パーティションの組み合わせごとにオフセットのバージョンを削除することでロールバックできます。 – richardstartin
@dschこれは、カフカの0.9バージョン前にのみ当てはまります。 0.9のオフセットはKafkaのトピック '__consumer_offsets'に格納されているので、 –
興味深い。 Mirrormakerを介したオフセット複製を改善するための変更でしたか? – richardstartin