2016-10-26 3 views
3

私たちはKafkaとZooKeeperでStormを使用しています。いくつかのトピックを削除して別の名前で再作成する必要がある状況がありました。私たちのカフカの噴出口は、今から別の新しい話題の名前を読んでいたのと同じです。しかし、今やスパウトは、新しいトピックからの読み込みを試みるときに、古いトピックパーティションからのオフセットを使用しています。したがって、トピック名のパーティション0の末尾は500になりますが、オフセットは10000のようになります。カフカのオフセットをリセットして尾の位置に合わせるにはどうすればいいですか?

トピックの末尾に一致するようにオフセット位置をリセットする方法はありますか?

+0

あなたのkafkaオフセットはZookeeperのznodeに保存されています。だからあなたは動物園のクリスを介してそれらをリセットすることができます、彼らは/消費者の下にすべきです。あなたはkafkaメタデータAPIを使用してそれらを読むことができます。私は、実際に組み込みオフセットメカニズムをオーバーライドしてHBaseにオフセットを格納し、ストーリーの履歴オフセットにマルチバージョンを使用するプロジェクトに取り組んできました。基本的にオフセットのトランザクション履歴があり、各コンシューマグループ/トピック/パーティションの組み合わせごとにオフセットのバージョンを削除することでロールバックできます。 – richardstartin

+0

@dschこれは、カフカの0.9バージョン前にのみ当てはまります。 0.9のオフセットはKafkaのトピック '__consumer_offsets'に格納されているので、 –

+0

興味深い。 Mirrormakerを介したオフセット複製を改善するための変更でしたか? – richardstartin

答えて

5

複数のオプションがあります(StormのKafkaSpoutは開始オフセットを定義するAPIを提供していないため)。

  1. ログの尾から消費者にしたい場合は、削除すべき古いオフセット
    • あなたに依存カフカバージョン
      • (前0.9)あなたは少しあるZKを(操作することができますトリッキー)
      • (0.9+)またはあなた試みるもトリッキーであり、他を削除する可能性があるトピック__consumer_offsets(からのオフセットあなたも、保存したいオフセット削除しない)
    • 何のオフセットが存在しない場合、あなたは自動であなたの注ぎ口を再起動することができ、あなたは、(私が推薦する)の代替として
  2. (あなたにカフカのバージョンに依存)ポリシー「最新」または「最大」をリセットするオフセットを使用して必要な方法でオフセットを操作し、commit()オフセットを使用する小さなクライアントアプリケーションを作成できます。このクライアントは、KafkaSpoutと同じグループIDを使用する必要があり、同じトピックを購読する必要があります。さらに、このクライアントアプリケーションが単一のコンシューマグループメンバーを実行していることを確認して、すべてのパーティションを割り当てます。このため
    • 、あなたのいずれかによって(「最新」または「最大」の設定をリセットするオフセットログの終わりまでシークし、
    • をコミットするか、(のような-1)オフセット無効をコミットし、自動に依存していますあなたカフカ版)カフカのストリームの

に、コミットされたオフセットを操作するために同様のことを行う「アプリケーションのリセットツール」があります。

- :あなたには、いくつかの詳細を取得したい場合は、 http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

(それにもかかわらず、根本的なオフセット操作のアイデアは同じです、私はポストの作者だと、それはおよそカフカストリームである免責事項)このブログの記事を読むことができます

関連する問題