onMessage()
はチェック例外をスローすることを許可していないため、RuntimeException
に例外をラップして再スローすることができます。
try {
testService.process(message);
} catch (BusinessException e) {
throw new RuntimeException(e);
}
ただし、メッセージが無期限に再配信される可能性があります。これはどのように動作するのですか:
RabbitMQはメッセージを拒否し、ブローカに再キューイングを要求します。これはhereと表示されます。しかし、RabbitMQはネイティブに、再試行ポリシーのメカニズムを持っていません。最大再試行回数、遅延時間などを設定してください。
Spring AMQPを使用する場合、デフォルトでは「拒否時に再キューイング」が選択されています。 SpringのSimpleMessageListenerContainer
は、処理されない例外がある場合、これをデフォルトで実行します。だからあなたの場合は、キャッチされた例外を再スローする必要があります。ただし、メッセージを処理できず、常に例外をスローすると、これは無期限に再配信され、無限ループになります。
メッセージごとにこの動作を無効にするには、AmqpRejectAndDontRequeueException
例外をスローします。この場合、メッセージはキューに入れられません。
ます。また、メッセージが拒否され、キューに再登録されていない場合は1がある場合、それはどちらか、DLQに失われるか、または転送されます
container.setDefaultRequeueRejected(false)
を設定することにより、完全にSimpleMessageListenerContainer
の行動を「拒絶の再キューイング」をオフに切り替えることができますRabbitMQで設定します。
最大試行、遅延などで再試行ポリシーが必要な場合は、スレッド内のすべての再試行を(Thread.sleep()
を使用して)再試行ごとに拒否することなく実行するスプリング「ステートレス」RetryOperationsInterceptor
を設定するのが最も簡単です再試行ごとにRabbitMQに戻ります)。再試行が終了すると、デフォルトで警告が記録され、メッセージが消費されます。 DLQに送信する場合は、RepublishMessageRecoverer
または再入力せずにメッセージを拒否するカスタムMessageRecoverer
が必要です(後者の場合は、setupキューにRabbitMQ DLQを置く必要があります)。
container.setAdviceChain(new Advice[] {
org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2, 5000)
.build()
});
これは明らかに、再試行の期間中スレッドを占有するという欠点があります。また、「ステートフル」のRetryOperationsInterceptor
を使用して、再試行ごとにRabbitMQにメッセージを送り返すこともできますが、遅延はアプリケーション内でThread.sleep()
で実装されます。また、ステートフルなインターセプタを設定するのはもう少し複雑です。
したがって、Thread
を使わずに遅延を再試行する場合は、RabbitMQキューのTTLを使用するカスタムソリューションが必要になります。指数バックオフが必要ない場合(遅延が各リトライで増加しないので)、少し簡単です。このようなソリューションを実装するには、基本的に、引数rabbitMQに"x-message-ttl": <delay time in milliseconds>
と"x-dead-letter-exchange":"<name of the original queue>"
という別のキューを作成します。メインキューで"x-dead-letter-exchange":"<name of the queue with the TTL>"
と設定します。だからあなたがメッセージを拒否して再キューしないとき、RabbitMQはそれを第2のキューにリダイレクトするでしょう。 TTLが期限切れになると、元のキューにリダイレクトされ、アプリケーションに再配信されます。これで、各失敗後にRabbitMQへのメッセージを拒否し、再試行回数を追跡する再試行インターセプタが必要になります。アプリケーションに状態を保持する必要性を回避するには(アプリケーションがクラスタ化されている場合に状態を複製する必要があるため)、RabbitMQが設定するx-death
ヘッダーから再試行回数を計算できます。このヘッダの詳細については、hereを参照してください。したがって、この時点でカスタムインターセプタを実装する方が、Springのステートフルなインターセプタをこの動作でカスタマイズするよりも簡単です。
the section about retries in the Spring AMQP referenceもチェックしてください。
私は特定の質問をカバーするために私の答えを編集しました。それ以上の説明が必要な場合はお知らせください。 –