このコードがリスナーコンテナスレッド(onMessage()
または@RabbitListener
)で実行され、コンテナとテンプレートの両方がsetChannelTransacted(true)
の場合、パブリッシュ(および配信)は同じトランザクションで実行されます。例外をスローすると、すべてがロールバックされます。
これは(コンテナのスレッド上で実行されていない)、いくつかの任意のJavaクラスである場合は、メソッドの実行前にトランザクションを開始する必要があります...
@Transactional
public void send(String in) {
this.template.convertAndSend("foo", in);
if (in.equals("foo")) {
throw new RuntimeException("test");
}
this.template.convertAndSend("bar", in);
}
は、ここで示している完全な春のブートアプリケーションです機能...
@SpringBootApplication
@EnableTransactionManagement
public class So40749877Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args);
Foo foo = context.getBean(Foo.class);
try {
foo.send("foo");
}
catch (Exception e) {}
foo.send("bar");
RabbitTemplate template = context.getBean(RabbitTemplate.class);
// should not get any foos...
System.out.println(template.receiveAndConvert("foo", 10_000));
System.out.println(template.receiveAndConvert("bar", 10_000));
// should be null
System.out.println(template.receiveAndConvert("foo", 0));
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
admin.deleteQueue("foo");
admin.deleteQueue("bar");
context.close();
}
@Bean
public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
@Bean
public Queue foo() {
return new Queue("foo");
}
@Bean
public Queue bar() {
return new Queue("bar");
}
@Bean
public Foo fooBean() {
return new Foo();
}
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
public static class Foo {
@Autowired
private RabbitTemplate template;
@Transactional
public void send(String in) {
this.template.convertAndSend("foo", in);
if (in.equals("foo")) {
throw new RuntimeException("test");
}
this.template.convertAndSend("bar", in);
}
}
}
EDIT消費者側の
取引。これは、一般的に春を使用しているとき、それは、トランザクションを管理しているため適用されますが、ときtxRollback
、この場合には何もしないで直接クライアントを使用して...
Connection connection = cf.createConnection();
Channel channel = connection.createChannel(true);
channel.basicQos(1);
channel.txSelect();
CountDownLatch latch = new CountDownLatch(1);
channel.basicConsume("foo", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
System.out.println(new String(body));
getChannel().txRollback(); // delivery won't be requeued; remains unacked
if (envelope.isRedeliver()) {
getChannel().basicAck(envelope.getDeliveryTag(), false);
getChannel().txCommit(); // commit the ack so the message is removed
getChannel().basicCancel(consumerTag);
latch.countDown();
}
else { // first time, let's requeue
getChannel().basicReject(envelope.getDeliveryTag(), true);
getChannel().txCommit(); // commit the reject so the message will be requeued
}
}
});
latch.await();
channel.close();
connection.close();
注意しません。 ack(またはreject)のみがトランザクショナルです。
Thansk Gary、私は同じことをしましたが、@EnableTransactionManagementが欠けていました。その方法では、私はamqpAdminを使ってキューを宣言していますが(私はそれが良くないことを知っている必要があります)、今はロールバックしていませんが、大したものではありませんが、ロールバックする方法もあります! – user3444718
いいえ。インフラストラクチャの変更はトランザクションには参加しません。rabbitmqトランザクションのセマンティクスについては、[here](https://www.rabbitmq.com/semantics.html)を参照してください。 –
もう一度ありがとう。これは役に立ちます。 – user3444718