2016-05-21 5 views
1

qosについて少し混乱しています。Qosについて読むのは qosを2に設定すると、ブローカー/クライアントは4段階で正確にメッセージを配信しますハンドシェーク。qos 2で公開しているパブリッシャー

したがってqos 2は、メッセージがブローカーに公開されており、サブスクライバ(クライアント)は受信していないことを確認します。 または メッセージ受信確認のために、加入者 または

によって受信され、我々は出版社のようなアプリケーションを構築する必要があるのは、例えばトピックにメッセージを公開します「DATA」と例えば、トピックにサブスクライブします「ACK」と加入者の必要性トピック「ACK」の承認を発行するために、そのメッセージがトピック「DATA」

上で受信された私は、QoSで公開しようとした次のコードで 出版社

をサブスクライブするために発行データのJavaクラスと別のクラスを作成しました2とdeliveryComplete関数には例外がありますときにgetMessage()私はqosを試したときに0 getMessage()は何も例外を与えませんでした。以下

public class PublishMe implements MqttCallback{ 
    MqttClient myClient; 
    MqttClient myClientPublish; 
    MqttConnectOptions connOpt; 
    MqttConnectOptions connOptPublish; 
    static final String BROKER_URL = "tcp://Ehydromet-PC:1883"; 

    static Boolean msgACK=false;  
    public static void main(String[] args) { 
     PublishMe smc = new PublishMe(); 
     smc.runClient(); 
    } 
    @Override 
    public void connectionLost(Throwable t) { 
     System.out.println("Connection lost!"); 
    } 

     @Override 
     public void messageArrived(String string, MqttMessage message) throws Exception { 
       System.out.println("-------------------------------------------------"); 
     System.out.println("| Topic:" + string); 
     System.out.println("| Message: " + new String(message.getPayload())); 
     System.out.println("-------------------------------------------------"); 

     } 
/** 
    * 
    * deliveryComplete 
    * This callback is invoked when a message published by this client 
    * is successfully received by the broker. 
    * 
    * @param token 
    */ 
    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 
     try{ 
     System.out.println("Message delivered successfully to topic : \"" + token.getMessage().toString() + "\"."); 
     }catch(Exception ex){ 
     System.out.println(ex.getCause()+" -- "+ex.getLocalizedMessage()+" -- "+ex.getMessage()+" -- ");  
       } 

     } 

    public void runClient() { 
     connOpt = new MqttConnectOptions(); 
     connOpt.setCleanSession(false); 
     connOpt.setKeepAliveInterval(0); 

       connOptPublish= new MqttConnectOptions(); 
     connOptPublish.setCleanSession(false); 
     connOptPublish.setKeepAliveInterval(0); 

// Connect to Broker 
     try { 
      myClient = new MqttClient(BROKER_URL, "pahomqttpublish11"); 
      myClient.setCallback(this); 
      myClient.connect(connOpt); 

         myClientPublish= new MqttClient(BROKER_URL, "pahomqttpublish42"); 
      myClientPublish.setCallback(this); 
      myClientPublish.connect(connOptPublish); 

     } catch (MqttException e) { 
      e.printStackTrace(); 
      System.exit(-1); 
     } 

     System.out.println("Connected to " + BROKER_URL); 

     String myTopic = "sample"; 
//    String myTopic = "receiveDATA2"; 
       MqttTopic topic = myClientPublish.getTopic(myTopic); 

     // publish messages if publisher 
     if (publisher) { 

        int i=1; 
      while(true){ 
           String pubMsg = "sample msg "+i; 

       MqttMessage message = new MqttMessage(pubMsg.getBytes()); 
           System.out.println(message); 
           message.setQos(2); 
           message.setRetained(false); 

           // Publish the message 
           MqttDeliveryToken token = null; 
           try { 
        // publish message to broker 
        token = topic.publish(message); 
        // Wait until the message has been delivered to the broker 
        token.waitForCompletion(); 
             msgACK=false; 
        Thread.sleep(100); 
       } catch (Exception e) { 
        e.printStackTrace(); 
       } 
      }   
     } 
    } 


} 

とは私が出版社のコードで実装する必要がどうあるべきか、その加入者がメッセージを受信した保証することができますどのように加入者

public class Mqttsample implements MqttCallback{ 
    MqttClient myClient; 
    MqttClient myClientPublish; 
    MqttConnectOptions connOpt; 
MqttConnectOptions connOptPublish; 
    static final String BROKER_URL = "tcp://Ehydromet-PC:1883"; 
    // the following two flags control whether this example is a publisher, a subscriber or both 
    static final Boolean subscriber = true; 
    static final Boolean publisher = true; 
     public static void main(String[] args) { 


     Mqttsample smc = new Mqttsample(); 
     smc.runClient(); 
    } 
     @Override 
    public void connectionLost(Throwable t) { 
     System.out.println("Connection lost!"); 
     // code to reconnect to the broker would go here if desired 
    } 

