ちょっとしたことをするために、私はrabbitMQキューからメッセージを消費したいと思います。今私はウサギ(https://www.rabbitmq.com/mqtt.html)のMQTTのためのプラグインがあることを知っています。pikaを使用したPythonのSparkStreaming、RabbitMQ、MQTT
しかし、私はスパークがpikaから生産されたメッセージを消費する例を作ることはできません。
たとえば私は、私は次のようにメッセージプロデューサーを見ることができるかどうかを確認するために、ここで簡単なwordcount.pyプログラム(https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html)を使用しています:
import sys
import pika
import json
import future
import pprofile
def sendJson(json):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='analytics', durable=True)
channel.queue_bind(exchange='analytics_exchange',
queue='analytics')
channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json)
connection.close()
if __name__ == "__main__":
with open(sys.argv[1],'r') as json_file:
sendJson(json_file.read())
sparkstreaming 消費者があります次
import sys
import operator
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")
#RabbitMQ
"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""
brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
しかし、単純な語数の例とは異なり、私はこれは、次のエラーを仕事と取得することができません。
16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)
だから私の質問は、キューに、そこに任意のより充実した一例であり、どのようにこれらのマップのRabbitMQのそれらの上かどうかを聞くためMQTTUtils.createStream(ssc, brokerUrl, topic)
の観点での設定がどうあるべきか、です。
私は私の消費者のコードを実行しています:
:としてurl_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
とスパークストリーミング:1件のコメントにより示唆されるようにTCPパラメータを次のように私はプロデューサーのコードを更新した./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py
brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()
ありがとうございます。私は見てみましょう。これは直接的でよく話題になりますか? – disruptive
MQTTプラグインを設定することができます(https://www.rabbitmq.com/mqtt.html#config)。異なる交換を使用することができますが、私はこれを知ることができます。とにかくMQTTプロトコルはそれほど豊富ではありません。 – zero323
ドッカーなしでこれを設定する方法はありますか?たとえば、.configファイルを使用します。私はhttps://www.rabbitmq.com/mqtt.htmlのデフォルト設定で試しました。しかし、これはまったく機能しません。 => INFOPORT ==== 5-Jul-2016 :: 11:52:08 === MQTT接続を受け入れる<0.321.0>(127.0.0.1:47868 - > 127.0)。 0.1:1883)。しかし、生成されたメッセージをこのポートにマップする方法はありますか? – disruptive