2016-10-14 14 views
2

こんにちは私は、バンプ統合を使用してMQTTメッセージを受信し、処理し、別のトピックに公開しようとしています。ここでSpring Mqtt統合:アウトバウンドトピックの問題

はintegration.xmlです:

<int-mqtt:outbound-channel-adapter id="mqtt-publish" 
    client-id="spring-foo-1" 
    client-factory="clientFactory" 
    auto-startup="true" 
    url="tcp://localhost:1883" 
    default-qos="0" 
    default-retained="true" 
    default-topic="tweets/akki" /> 

    <int-mqtt:message-driven-channel-adapter id="oneTopicAdapter" 
    client-id="spring-foo-2" 
    client-factory="clientFactory" 
    auto-startup="true" 
    url="tcp://localhost:1883" 
    topics="mqtt/publish" 
    /> 

    <int:service-activator input-channel="oneTopicAdapter" method="logMessages" ref="mqttLogger" output-channel="mqtt-publish"></int:service-activator> 

    <bean id="mqttLogger" class="hello.mqttReceiver" /> 

そしてmqttReceiver.java:

  • processed_dataがにルーティングされます。

    package hello; 
    public class mqttReceiver { 
        public String logMessages(String a){ 
         String processed_data = a; //TODO Process Data 
         return processed_data; 
        } 
    } 
    

    以下は、私が直面してる問題ですmqtt /公開ではなく、mqtt/akki

  • processed_dataはものが、何回
  • AbstractMqttMessageHandlerheadersにまず見ていきますので、正しいです

答えて

3

を公表していません。

String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC); 
    Object mqttMessage = this.converter.fromMessage(message, Object.class); 
    if (topic == null && this.defaultTopic == null) { 
     throw new MessageHandlingException(message, 
       "No '" + MqttHeaders.TOPIC + "' header and no default topic defined"); 
    } 

DefaultPahoMessageConverterが移入そのメッセージの到着時にMqttPahoMessageDrivenChannelAdapterからMqttHeaders.TOPICヘッダー。

あなたはまた、(trueに上書き設定)トピックヘッダーを置き換えるために、ヘッダー・enricherを使用することができます<int-mqtt:outbound-channel-adapter>

+1

にメッセージを送信する前に<int:header-filter header-names="mqtt_topic"/>を使用することを検討すべきです。 –