     @Override 
     public void messageArrived(String string, MqttMessage message) throws Exception { 
     //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 
       System.out.println("| Topic:" + string+"| Message: " + new String(message.getPayload())); 

     } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 
     try{ 
      System.out.println("Pub complete" + new String(token.getMessage().getPayload())); 
     } 
     catch(Exception ex){ 
      System.out.println("delivery Error "+ex.getMessage()); 
     } 
     } 



    public void runClient() { 
     connOpt = new MqttConnectOptions(); 
     connOpt.setCleanSession(false); 
     connOpt.setKeepAliveInterval(0); 

       connOptPublish= new MqttConnectOptions(); 
     connOptPublish.setCleanSession(false); 
     connOptPublish.setKeepAliveInterval(0); 

// Connect to Broker 
     try { 
      myClient = new MqttClient(BROKER_URL, "pahomqttpublish"); 
      myClient.setCallback(this); 
      myClient.connect(connOpt); 

         myClientPublish= new MqttClient(BROKER_URL, "pahomqttsubscribe"); 
      myClientPublish.setCallback(this); 
      myClientPublish.connect(connOptPublish); 

     } catch (MqttException e) { 
      e.printStackTrace(); 
      System.exit(-1); 
     } 

     System.out.println("Connected to " + BROKER_URL); 


     // subscribe to topic if subscriber 
     if (subscriber) { 
      try { 
          //String myTopicACK = M2MIO_DOMAIN + "/" + "ACK" + "/" + M2MIO_THING; 
          String myTopicACK = "sample"; 
          // MqttTopic topicACK = myClient.getTopic(myTopicACK); 
       int subQoS = 2; 

       myClient.subscribe(myTopicACK, subQoS); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
//     


    } 


} 

です。上記のリンクから

http://www.eclipse.org/paho/files/mqttdoc/Cclient/qos.html 正確に一度

QoS2、:メッセージは常に正確に一度配信されます。メッセージは、送信者がメッセージが受信者によって公開されたという確認を受信するまで、送信者にローカルに保存する必要があります。メッセージは、メッセージを再度送信する必要がある場合に備えて格納されます。 QoS2は最も安全ですが、最も遅い転送モードです。

+0

また、実際に何が起こったのか推測されないように、例外に例外を含めてください。 – hardillb

答えて

1

QOSレベルが高いほど、クライアント(パブリッシャまたはサブスクライバ)とブローカ間のメッセージの配信が記述され、パブリッシャからサブスクライバへのメッセージの配信は終了しません。

これは、パブ/サブプロトコルとしてトピックに存在する可能性のあるサブスクライバ数を知る方法がないため、これは非常に慎重です。 0とnの間に任意の数を入れることができます。パブリッシャーとサブスクライバーは、異なるQOSレベルでトピックとやり取りすることもできます(パブリッシャーはQOS 2でパブリッシュでき、サブスクライバーはQOS 0でサブスクライブできます)。メッセージは、保持されたメッセージとして公開することもできます。そのため、最後に保持されたメッセージは、常に新しく加入しているクライアントに配信されます。

QOS契約を満たすために、クライアント上のすべてのストレージは、あなたが(この場合はPAHO)を使用しているMQTTライブラリーで処理する必要があります

deliveryCompleteコールバックは、パブリッシャがへのメッセージの送信を完了したことを示すだけですブローカー。また、docは、あなたが言及した例外を説明するメッセージが配信された場合にはtoken.getMessage()がnullを返すと言います(私はあなたが例外を含んでいないので推測しなければなりません)。

あなたのアプリケーションアーキテクチャでは、メッセージの了解の終了が本当に必要な場合は、記述した内容に似た何かを実装する必要があります。しかし、正常に動作することを確認するには、メッセージのペイロードにメッセージIDを含める必要があります。確認メッセージには、これを含める必要があります。私がこのようなものを使用する唯一の理由は、メッセージを承認するための時間要件がある場合です。時間が重要でない場合は、Persistent Sessionsを参照して、発行時に切断された場合に再接続するときに、購読しているクライアントにメッセージが確実に配信されるようにしてください。

+0

qos 1と2でnullを返すtoken.getMessage()例外をご迷惑をおかけして申し訳ありません。メッセージを配信済みとしてマークする必要があります。あなたの答えから、私は出版社と加入者が独立しており、ブローカーがクライアントのメッセージを世話すると思います。細部をありがとう、私は多くのドキュメントを読んで、私は混乱していた – Rawat

関連する問題