私はSpring AMQPを使用するSpringブートアプリケーションを持っています。 AMQPの設定は次のようになりますメッセージが正常に確認されたときを確実に知る方法はありますか?
@Configuration
public class AmqpConfig {
@Bean
DirectExchange directExchange() { return new DirectExchange("amq.direct"); }
@Bean
Queue testQueue() { return QueueBuilder.durable("test").build(); }
@Bean
Binding testBinding(Queue testQueue, DirectExchange directExchange) {
return BindingBuilder.bind(testQueue).to(directExchange).with("test.routing.key");
}
@Bean
SimpleRabbitListenerContainerFactory manualContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
configurer.configure(containerFactory, connectionFactory);
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return containerFactory;
}
}
私はMANUALの確認応答を使用しています。
@Slf4j
@Component
public class ManualListener {
@RabbitListener(queues = "test", containerFactory = "manualContainerFactory")
public void processMsg(Message message, Channel channel, @Header(DELIVERY_TAG) long tag) throws IOException {
try {
log.info("Message received");
Thread.sleep(20000);
channel.basicAck(tag, false);
log.info("Message processed");
} catch (Exception e) {
log.error("Something went wrong: {}", message, e);
channel.basicNack(tag, false, false);
}
}
}
Thread.sleep(20000)
は、いくつかの時間のかかるプロセスをシミュレートするためにここにいるのリスナーがあります。私のテストケースは次のとおりです。
- はRabbitMQのを再起動し、20秒睡眠中
- 上のリスナーにメッセージを送信します。これは、効果的にキュー
上のすべてのチャネルを終了だから、私はこのケースで起こることを期待することは、私はそれに応じて行動することができるように、チャネルが閉じていることを例外をスロー(前のアクションまたは類似を元に戻す)するchannel.basicAck
です。実際に何が起こるかは、basicAck
はすべてのように終了し、CachingConnectionFactory
は、配信のためにPRECONDITION_FAILEDのバックグラウンドで例外を記録するだけです。
2017-12-15 11:00:47.627 INFO 39397 --- [cTaskExecutor-1] .ManualListener : Message processed
2017-12-15 11:00:47.628 ERROR 39397 --- [ 127.0.0.1:5672] nnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
私の質問は:メッセージが正常に認識されたときに知って@RabbitListener
のための確実な方法はありますか?
春ブーツ:1.5.2.RELEASE、春ウサギ:1.7.1.RELEASE
EDIT:
私はゲイリー・ラッセルによって提案された((ChannelProxy) channel).getTargetChannel()
で解決策を試してみました。それは問題を緩和するだけのように見えますが、まだ間違っていたメッセージがあります。私はこのテストを作成しました。
@Slf4j
@Component
public class ManualListener {
static int counter = 0;
@RabbitListener(queues = "test", containerFactory = "manualContainerFactory")
public void processMsg(Message message, Channel channel, @Header(DELIVERY_TAG) long tag) throws IOException {
log.info("Message received with delivery tag {} and redelivered {}", message.getMessageProperties().getDeliveryTag(), message.getMessageProperties().getRedelivered());
if (!message.getMessageProperties().getRedelivered()) {
new Thread(() -> {
try {
channel.getConnection().close();
log.info("Connection closed");
} catch (Exception e) {
log.error("Connection closed with timeout", e);
}
}).start();
}
new Thread(() -> {
Channel actualChannel = ((ChannelProxy) channel).getTargetChannel();
try {
actualChannel.basicAck(tag, false);
log.info("Number of acknowledged messages: {}", ++counter);
} catch (Exception e) {
log.error("Something went wrong: {}", message, e);
}
}).start();
}
}
私がログに見ることができることである。
2018-01-10 13:17:34.133 INFO 17250 --- [cTaskExecutor-1] .ManualListener : Message received with delivery tag 1 and redelivered false
2018-01-10 13:17:34.137 INFO 17250 --- [ Thread-27] .ManualListener : Number of acknowledged messages: 1
2018-01-10 13:17:34.163 INFO 17250 --- [ Thread-26] .ManualListener : Connection closed
2018-01-10 13:17:35.162 INFO 17250 --- [cTaskExecutor-2] .ManualListener : Message received with delivery tag 1 and redelivered true
2018-01-10 13:17:35.162 INFO 17250 --- [ Thread-28] .ManualListener : Number of acknowledged messages: 2
だから私は二度同じメッセージをACKさのテストで。この場合には、basicAck
が呼び出されたときにIOException
を受け取ると予想されます。
この問題は、私たちの生産環境で発生します。アプリケーションは大量のメッセージを処理し、非常に頻繁に発生します。接続が切断されるたびに、すでに確認されたメッセージが2つ受信されます。
私は信頼性の回避策を見つけました。私の編集を参照してください。 –
私は上記の解決策を試しました。まだ間違っていたメッセージがいくつかあります。私は追加情報で私の質問を編集しました。 – lolotron
これは[マスターで修正 - AMQP-793](https://jira.spring.io/browse/AMQP-793)です。修正を含めるために2.0.2リリースをスケジュールする必要があります。 2.0.2.BUILD-SNAPSHOTを試して、必要であれば確認することができます。 –