2016-11-22 18 views
0

トランザクションで以下のコードを実行することは可能ですか?ビジネス処理で例外がスローされた場合、私たちがキューに送信したメッセージをロールバックできますか?RabbitMQトランザクションでメッセージを送信

rabbitTemplate.convertAndSend("queue1", data); 

//do some processing 

rabbitTemplate.convertAndSend("queue2", data); 

これは、queueqにメッセージを送信した後に何か問題が発生したが、queue2にメッセージを送信できなかった場合に必要です。または、キューにメッセージを送信する際にネットワークやその他の問題を発行する場合はどうしたらよいでしょうか。

答えて

1

このコードがリスナーコンテナスレッド(onMessage()または@RabbitListener)で実行され、コンテナとテンプレートの両方がsetChannelTransacted(true)の場合、パブリッシュ(および配信)は同じトランザクションで実行されます。例外をスローすると、すべてがロールバックされます。

これは(コンテナのスレッド上で実行されていない)、いくつかの任意のJavaクラスである場合は、メソッドの実行前にトランザクションを開始する必要があります...

@Transactional 
    public void send(String in) { 
     this.template.convertAndSend("foo", in); 
     if (in.equals("foo")) { 
      throw new RuntimeException("test"); 
     } 
     this.template.convertAndSend("bar", in); 
    } 

は、ここで示している完全な春のブートアプリケーションです機能...

@SpringBootApplication 
@EnableTransactionManagement 
public class So40749877Application { 

    public static void main(String[] args) { 
     ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args); 
     Foo foo = context.getBean(Foo.class); 
     try { 
      foo.send("foo"); 
     } 
     catch (Exception e) {} 
     foo.send("bar"); 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     // should not get any foos... 
     System.out.println(template.receiveAndConvert("foo", 10_000)); 
     System.out.println(template.receiveAndConvert("bar", 10_000)); 
     // should be null 
     System.out.println(template.receiveAndConvert("foo", 0)); 
     RabbitAdmin admin = context.getBean(RabbitAdmin.class); 
     admin.deleteQueue("foo"); 
     admin.deleteQueue("bar"); 
     context.close(); 
    } 

    @Bean 
    public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) { 
     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
     rabbitTemplate.setChannelTransacted(true); 
     return rabbitTemplate; 
    } 

    @Bean 
    public Queue foo() { 
     return new Queue("foo"); 
    } 

    @Bean 
    public Queue bar() { 
     return new Queue("bar"); 
    } 

    @Bean 
    public Foo fooBean() { 
     return new Foo(); 
    } 

    @Bean 
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) { 
     return new RabbitTransactionManager(connectionFactory); 
    } 

    public static class Foo { 

     @Autowired 
     private RabbitTemplate template; 

     @Transactional 
     public void send(String in) { 
      this.template.convertAndSend("foo", in); 
      if (in.equals("foo")) { 
       throw new RuntimeException("test"); 
      } 
      this.template.convertAndSend("bar", in); 
     } 

    } 

} 

EDIT消費者側の

取引。これは、一般的に春を使用しているとき、それは、トランザクションを管理しているため適用されますが、ときtxRollback、この場合には何もしないで直接クライアントを使用して...

Connection connection = cf.createConnection(); 
Channel channel = connection.createChannel(true); 
channel.basicQos(1); 
channel.txSelect(); 
CountDownLatch latch = new CountDownLatch(1); 
channel.basicConsume("foo", new DefaultConsumer(channel) { 

    @Override 
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, 
      byte[] body) throws IOException { 
     System.out.println(new String(body)); 

     getChannel().txRollback(); // delivery won't be requeued; remains unacked 

     if (envelope.isRedeliver()) { 
      getChannel().basicAck(envelope.getDeliveryTag(), false); 
      getChannel().txCommit(); // commit the ack so the message is removed 
      getChannel().basicCancel(consumerTag); 
      latch.countDown(); 
     } 
     else { // first time, let's requeue 
      getChannel().basicReject(envelope.getDeliveryTag(), true); 
      getChannel().txCommit(); // commit the reject so the message will be requeued 
     } 
    } 

}); 
latch.await(); 
channel.close(); 
connection.close(); 

注意しません。 ack(またはreject)のみがトランザクショナルです。

+0

Thansk Gary、私は同じことをしましたが、@EnableTransactionManagementが欠けていました。その方法では、私はamqpAdminを使ってキューを宣言していますが(私はそれが良くないことを知っている必要があります)、今はロールバックしていませんが、大したものではありませんが、ロールバックする方法もあります! – user3444718

+0

いいえ。インフラストラクチャの変更はトランザクションには参加しません。rabbitmqトランザクションのセマンティクスについては、[here](https://www.rabbitmq.com/semantics.html)を参照してください。 –

+0

もう一度ありがとう。これは役に立ちます。 – user3444718

関連する問題