2016-04-05 22 views
0

今私はカフカキューに数百のメッセージを書き込む機能を持っています。しかし、これらのメッセージがすべて消費された場合、私は追加の機能も実行する必要があります。それが空になったときに通知を受けるためにリスナーをカフカのキューに置く方法はありますか?カフカキューが空であることを確認してください

+0

空になりましたか?すべてのメッセージが消費された場合、キューが空になるわけではありません。 – avr

+0

キューに特別なメッセージを入れることができます。 – nha

答えて

3

あなたは、この2つの方法で解決することができ、私は思う:

  1. カフカのFetch Responseは、本質的にパーティション内の最後のメッセージのオフセットされHighwaterMarkOffsetが含まれています。あなたはあなたのメッセージがそのオフセットを持っているかどうかを確認することができます。もしそうなら、あなたは終わりに達しました。ただし、プロデューサとコンシューマが同時に作業している場合、これは機能しません。コンシューマはメッセージをより早く消費し、必要な時より早く停止することができます。
  2. 「poison pill」メッセージを送信する - 100メッセージを生成する必要があるとします。そして、あなたのプロデューサーは、これらの100個のメッセージ+ 1個の特別なメッセージ(例えば、いくつかのUUIDをあなたのロジックの通常の状況下では決して現れないようにします)を「終わり」を意味します。消費者側では、受信したメッセージが毒薬であるかどうかをチェックし、そうであればシャットダウンします。
関連する問題