0
RabbitMQで新しくなっています。私はこれらの目的のためにMOMシステムが必要です:RabbitMQでのトランザクション消費
- ロジックが実行されて が正常に実行されるまで、公開されたメッセージが消費されます。
- 論理が正常に実行されるまで、ブローカーは公開されたメッセージを キューから削除する必要はありません。これらの目標のために
、私は最初の試みで、次のコードを書いた:このアプローチにより、
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)throws IOException
{
try
{
channel.txSelect();
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("Recieve Message is :" + new String(body));
int reslt = //execute my logic
if(result == 0)
channel.txCommit();
else
channel.txRollback();
}
catch(Throwable t)
{
t.printStackTrace();
}
}
});
、私は2つ目の目的を達成するため、他の言葉で、ブローカーは、私のメッセージを削除しませんが、キュー内のすべてのメッセージが消費され、それらのすべてがロールバックされると、ブローカは再び消費者にメッセージを送信しません。私は両方の目標を達成し、私は私のコードが正しいかどうかわからない、このソリューションにより、
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)throws IOException
{
try
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("Recieve Message is :" + new String(body));
int reslt = //execute my logic
if(result == 0)
channel.basicAck(deliveryTag, false);
else
channel.basicNack(deliveryTag,false,true);
}
catch(Throwable t)
{
t.printStackTrace();
}
}
});
:2回目の試行で
は、私は次のコードを書きましたの?高TPSのプロダクション環境でこのアプローチが問題を引き起こしますか?私は のフラグが のbasicNackのメソッドが重いか軽いのか分からないのですか?
私はかなり長い間basicAckとbasicNackを使用していますが、TPSが高い場合でもまだ問題はありません。 –