2016-05-06 5 views
3

キューから特定のログを消費し、サードパーティのシステムが正しく応答しない場合に備えて、そのログからいくつかのサードパーティAPIを取得して使用するケースがあります。再試行を実装したいその特定のログのロジック。カフカコンシューマのロジックを再試行する

時間フィールドを追加して同じキューにメッセージを再表示することができます。このメッセージは、その時間フィールドが有効な場合、つまり現在の時間よりも短い場合に再び消費され、それ以外の場合は再びキューにプッシュされます。

しかし、このロジックは、再試行時間が正しく、キューが不必要に大きくなるまで同じログを繰り返し追加します。

カフカでリトライロジックを実装する方が良いですか?

+0

メッセージNを送信するときにサードパーティのAPIが応答しない場合は、メッセージN + 1、N + 2などで続行し、後でメッセージNに戻るのは意味がありますか?もしそうでなければ、再試行でカフカの助けになる点はないようです。ちょうどあなたの消費者に数秒、数分、時間を戻して、メッセージNをもう一度押してください。 – Harald

+2

はい私は最初に私はいくつかの似たようなアプローチを考えました。ここでは、失敗した場合の透過的オフセットに消費者を求めます。しかし、消費者がN時間単位でメッセージを消費するのを制限する方法/方法はありますか? – TECH007

答えて

0

コンシューマでは、例外をスローした場合、試行番号1の別のメッセージを生成します。次に消費されるときは、試行番号1のプロパティを持ちます。プロデューサで処理します。再試行回数を入力し、生成を停止します。

+0

あなたは再試行を遅らせる方法を知っていますか?例えば。再び生産しますが、5分後に消費しますか?消費率が高い場合、10回の再試行が数ミリ秒で終了する可能性があるためです。 –

+0

私はこの記事に出会った。これが助けてくれるといいですね。https://stackoverflow.com/questions/46665941/apache-kafka-consumer-delay-option – ksv

+0

私はこの投稿に出会いました。 https://stackoverflow.com/questions/46665941/apache-kafka-consumer-delay-optionに役立つことを願っています。現在のタイムスタンプでプロパティを送信することもできます。 5分を超える場合は現在の時間と比較し、処理してください。それ以外の場合は同じイベントを生成します。これにより、不要なチェックが行われます。 Thread.sleepは受け入れられるものです。しかし、私はスレッドメカニズムについてはわかりません。スリープ状態のスレッドが多数生成されることがあります。使用前に調べてください – ksv

0

いくつかの再試行トピックを作成し、そこで失敗したタスクをプッシュすることができます。たとえば、3つのトピックを異なる遅延で分で作成し、1つの失敗したタスクを最大試行の限界に達するまで回転させることができます。

は、よりを参照してください1時間で再試行のために - 5分

'retry_30m_topic' での再試行のために - - 30分

で再試行のための 'retry_1h_topic' 'retry_5m_topic'

詳細:https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a

0

はい、これは私が思っていたストレートな解決策でしたの。しかし、これで、メッセージ処理が再び失敗する可能性があるため、多くのトピックを作成することになります。

私は、このユースケースをRabbit MQにマッピングすることでこの問題を解決しました。ウサギMQでは、リトライ交換という概念があります。メッセージ処理が交換から失敗した場合、それをTTLで再試行交換に送ることができます。 TTLが期限切れになると、メッセージはメインエクスチェンジに戻り、再度処理できる状態になります。

私は、Rabbit MQを使用して指数バックオフメッセージ処理を実装する方法を説明するいくつかの例を掲載することができます。

関連する問題