2016-05-02 15 views
9

Spring AMQPメッセージリスナーを実行しています。Springでビジネス例外が発生した場合にRabbitMQに再試行する方法非同期MessageListenerの使用例

public class ConsumerService implements MessageListener { 

    @Autowired 
    RabbitTemplate rabbitTemplate; 

    @Override 
    public void onMessage(Message message) { 
     try { 
      testService.process(message); //This process method can throw Business Exception 
     } catch (BusinessException e) { 
      //Here we can just log the exception. How the retry attempt is made? 
     } catch (Exception e) { 
      //Here we can just log the exception. How the retry attempt is made? 
     } 
    } 
} 

ご覧のとおり、処理中に例外が発生する可能性があります。私はキャッチブロックの特定のエラーのため再試行したいです。私はonMessageの例外ではできません。 RabbitMQに例外があり、再試行する方法を教えてください。

+0

私は特定の質問をカバーするために私の答えを編集しました。それ以上の説明が必要な場合はお知らせください。 –

答えて

10

onMessage()はチェック例外をスローすることを許可していないため、RuntimeExceptionに例外をラップして再スローすることができます。

try { 
    testService.process(message); 
} catch (BusinessException e) { 
    throw new RuntimeException(e); 
} 

ただし、メッセージが無期限に再配信される可能性があります。これはどのように動作するのですか:

RabbitMQはメッセージを拒否し、ブローカに再キューイングを要求します。これはhereと表示されます。しかし、RabbitMQはネイティブに、再試行ポリシーのメカニズムを持っていません。最大再試行回数、遅延時間などを設定してください。

Spring AMQPを使用する場合、デフォルトでは「拒否時に再キューイング」が選択されています。 SpringのSimpleMessageListenerContainerは、処理されない例外がある場合、これをデフォルトで実行します。だからあなたの場合は、キャッチされた例外を再スローする必要があります。ただし、メッセージを処理できず、常に例外をスローすると、これは無期限に再配信され、無限ループになります。

メッセージごとにこの動作を無効にするには、AmqpRejectAndDontRequeueException例外をスローします。この場合、メッセージはキューに入れられません。

ます。また、メッセージが拒否され、キューに再登録されていない場合は1がある場合、それはどちらか、DLQに失われるか、または転送されます

container.setDefaultRequeueRejected(false) 

を設定することにより、完全にSimpleMessageListenerContainerの行動を「拒絶の再キューイング」をオフに切り替えることができますRabbitMQで設定します。

最大試行、遅延などで再試行ポリシーが必要な場合は、スレッド内のすべての再試行を(Thread.sleep()を使用して)再試行ごとに拒否することなく実行するスプリング「ステートレス」RetryOperationsInterceptorを設定するのが最も簡単です再試行ごとにRabbitMQに戻ります)。再試行が終了すると、デフォルトで警告が記録され、メッセージが消費されます。 DLQに送信する場合は、RepublishMessageRecovererまたは再入力せずにメッセージを拒否するカスタムMessageRecovererが必要です(後者の場合は、setupキューにRabbitMQ DLQを置く必要があります)。

container.setAdviceChain(new Advice[] { 
     org.springframework.amqp.rabbit.config.RetryInterceptorBuilder 
       .stateless() 
       .maxAttempts(5) 
       .backOffOptions(1000, 2, 5000) 
       .build() 
}); 

これは明らかに、再試行の期間中スレッドを占有するという欠点があります。また、「ステートフル」のRetryOperationsInterceptorを使用して、再試行ごとにRabbitMQにメッセージを送り返すこともできますが、遅延はアプリケーション内でThread.sleep()で実装されます。また、ステートフルなインターセプタを設定するのはもう少し複雑です。

したがって、Threadを使わずに遅延を再試行する場合は、RabbitMQキューのTTLを使用するカスタムソリューションが必要になります。指数バックオフが必要ない場合(遅延が各リトライで増加しないので)、少し簡単です。このようなソリューションを実装するには、基本的に、引数rabbitMQに"x-message-ttl": <delay time in milliseconds>"x-dead-letter-exchange":"<name of the original queue>"という別のキューを作成します。メインキューで"x-dead-letter-exchange":"<name of the queue with the TTL>"と設定します。だからあなたがメッセージを拒否して再キューしないとき、RabbitMQはそれを第2のキューにリダイレクトするでしょう。 TTLが期限切れになると、元のキューにリダイレクトされ、アプリケーションに再配信されます。これで、各失敗後にRabbitMQへのメッセージを拒否し、再試行回数を追跡する再試行インターセプタが必要になります。アプリケーションに状態を保持する必要性を回避するには(アプリケーションがクラスタ化されている場合に状態を複製する必要があるため)、RabbitMQが設定するx-deathヘッダーから再試行回数を計算できます。このヘッダの詳細については、hereを参照してください。したがって、この時点でカスタムインターセプタを実装する方が、Springのステートフルなインターセプタをこの動作でカスタマイズするよりも簡単です。

the section about retries in the Spring AMQP referenceもチェックしてください。

+0

あなたの非常に詳しい説明のためのThans Nazreet。私は疑いがもう少しあります。メッセージの再試行のためにランタイム例外を投げなければならないことは明らかです。また、再試行を制限するRetryInterceptorを使用します。 1.ステートフル・インターセプター/ステートレス・インターセプターにはどのような場合がありますか? 2.また、再試行オプションが設定されていると、このすべての再試行が完了するまで、他のメッセージは無人で残されますか? rtを再試行するには別のスレッドにする必要がありますか? 3.再試行とカスタム再試行の実装用に別のキューを作成する方法はありますか? – Santosh

+0

それは助けになってうれしいです。あなたの質問に答えるために:1)ステートフル/ステートレスの再試行インターセプタは一般にSpring(バッチ、統合、amqpなど)で使用されるため、さまざまなユースケースがあります。しかし、AMQPメッセージのコンシューマのケースでは、私が述べたように、ステートフルでは非常に大きなメリットがあるとは思いません。違いは、各リトライ時にステートフルがメッセージをRabbitMQに送り返すことです(ただし、スリープ後、バックオフが設定されている場合)。 –

+0

2)他のメッセージは、同時コンシューマ数によって異なります。各消費者は別々のスレッドです。これはcontainer.setConcurrentConsumers()で設定します。 3)カスタムリトライの実装は、コメントで説明するのにかなり関与しています。私は後で説明する時間を見つけようとします。 –

関連する問題