2017-01-25 13 views
0

私は手動でコンシューマアプリケーションのオフセットを認識しますが、私はコンシューマがunacknowledgeメッセージを自動取得することができますが、私はそれを成功させることはできません。 どのように春の雲ストリームカフカ消費者の自動再消費の未確認のメッセージですか?

cloud: 
    stream: 
    kafka: 
     binder: 
     brokers: ****:**** 
     za-nodes: **** 
     replication-factor: 1 
     bindings: 
     input: 
     consumer: 
     auto-commit-offset: false 
     auto-commit-on-error: false 
     reset-offsets: true 

答えて

0

今使用していないようですresetOffsetsプロパティ:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/48#issuecomment-273111200
これは私の設定YMLです。デフォルトでは

は、消費者のauto.offset.resetlatestに設定されます(つまり、明示的spring.cloud.stream.bindings.input.groupを使用して結合、消費者のためのグループを設定していないん場合)カフカは、個々のメッセージを確認する能力を持っていません

+0

私はグループを設定しました。アプリケーションを再起動しても問題ありません。 しかし、私はアプリケーションを再起動せずに未確認のメッセージを取得したいと思っています。 – gemorn

+0

「最新」から開始する場合は、グループを設定する必要はありません。また、マニュアルの場合は、次の例を参照してください。http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.BUILD-SNAPSHOT/reference/htmlsingle/index.html#_example_setting_literal_autocommitoffset_literal_false_and_relying_on_manual_acking –

0

- グループ/パーティションのオフセットが保存されると、以前のすべてのメッセージは「確認済み」と見なされます。手動承認で許可されるのは、たとえば、メッセージが非同期に処理される状況でオフセットコミットプロセスを延期することです(自動確認によってオフセットメッセージが実際に処理される前に)。

resetOffsetsは現在サポートされていません(https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/67を参照してください)。残念ながら、ドキュメントにはこれが反映されていません。

関連する問題