2017-02-02 4 views
1

キューからすべてのメッセージを待ちたい&消費者を停止します。しかし、それは新しいメッセージを待っています。キューが空のときだけメッセージを待つ引数なし既存のメッセージをすべてキューから取得するには

public class Listener { 

    private Channel channel; 

    public Listener(boolean futureActionsAllowed) { 
     this.futureActionsAllowed = futureActionsAllowed; 
     channel = RabbitMQ.getChannel(TasksStatusOpts.QUEUE_NAME, TasksStatusOpts.DURABLE, TasksStatusOpts.EXCLUSIVE, 
       TasksStatusOpts.AUTO_DELETE); 
    } 

    public void getAndExit() throws IOException, InterruptedException { 
     QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 
     String consumerTag = channel.basicConsume(TasksStatusOpts.QUEUE_NAME, TasksStatusOpts.AUTO_ACK, 
       queueingConsumer); 
     while (true) { 
      QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); 
      if (delivery == null) { 
       channel.basicCancel(consumerTag); 
       Connection conn = channel.getConnection(); 
       channel.close(); 
       conn.close(); 
       break; 
      } else { 
       String message = new String(delivery.getBody(), Charsets.UTF_8); 
       handle(message, delivery.getEnvelope()); 
      } 
     } 
    } 

    public static void main(String[] args) throws IOException { 
     new Listener(false).getAndExit(); 
    } 

} 
+0

現在のキューカウントを取得するAPI呼び出しがあるかどうかを確認できます。その場合は、アプリケーションを起動するときに、キューの数を読み取り、ループを何回実行してからアプリケーションを停止することができます。またはメッセージ配信時間を取得するAPIがある場合は、アプリケーション開始時間後に配信されるメッセージの取得を開始するときにアプリケーションを停止することができます –

+0

キューからすべてのメッセージを消費して終了する必要がある場合は、キュー内のメッセージの数に依存します。消費者が生産よりも前に開始した場合(わずかな時間でさえ)、この方法は失敗します。プロデューサがそれらの間に少しの遅延を持つメッセージをパブリッシュすると、コンシューマはそれらを十分に読み込みますが、プロデューサがまだ動作しているのにキューは空の状態になります。したがって、メッセージ数ではなくタイムアウトを使用してください。 –

答えて

1

nextDelivery():私はすべての不要なコードを削除しました。それは決してnullを返さないでしょう。代わりに、タイムアウトのあるnextDelivery(timeout)を探している可能性があります。時間が経過すると、メソッドはnullを返します。

また、このメソッドは非推奨であることに注意してください。最新の例については、RabbitMQ Java API Guideを参照してください。

関連する問題