2016-12-28 6 views
0

私はこのトピックに新たなんです。私は、JMSリスナーを使用して、スプリットメッセージを含むアクティブなMQを聴いています。私は最後のメッセージまで待ち行列を聞いて、それをUIにまとめて送る必要があります。私はキューを聞くことができ、メッセージをつかむことができますが、分割メッセージがどれだけ利用できるかわからないので、一緒に送信することはできません。上記の操作を行うリスナーを作る方法はありますか?キュー内で利用可能なメッセージがなくなった場合、jmsリスナーはnull値を生成しますか?どんなアイデアや助けが本当に役に立つでしょう。ActiveMQのJMSListener

私は、JMSリスナーを使用して、キューに耳を傾けるために以下のコードを使用しています。

private static final String ORDER_RESPONSE_QUEUE = "mail-response-queue"; 

@JmsListener(destination = ORDER_RESPONSE_QUEUE) 
public void receiveMessage(final Message<InventoryResponse> message) throws JMSException { 
    LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); 
    MessageHeaders headers = message.getHeaders(); 
    LOG.info("Application : headers received : {}", headers); 

    InventoryResponse response = message.getPayload(); 
    LOG.info("Application : response received : {}",response); 
    LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); 
} 

JMSリスナーを使用してキュー情報を取得できますか?

答えて

0

jmxでは、宛先に関する情報にアクセスできます。たとえば、メッセージがキュー内でどのように保留中であるかを知ることができます。新しいメッセージが

長い org.apache.activemq.broker.jmx.DestinationViewMBean.getQueueSizeを送信された場合、これは変更することができます

注()

@MBeanInfo(値= "ナンバー宛先にメッセージがまだありませんが、まだ送信されていますが、未確認です。 ")

まだ宛先がではないメッセージの数を返しますは、戻り値を消費する:何もメッセージはあなたが表示されていない受信があった場合にメッセージを消費しないのはなぜ

import java.util.HashMap; 
import java.util.Map; 

import javax.management.MBeanServerConnection; 
import javax.management.MBeanServerInvocationHandler; 
import javax.management.ObjectName; 
import javax.management.remote.JMXConnector; 
import javax.management.remote.JMXConnectorFactory; 
import javax.management.remote.JMXServiceURL; 

import org.apache.activemq.broker.jmx.BrokerViewMBean; 
import org.apache.activemq.broker.jmx.QueueViewMBean; 

public class JMXGetDestinationInfos { 

    public static void main(String[] args) throws Exception { 
     JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://host:1099/jmxrmi"); 
     Map<String, String[]> env = new HashMap<>(); 
     String[] creds = {"admin", "activemq"}; 
     env.put(JMXConnector.CREDENTIALS, creds); 
     JMXConnector jmxc = JMXConnectorFactory.connect(url, env); 
     MBeanServerConnection conn = jmxc.getMBeanServerConnection(); 

     ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); 

     BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class, 
       true); 
     for (ObjectName name : mbean.getQueues()) { 
      if (("Destination".equals(name.getKeyProperty("destinationName")))) { 
       QueueViewMBean queueMbean = MBeanServerInvocationHandler.newProxyInstance(conn, name, 
         QueueViewMBean.class, true); 
       System.out.println(queueMbean.getQueueSize()); 
      } 
     } 
    } 
} 

を消費することがまだあり、この先 内のメッセージの数を返します?メッセージが受信されない場合、タイムアウト後にnullを返すメソッドがあります。

ActiveMQMessageConsumer.receive(長いタイムアウト) はたJMSExceptionは、指定されたタイムアウト間隔内に到着する次のメッセージを受信しスロー。この呼び出しブロックメッセージが到着するまで 、タイムアウトが期限切れになるか、このメッセージコンシューマが 閉じられています。タイムアウトがゼロになると、コールは無期限にブロックされます( )。定義:インタフェースMessageConsumer内のreceive パラメータ:timeout - タイムアウト値(ミリ秒単位)。タイムアウト値は期限切れになりません。タイムアウトが期限切れになるか、このメッセージは コンシューマが同時に閉じられる

UPDATEこのようにしてもよい場合は、この メッセージコンシューマ用に生成された次のメッセージ、またはnull:

import java.io.IOException; 
import java.net.MalformedURLException; 
import java.util.HashMap; 
import java.util.Map; 

import javax.management.MBeanServerConnection; 
import javax.management.MBeanServerInvocationHandler; 
import javax.management.MalformedObjectNameException; 
import javax.management.ObjectName; 
import javax.management.remote.JMXConnector; 
import javax.management.remote.JMXConnectorFactory; 
import javax.management.remote.JMXServiceURL; 

import org.apache.activemq.broker.jmx.BrokerViewMBean; 
import org.apache.activemq.broker.jmx.QueueViewMBean; 

public class JMXGetDestinationInfos { 

    private QueueViewMBean queueMbean; 

    { 
     try { 
      JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://host:1099/jmxrmi"); 
      Map<String, String[]> env = new HashMap<>(); 
      String[] creds = { "admin", "activemq" }; 
      env.put(JMXConnector.CREDENTIALS, creds); 
      JMXConnector jmxc = JMXConnectorFactory.connect(url, env); 
      MBeanServerConnection conn = jmxc.getMBeanServerConnection(); 

      ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); 

      BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class, 
        true); 
      for (ObjectName name : mbean.getQueues()) { 
       if (("Destination".equals(name.getKeyProperty("destinationName")))) { 
        queueMbean = MBeanServerInvocationHandler.newProxyInstance(conn, name, QueueViewMBean.class, true); 
        System.out.println(queueMbean.getQueueSize()); 
        break; 
       } 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    @JmsListener(destination = ORDER_RESPONSE_QUEUE) 
    public void receiveMessage(final Message<InventoryResponse> message, javax.jms.Message amqMessage) throws JMSException { 
     LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); 
     MessageHeaders headers = message.getHeaders(); 
     LOG.info("Application : headers received : {}", headers); 

     InventoryResponse response = message.getPayload(); 
     LOG.info("Application : response received : {}",response); 
     LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); 
     //queueMbean.getQueueSize() is real time, each call return the real size 
     ((org.apache.activemq.command.ActiveMQMessage) amqMessage).acknowledge(); 
     if(queueMbean != null && queueMbean.getQueueSize() == 0){ 
      //display messages ?? 
     } 
    } 
} 
戻ります

getQueueSize()は、 宛先のメッセージの数をye消費されることはありません。 潜在的に派遣したが 未確認。サイズ== 0(サイズ== 0がすべてのメッセージがディスパッチされ、認知されることを意味している場合

一つの解決策は、あなたの春DefaultMessageListenerContainer.sessionAcknowledgeModeNameでセッションを作成するためorg.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGEへの場合、acknowledgeModeを更新し、個別に各メッセージを確認し、その後にチェックすることです)。

+0

ありがとうございます。消費者ではなくリスナーを使用していますが、JMSリスナーを使用してキュー情報を取得することは可能ですか? – user6543599

+0

投稿を更新しました。それを見て助けてください – user6543599