org.springframework.kafka.listener.ConcurrentMessageListenerContainer
には、トピックのN個のパーティションをリスンするN MessageListenerがあります。場合によっては、最後にコミットされたオフセットとは異なるオフセットでアプリケーションを開始する必要があります。 私はリスナーをConsumerSeekAware
とし、registerSeekCallback()
を使用しました。異なるオフセットにリセットするのは、アプリケーションが起動しているときではなく、起動しているときでなければならないため、registerSeekCallback()
というコールバックを保存する必要はありません。 ドキュメントごとに、このregisterSeekCallback()
は、リスナーコンテナが開始されたときに自動的に呼び出されます。だから私はregisterSeekCallback
メソッド内でcallback.seek(topic,partition,offset)
メソッドを呼び出すつもりです。registerSeekCallback()を使用してSpring Kafkaでシーク操作を行う
質問:リスナーに割り当てられているパーティションを取得するにはどうすればよいですか。 seekメソッドには引数としてtopic、partition、およびoffsetが必要です。実行時にパーティションが割り当てられるため、正しいパーティション番号を取得してシークオペレーションを行う方法がわかりません。
はregisterSeekCallback()がパーティション割り当て前またはその後に呼び出されますか? – user3366706
後で(リスナースレッド上で)検索する方法を与えるために、初期化中に呼び出されます。あなたは 'onPartitionsAssigned()'でシークを行う必要があります。 –