2017-08-01 1 views
0

RabbitMQで新しくなっています。私はこれらの目的のためにMOMシステムが必要です:RabbitMQでのトランザクション消費

  1. ロジックが実行されて が正常に実行されるまで、公開されたメッセージが消費されます。
  2. 論理が正常に実行されるまで、ブローカーは公開されたメッセージを キューから削除する必要はありません。これらの目標のために

、私は最初の試みで、次のコードを書いた:このアプローチにより、

ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    final Channel channel = connection.createChannel(); 
    channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
    boolean autoAck = false; 
    channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag", 
      new DefaultConsumer(channel) 
      { 
       @Override 
       public void handleDelivery(String consumerTag, 
              Envelope envelope, 
              AMQP.BasicProperties properties, 
              byte[] body)throws IOException 
       { 
        try 
        { 
         channel.txSelect(); 
         String routingKey = envelope.getRoutingKey(); 
         String contentType = properties.getContentType(); 
         long deliveryTag = envelope.getDeliveryTag(); 
         System.out.println("Recieve Message is :" + new String(body)); 
         int reslt = //execute my logic 
         if(result == 0) 
          channel.txCommit(); 
         else 
          channel.txRollback(); 
        } 
        catch(Throwable t) 
        { 
         t.printStackTrace(); 
        } 
       } 
      }); 

、私は2つ目の目的を達成するため、他の言葉で、ブローカーは、私のメッセージを削除しませんが、キュー内のすべてのメッセージが消費され、それらのすべてがロールバックされると、ブローカは再び消費者にメッセージを送信しません。私は両方の目標を達成し、私は私のコードが正しいかどうかわからない、このソリューションにより、

ConnectionFactory factory = new ConnectionFactory(); 
    factory.setHost("localhost"); 
    Connection connection = factory.newConnection(); 
    final Channel channel = connection.createChannel(); 
    channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
    boolean autoAck = false; 
    channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag", 
      new DefaultConsumer(channel) 
      { 
       @Override 
       public void handleDelivery(String consumerTag, 
              Envelope envelope, 
              AMQP.BasicProperties properties, 
              byte[] body)throws IOException 
       { 
        try 
        { 
         String routingKey = envelope.getRoutingKey(); 
         String contentType = properties.getContentType(); 
         long deliveryTag = envelope.getDeliveryTag(); 
         System.out.println("Recieve Message is :" + new String(body)); 
         int reslt = //execute my logic 
         if(result == 0) 
          channel.basicAck(deliveryTag, false); 
         else 
          channel.basicNack(deliveryTag,false,true); 
        } 
        catch(Throwable t) 
        { 
         t.printStackTrace(); 
        } 
       } 
      }); 

:2回目の試行で

は、私は次のコードを書きましたの?高TPSのプロダクション環境でこのアプローチが問題を引き起こしますか?私は フラグが のbasicNackのメソッドが重いか軽いのか分からないのですか?

+2

私はかなり長い間basicAckとbasicNackを使用していますが、TPSが高い場合でもまだ問題はありません。 –

答えて

0

私は同じ要件を数ヶ月後に持っていました。私は多くのソリューションを使いました。

私のロジックがうまくいけば、私は手動でメッセージを承認するか、そのメッセージを拒否すると、メモリのどこかに配送タグが保存されました。私はこの目的のために以下の方法を使用しました。

if (success) 
connectionModel.BasicAck(Convert.ToUInt64(uTag), false); 
else 
connectionModel.BasicReject(Convert.ToUInt64(uTag), true); 

上記のコードは、私の生産で5000msg /秒で正常に動作しています。まあ、basicNackメソッドが私に問題を引き起こしていました。

関連する問題