2017-02-21 10 views
0

私のアプリからウサギブローカーへの最大再試行回数を設定しようとしています。 私はspring amqp rabbit最大コンシューマ接続の再試行

、再試行インターセプタ、

@Bean 
public RetryOperationsInterceptor retryOperationsInterceptor() { 
    return RetryInterceptorBuilder.stateless() 
      .maxAttempts(CommonConstants.MAX_AMQP_RETRIES) 
      .backOffOptions(500, 2.0, 3000) 
      .build(); 
} 

、これがリスナーコンテナ、

container.setAdviceChain(new Advice[]{retryOperationsInterceptor()}); 

内部で使用されているが、しかし、再試行のカップルの後に、消費者が無限ループにもう一度接続を試みてい

2017-02-21 15:03:12.229 WARN 9292 --- [nsumerThread_92] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect 
2017-02-21 15:03:12.229 INFO 9292 --- [nsumerThread_92] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0 
2017-02-21 15:03:13.245 WARN 9292 --- [nsumerThread_93] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect 
2017-02-21 15:03:13.245 INFO 9292 --- [nsumerThread_93] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer: tags=[{}], channel=null, acknowledgeMode=AUTO local queue size=0 
2017-02-21 15:03:13.261 ERROR 9292 --- [nsumerThread_83] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s). 

MAX_RETRY#制限後にブローカへの接続が不足しているため、アプリケーションのエラーおよびエラーが発生します。ヘルプ

EDIT

ため

おかげで@のアルテム・ビランによって示唆されるように、私は失敗の数をカウントし、onApplicationEventこのクラスではComponent

public class BrokerFailureEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> 

を使用して終了しました適切な処置をとってください。

プロデューサー側の場合は、少し異なるシナリオです。 @ artem-bilanで説明されているように、アプリケーションは問題を処理する必要があります。私はnetflix-hystrixを使用して探査し、生産方法にfallbackメソッドを追加し、そのルートを使用します。もう一度ありがとう。

答えて

1

まあ、あなたはちょっと誤解したcontainer.setAdviceChain(new Advice[]{retryOperationsInterceptor()});です。

プロトコルエラーや接続の切断とは対照的に、ビジネス例外処理は、特にトランザクションやコンテナACKが使用されている場合には、より多くの考えとカスタム設定が必要になる場合があります。 2.8.xより前のバージョンでは、RabbitMQにデッドレターの動作が定義されていなかったため、デフォルトでビジネス例外のために拒否またはロールバックされたメッセージは無限に再配信されます。再配信回数にクライアントを制限するには、リスナーのアドバイスチェーン内のStatefulRetryOperationsInterceptorを選択する必要があります。カスタム・デッド・レター・アクションを実装するリカバリー・コールバックをインターセプターに持たせることができます。それはあなたの特定の環境に適しています。矛盾で

:それは際限なく消費者を再起動しようとしているループ、そして消費者は非常にひどく確かにそれはあきらめます振る舞っている場合にのみ、実際には

。 1つの副作用は、コンテナが起動するときにブローカが停止している場合、接続が確立されるまでブローカが試行され続けることです。あなたが必要なもの

として放出され、ListenerContainerConsumerFailedEventです:だから、あなたはこれらのイベントをリッスンし、いくつかの条件に達したときにアプリケーションを停止することができます

private void logConsumerException(Throwable t) { 
     if (logger.isDebugEnabled() 
       || !(t instanceof AmqpConnectException || t instanceof ConsumerCancelledException)) { 
      logger.warn(
        "Consumer raised exception, processing can restart if the connection factory supports it", 
        t); 
     } 
     else { 
      logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. " 
        + "Exception summary: " + t); 
     } 
     publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, t); 
    } 

+0

ok great Artem。私はリスナーを追加し、そこに失敗を数えて、アプリケーションを終了することができました。私はアプリをシャットダウンしない方法があるのだろうかと思っていましたが、AMQP処理を完全に停止しますか?つまり、すべてのコンシューマーまたはリスナーコンテナを停止することができますか?ありがとうございました – user3169330

+0

それは大きな問題ではありません。適切なBeanをcxtから取り出し、 'stop()'を呼び出してください。 –

+0

Artem、ありがとう、今、これはすべて消費者側ですが、rabbitTemplateを使用しているプロデューサでは何が起こりますか?再試行をやめたり、無限の試行を避けたい場合は? – user3169330

関連する問題