2

私はSpring AMQPを使用するSpringブートアプリケーションを持っています。 AMQPの設定は次のようになりますメッセージが正常に確認されたときを確実に知る方法はありますか?

@Configuration 
public class AmqpConfig { 
    @Bean 
    DirectExchange directExchange() { return new DirectExchange("amq.direct"); } 

    @Bean 
    Queue testQueue() { return QueueBuilder.durable("test").build(); } 

    @Bean 
    Binding testBinding(Queue testQueue, DirectExchange directExchange) { 
    return BindingBuilder.bind(testQueue).to(directExchange).with("test.routing.key"); 
    } 

    @Bean 
    SimpleRabbitListenerContainerFactory manualContainerFactory(ConnectionFactory connectionFactory, 
                    SimpleRabbitListenerContainerFactoryConfigurer configurer) { 
    SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); 
    configurer.configure(containerFactory, connectionFactory); 
    containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); 
    return containerFactory; 
    } 
} 

私はMANUALの確認応答を使用しています。

@Slf4j 
@Component 
public class ManualListener { 
    @RabbitListener(queues = "test", containerFactory = "manualContainerFactory") 
    public void processMsg(Message message, Channel channel, @Header(DELIVERY_TAG) long tag) throws IOException { 
    try { 
     log.info("Message received"); 
     Thread.sleep(20000); 
     channel.basicAck(tag, false); 
     log.info("Message processed"); 
    } catch (Exception e) { 
     log.error("Something went wrong: {}", message, e); 
     channel.basicNack(tag, false, false); 
    } 
    } 
} 

Thread.sleep(20000)は、いくつかの時間のかかるプロセスをシミュレートするためにここにいるのリスナーがあります。私のテストケースは次のとおりです。

  1. はRabbitMQのを再起動し、20秒睡眠中
  2. 上のリスナーにメッセージを送信します。これは、効果的にキュー

上のすべてのチャネルを終了だから、私はこのケースで起こることを期待することは、私はそれに応じて行動することができるように、チャネルが閉じていることを例外をスロー(前のアクションまたは類似を元に戻す)するchannel.basicAckです。実際に何が起こるかは、basicAckはすべてのように終了し、CachingConnectionFactoryは、配信のためにPRECONDITION_FAILEDのバックグラウンドで例外を記録するだけです。

2017-12-15 11:00:47.627 INFO 39397 --- [cTaskExecutor-1] .ManualListener : Message processed 
2017-12-15 11:00:47.628 ERROR 39397 --- [ 127.0.0.1:5672] nnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80) 

私の質問は:メッセージが正常に認識されたときに知って@RabbitListenerのための確実な方法はありますか?

春ブーツ:1.5.2.RELEASE、春ウサギ:1.7.1.RELEASE

EDIT:

私はゲイリー・ラッセルによって提案された((ChannelProxy) channel).getTargetChannel()で解決策を試してみました。それは問題を緩和するだけのように見えますが、まだ間違っていたメッセージがあります。私はこのテストを作成しました。

@Slf4j 
@Component 
public class ManualListener { 
    static int counter = 0; 

    @RabbitListener(queues = "test", containerFactory = "manualContainerFactory") 
    public void processMsg(Message message, Channel channel, @Header(DELIVERY_TAG) long tag) throws IOException { 
     log.info("Message received with delivery tag {} and redelivered {}", message.getMessageProperties().getDeliveryTag(), message.getMessageProperties().getRedelivered()); 
     if (!message.getMessageProperties().getRedelivered()) { 
     new Thread(() -> { 
      try { 
      channel.getConnection().close(); 
      log.info("Connection closed"); 
      } catch (Exception e) { 
      log.error("Connection closed with timeout", e); 
      } 
     }).start(); 
     } 
     new Thread(() -> { 
     Channel actualChannel = ((ChannelProxy) channel).getTargetChannel(); 
     try { 
      actualChannel.basicAck(tag, false); 
      log.info("Number of acknowledged messages: {}", ++counter); 
     } catch (Exception e) { 
      log.error("Something went wrong: {}", message, e); 
     } 
     }).start(); 
    } 
} 

私がログに見ることができることである。

2018-01-10 13:17:34.133 INFO 17250 --- [cTaskExecutor-1] .ManualListener : Message received with delivery tag 1 and redelivered false 
2018-01-10 13:17:34.137 INFO 17250 --- [  Thread-27] .ManualListener : Number of acknowledged messages: 1 
2018-01-10 13:17:34.163 INFO 17250 --- [  Thread-26] .ManualListener : Connection closed 
2018-01-10 13:17:35.162 INFO 17250 --- [cTaskExecutor-2] .ManualListener : Message received with delivery tag 1 and redelivered true 
2018-01-10 13:17:35.162 INFO 17250 --- [  Thread-28] .ManualListener : Number of acknowledged messages: 2 

だから私は二度同じメッセージをACKさのテストで。この場合には、basicAckが呼び出されたときにIOExceptionを受け取ると予想されます。

この問題は、私たちの生産環境で発生します。アプリケーションは大量のメッセージを処理し、非常に頻繁に発生します。接続が切断されるたびに、すでに確認されたメッセージが2つ受信されます。

答えて

2

チャネルプロキシは、閉鎖されたことが検出されたときに基礎となるrabbitmqチャネルを回復(リフレッシュ)します。

isOpen()に電話をかけることはできますが、その電話とお客様のbasicAck()の間にはまだ小さな競争状態があります。あなたはまた、シャットダウンのリスナーを追加することができ

EDIT

...

channel.addShutdownListener(s -> { 
    System.out.println(s); 
}); 
channel.basicAck(tag, false); 

EDIT

ここで信頼性の回避策はあります...

@RabbitListener(queues = "foo") 
public void foo(Message m, @Header(AmqpHeaders.CHANNEL) Channel channel, 
     @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws Exception { 
    System.in.read(); 
    Channel actualChannel = ((ChannelProxy) channel).getTargetChannel(); 
    try { 
     actualChannel.basicAck(tag, false); 
    } 
    catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - Closed via management plugin, class-id=0, method-id=0) 
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:253) 
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:422) 
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:416) 
    at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1164) 
    at com.example.So47454769Application.foo(So47454769Application.java:42) 
+0

私は信頼性の回避策を見つけました。私の編集を参照してください。 –

+0

私は上記の解決策を試しました。まだ間違っていたメッセージがいくつかあります。私は追加情報で私の質問を編集しました。 – lolotron

+0

これは[マスターで修正 - AMQP-793](https://jira.spring.io/browse/AMQP-793)です。修正を含めるために2.0.2リリースをスケジュールする必要があります。 2.0.2.BUILD-SNAPSHOTを試して、必要であれば確認することができます。 –

関連する問題