2012-01-26 9 views
18

サブスクライバーとパブリッシャーの2つのプログラムがあります。 サブスクライバーはトピックにメッセージを置くことができ、メッセージは正常に送信されます。 ブラウザでactivemqサーバをチェックすると、1つのmsgがエンキューされています。ACTIVEMQ-パブリッシャー加入者hello worldの例

import javax.jms.*; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 

public class producer { 

    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; 

    public static void main(String[] args) throws JMSException { 

     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 
     Connection connection = connectionFactory.createConnection(); 
     connection.start(); 

     // JMS messages are sent and received using a Session. We will 
     // create here a non-transactional session object. If you want 
     // to use transactions you should set the first parameter to 'true' 
     Session session = connection.createSession(false, 
       Session.AUTO_ACKNOWLEDGE); 

     Topic topic = session.createTopic("testt"); 

     MessageProducer producer = session.createProducer(topic); 

     // We will send a small text message saying 'Hello' 

     TextMessage message = session.createTextMessage(); 

     message.setText("HELLO JMS WORLD"); 
     // Here we are sending the message! 
     producer.send(message); 
     System.out.println("Sent message '" + message.getText() + "'"); 

     connection.close(); 
    } 
} 

私はこのコードを実行した後、コンソールに出力されている:

26 Jan, 2012 2:30:04 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect 
INFO: Successfully connected to tcp://localhost:61616 
Sent message 'HELLO JMS WORLD' 

しかし、私は消費者のコードを実行すると、それはここでメッセージ

を受信して​​いない生産者コードがあります消費者コードは次のとおりです。

import javax.jms.*; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 

public class consumer { 
    // URL of the JMS server 
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; 

    // Name of the topic from which we will receive messages from = " testt" 

    public static void main(String[] args) throws JMSException { 
     // Getting JMS connection from the server 

     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 
     Connection connection = connectionFactory.createConnection(); 
     connection.start(); 

     Session session = connection.createSession(false, 
       Session.AUTO_ACKNOWLEDGE); 

     Topic topic = session.createTopic("testt"); 

     MessageConsumer consumer = session.createConsumer(topic); 

     MessageListener listner = new MessageListener() { 
      public void onMessage(Message message) { 
       try { 
        if (message instanceof TextMessage) { 
         TextMessage textMessage = (TextMessage) message; 
         System.out.println("Received message" 
           + textMessage.getText() + "'"); 
        } 
       } catch (JMSException e) { 
        System.out.println("Caught:" + e); 
        e.printStackTrace(); 
       } 
      } 
     }; 

     consumer.setMessageListener(listner); 
     connection.close(); 

    } 
}  

このコードを実行すると、何も表示されません。 誰かがこの問題を解決するために私を助けることができますか?

+1

接続が早すぎると思われます。あなたの消費者が消費を開始する前に、接続は閉じられており、メインの方法は終了です! –

答えて

13

あなたの問題は、コンシューマが動作していてすぐにシャットダウンすることです。

consumer.setMessageListener(listner); 

    try { 
     System.in.read(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 

    connection.close(); 

あなたが停止する前にキーを打つまでは待機します:あなたの消費者にこれを追加すること

してみてください。考慮すべき

他のもの:

  • 最後に近い
  • Javaの命名規則は、ほかに(クラスの最初の文字
+0

あなたも上記のコードは動作していません:( 助けてください...プロジェクトにひどく立ち往生! –

11

のための主な問題を、大文字を使用して奨励するためにブロックを使用アプリがすぐに終了する)は、あなたがトピックに送信していることです。トピックはメッセージを保持しないため、コンシューマを生成して実行するアプリケーションを実行すると、メッセージが送信された時点でトピックにサブスクリプションされていないため、コンシューマは何も受信しません。シャットダウンの問題を修正してから、ある端末でコンシューマを実行してからプロデューサを実行すると、コンシューマが受信したメッセージが表示されます。メッセージの保持が必要な場合は、誰かがメッセージを消費するまでメッセージを保持するキューを使用する必要があります。

+3

申し訳ありませんが、バグが修正されました。 私は最初にサブスクライバモジュールを実行していました... しかしそれは最初にサブスクライバモジュールである必要がありますその後、出版社... 提案に感謝:) –

2

ほんの一部:キューではないトピックと

  • 作品。トピック内のメッセージは、コンシューマーが利用できないときに破棄され、永続化されません。
  • メッセージリスナーを設定した後にconnection.start()を追加します。すべてのコンシューマ/プロデューサが正しく設定されているときに接続を開始する必要があります。
  • 接続をもう一度閉じる前にしばらくお待ちください。

このトピックは、おそらく最も重要な障害の原因になります。

2

プロデューサークラスが正しいです。それは円滑に実行されます。

ただし、お客様の消費者は間違っています&あなたはそれを変更する必要があります。

  • まず、接続オブジェクトを作成した後でsetClientID( "any_string_value")を加えます。

    例えば:第二

  • Connection connection = connectionFactory.createConnection(); // need to setClientID value, any string value you wish connection.setClientID("12345");、トピックを介してメッセージを送信する代わりcreateConsumer()ののcreateDurableSubscriber()メソッドを使用します。ここで

    MessageConsumer consumer = session.createDurableSubscriber(topic,"SUB1234");

修正comsumerクラスです:今

package mq.test; 

import javax.jms.*; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 

public class consumer { 
    // URL of the JMS server 
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL; 

    // Name of the topic from which we will receive messages from = " testt" 

    public static void main(String[] args) throws JMSException { 
     // Getting JMS connection from the server 

     ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 
     Connection connection = connectionFactory.createConnection(); 

     // need to setClientID value, any string value you wish 
     connection.setClientID("12345"); 

     try{ 
     connection.start(); 
     }catch(Exception e){ 
      System.err.println("NOT CONNECTED!!!"); 
     } 
     Session session = connection.createSession(false, 
       Session.AUTO_ACKNOWLEDGE); 

     Topic topic = session.createTopic("test_data"); 

     //need to use createDurableSubscriber() method instead of createConsumer() for topic 
     // MessageConsumer consumer = session.createConsumer(topic); 
     MessageConsumer consumer = session.createDurableSubscriber(topic, 
       "SUB1234"); 

     MessageListener listner = new MessageListener() { 
      public void onMessage(Message message) { 
       try { 
        if (message instanceof TextMessage) { 
         TextMessage textMessage = (TextMessage) message; 
         System.out.println("Received message" 
           + textMessage.getText() + "'"); 
        } 
       } catch (JMSException e) { 
        System.out.println("Caught:" + e); 
        e.printStackTrace(); 
       } 
      } 
     }; 

     consumer.setMessageListener(listner); 
     //connection.close(); 

    } 
} 

、あなたのコードは正常に実行されます。