2017-12-06 6 views
0

org.springframework.kafka.listener.ConcurrentMessageListenerContainerには、トピックのN個のパーティションをリスンするN MessageListenerがあります。場合によっては、最後にコミットされたオフセットとは異なるオフセットでアプリケーションを開始する必要があります。 私はリスナーをConsumerSeekAwareとし、registerSeekCallback()を使用しました。異なるオフセットにリセットするのは、アプリケーションが起動しているときではなく、起動しているときでなければならないため、registerSeekCallback()というコールバックを保存する必要はありません。 ドキュメントごとに、このregisterSeekCallback()は、リスナーコンテナが開始されたときに自動的に呼び出されます。だから私はregisterSeekCallbackメソッド内でcallback.seek(topic,partition,offset)メソッドを呼び出すつもりです。registerSeekCallback()を使用してSpring Kafkaでシーク操作を行う

質問:リスナーに割り当てられているパーティションを取得するにはどうすればよいですか。 seekメソッドには引数としてtopic、partition、およびoffsetが必要です。実行時にパーティションが割り当てられるため、正しいパーティション番号を取得してシークオペレーションを行う方法がわかりません。

答えて

0

registerSeekCallbackは、実行時(コンテナが開始され、パーティションが割り当てられた後の任意の時点)に任意に検索するために使用されます。

代わりに、あなたにパーティションを与えるConsumerSeekAware.onPartitionsAssigned()でシークする必要があります。

+0

はregisterSeekCallback()がパーティション割り当て前またはその後に呼び出されますか? – user3366706

+0

後で(リスナースレッド上で)検索する方法を与えるために、初期化中に呼び出されます。あなたは 'onPartitionsAssigned()'でシークを行う必要があります。 –

関連する問題