2017-12-19 13 views
0

私はインターネットを通して文字通り一貫してコーディングしています。ブローカーベースのオフセットを使用して消費者のラグをプログラム的に取得

私が欲しいのは、特定のgroupIDに対して、コンシューマグループのラグメトリック(または、2つのオフセットが派生したもの、現在のものと最後のもの)です。

私はkafka 0.9+を使用しています(飼い主ではオフセットがないので)、必要に応じて新しいものを使用することもできます。

スクリプトを使用しないで、プログラムでを取得する方法が必要です。

誰か、どうしたらいいですか?

+0

あなたは消費者の投票ループの中から好きですか?または何? –

+0

もしあなたがすでに試してみたことをお詫びして申し訳ありませんが、 'consumer.subscribe()'の後に 'consumer.position()'と 'consumer.endOffsets()'の組み合わせは使えませんか? –

+0

あなたのコメントをお寄せいただきありがとうございます、私はおそらく仕事を提案しましたが、個々の消費者を個別に照会する必要はありません。グループの情報を取得したいのですが、そこから問題にならないはずです... Kafkaの人は、これに –

答えて

1

AdminClientを通じて消費者グループの詳細を取得する方法を追加する作業が現在進行中です。https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74686265を参照してください。

その間に、kafka-consumer-groupツールのロジックを再利用することをお勧めします。このScalaクラスは、https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scalaをサポートしており、グループ内のすべてのインスタンスの詳細を取得するビットを抽出するのは非常に難しいことではありません。

+0

私は、このクラスを理解できなかったし、私のenvにインポートして実行してもらえませんでした... –

+0

これを試しましたが、私が見つけたのは、消費者の多いクラスタがあれば...実行し、ブローカー上のリソースのトンを占有....それは30秒ごとにすべての遅れをプログラム的にプルアップしたい場合、非常にうまくスケールされていないと思われる – user2061886

+0

消費者もラグメトリック、http://kafka.apacheを放出することに注意してください。 org/documentation /#new_consumer_fetch_monitoring。実際にやりたいことに応じて、JMXを通じて各インスタンスを照会するか、カスタムメトリックレポーターを使用して別のシステムに値を送信することもできます。 –

関連する問題