2016-06-16 4 views
14

ちょっとしたことをするために、私はrabbitMQキューからメッセージを消費したいと思います。今私はウサギ(https://www.rabbitmq.com/mqtt.html)のMQTTのためのプラグインがあることを知っています。pikaを使用したPythonのSparkStreaming、RabbitMQ、MQTT

しかし、私はスパークがpikaから生産されたメッセージを消費する例を作ることはできません。

たとえば私は、私は次のようにメッセージプロデューサーを見ることができるかどうかを確認するために、ここで簡単なwordcount.pyプログラム(https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html)を使用しています:

import sys 
import pika 
import json 
import future 
import pprofile 

def sendJson(json): 

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
    channel = connection.channel() 

    channel.queue_declare(queue='analytics', durable=True) 
    channel.queue_bind(exchange='analytics_exchange', 
         queue='analytics') 

    channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json) 
    connection.close() 

if __name__ == "__main__": 
    with open(sys.argv[1],'r') as json_file: 
    sendJson(json_file.read()) 

sparkstreaming 消費者があります次

import sys 
import operator 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.mqtt import MQTTUtils 

sc = SparkContext(appName="SS") 
sc.setLogLevel("ERROR") 
ssc = StreamingContext(sc, 1) 
ssc.checkpoint("checkpoint") 
#ssc.setLogLevel("ERROR") 


#RabbitMQ 

"""EXCHANGE = 'analytics_exchange' 
EXCHANGE_TYPE = 'direct' 
QUEUE = 'analytics' 
ROUTING_KEY = 'analytics' 
RESPONSE_ROUTING_KEY = 'analytics-response' 
""" 


brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883" 
topic = "analytics" 

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic) 
#dummy functions - nothing interesting... 
words = mqttStream.flatMap(lambda line: line.split(" ")) 
pairs = words.map(lambda word: (word, 1)) 
wordCounts = pairs.reduceByKey(lambda x, y: x + y) 

wordCounts.pprint() 
ssc.start() 
ssc.awaitTermination() 

しかし、単純な語数の例とは異なり、私はこれは、次のエラーを仕事と取得することができません。

16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8) 
java.lang.NullPointerException 
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457) 
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273) 

だから私の質問は、キューに、そこに任意のより充実した一例であり、どのようにこれらのマップのRabbitMQのそれらの上かどうかを聞くためMQTTUtils.createStream(ssc, brokerUrl, topic)の観点での設定がどうあるべきか、です。

私は私の消費者のコードを実行しています:

:として

url_location = 'tcp://localhost' 
url = os.environ.get('', url_location) 
params = pika.URLParameters(url) 
connection = pika.BlockingConnection(params) 

とスパークストリーミング:1件のコメントにより示唆されるようにTCPパラメータを次のように私はプロデューサーのコードを更新した./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py

brokerUrl = "tcp://127.0.0.1:5672" 
topic = "#" #all messages 

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic) 
records = mqttStream.flatMap(lambda line: json.loads(line)) 
count = records.map(lambda rec: len(rec)) 
total = count.reduce(lambda a, b: a + b) 
total.pprint() 

答えて

2

間違ったポート番号を使用しているように見えます。それを仮定:

  • を使用すると、デフォルトの設定で実行しているのRabbitMQのローカルインスタンスを持っていて、MQTTプラグイン(rabbitmq-plugins enable rabbitmq_mqtt)を有効にし、どちらか(​​/pysparkを実行packagesjarsとするときのRabbitMQサーバー
  • spark-streaming-mqttが含まれて再起動しました/ driver-class-path

tcp://localhost:1883でTCPを使用して接続できます。また、MQTTがamq.topicを使用していることを覚えておく必要があります。

クイックスタート

  • 次の内容のDockerfileを作成します。

    FROM rabbitmq:3-management 
    
    RUN rabbitmq-plugins enable rabbitmq_mqtt 
    
  • ドッカーイメージ構築:

    docker build -t rabbit_mqtt . 
    
  • 開始画像を、サーバがレアになるまで待ちますDY:

    docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt 
    
  • 次の内容のproducer.pyを作成します。

    import pika 
    import time 
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost')) 
    channel = connection.channel() 
    channel.exchange_declare(exchange='amq.topic', 
           type='topic', durable=True) 
    
    for i in range(1000): 
        channel.basic_publish(
         exchange='amq.topic', # amq.topic as exchange 
         routing_key='hello', # Routing key used by producer 
         body='Hello World {0}'.format(i) 
        ) 
        time.sleep(3) 
    
    connection.close() 
    
  • 開始プロデューサー

    python producer.py 
    

    とメッセージが受信されることを確認するために管理コンソールhttp://127.0.0.1:15672/#/exchanges/%2F/amq.topic

    をご覧ください。

  • 次の内容のconsumer.pyを作成します。

    from pyspark import SparkContext 
    from pyspark.streaming import StreamingContext 
    from pyspark.streaming.mqtt import MQTTUtils 
    
    sc = SparkContext() 
    ssc = StreamingContext(sc, 10) 
    
    mqttStream = MQTTUtils.createStream(
        ssc, 
        "tcp://localhost:1883", # Note both port number and protocol 
        "hello"     # The same routing key as used by producer 
    ) 
    mqttStream.count().pprint() 
    ssc.start() 
    ssc.awaitTermination() 
    ssc.stop() 
    
  • ダウンロードの依存関係(スパークやスパークバージョンを構築するために使用されるものにScalaのバージョンを調整する):

    mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 
    
  • SPARK_HOMEPYTHONPATHことを確認してください正しいディレクトリをポイントします。

  • は、(以前のようにバージョンを調整)でconsumer.pyを提出:

    spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py 
    

あなたはすべてのステップあなたがスパークログでのHello Worldメッセージが表示されるはずですが従った場合。

+0

ありがとうございます。私は見てみましょう。これは直接的でよく話題になりますか? – disruptive

+0

MQTTプラグインを設定することができます(https://www.rabbitmq.com/mqtt.html#config)。異なる交換を使用することができますが、私はこれを知ることができます。とにかくMQTTプロトコルはそれほど豊富ではありません。 – zero323

+0

ドッカーなしでこれを設定する方法はありますか?たとえば、.configファイルを使用します。私はhttps://www.rabbitmq.com/mqtt.htmlのデフォルト設定で試しました。しかし、これはまったく機能しません。 => INFOPORT ==== 5-Jul-2016 :: 11:52:08 === MQTT接続を受け入れる<0.321.0>(127.0.0.1:47868 - > 127.0)。 0.1:1883)。しかし、生成されたメッセージをこのポートにマップする方法はありますか? – disruptive

2

MqttAsyncClient Javadocから、サーバーURIは次のスキームのいずれかを持つ必要があります。tcp://ssl://、またはlocal://です。上記のスキームのいずれかを使用するには、上記のbrokerUrlを変更する必要があります。詳細については

は、ここにMqttAsyncClientのソースへのリンクです:

https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java#L272

+1

httpの代わりにtcpを使用するようにプロデューサを変更しようとしましたが、次の接続に関する問題が発生しました。エラーReceiverSupervisorImpl:エラーのある受信者が停止しました:接続が失われました(32109) - java.net.SocketException:接続リセット – disruptive

関連する問題