2017-05-15 8 views
0

Ubuntuサーバーでは、私はDivolte Collectorを設定してウェブサイトからクリックストリームデータを収集しました。データは、「divolte-data」というカフカチャンネルに書き込まれています。カフカの消費者を設定することで、私はデータが入って来見ることができます:Druid-Tranquility(Supersetの場合)を使用してdivolte-data Kafkaチャンネルを読むには?

V0:j2ive5p1:QHQbOuiuZFozAVQfKqNWJoNstJhEZE85V0:j2pz3aw7:sDHKs71nHrTB5b_1TkKvWWtQ_rZDrvc2D0:B4aEGBSVgTXgxqB85aj4dGeoFjCqpeEGbannerClickMozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36ChromiumChromium8Google Inc. and contributorsBrowser58.0.3029.96"Personal computer 
LinuxCanonical Ltd. 

それから私は(スパークを読むことができる)druid.ioを含む一般的なデータベースには、いくつかのコネクタを備えていAirbnbスーパーセットでデータを視覚化したいと思います。

Divolteはカフカのデータを構造化されていない方法で保存しているようです。しかし、明らかにそれは構造化された方法でデータをマップすることができます。 JSONで入力データを構造化する必要がありますか?

そして、Druid-Tranquilityからどのようにデータを受信したのですか?私はconfの例でチャンネル名を変更しようとしましたが、このコンシューマーはゼロのメッセージを受け取ります。

答えて

0

私が見つけた解決策は、Kafka PythonライブラリやConfluent Kafka PythonなどのPythonでKafkaメッセージを処理できることです。その後、Avroリーダーでメッセージをデコードします。

編集:ここで私はそれをやった方法についての更新は、次のとおりです。

私はアブロライブラリはアブロファイルを読むためだけだと思ったが、それは実際には次のように、デコードカフカメッセージの問題を解決:私が最初にライブラリをインポートスキーマファイルをパラメータとして与えて、コンシューマループで使用できる辞書にメッセージをデコードする関数を作成します。

from confluent_kafka import Consumer, KafkaError 
from avro.io import DatumReader, BinaryDecoder 
import avro.schema 

schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read()) 
reader = DatumReader(schema) 

def decode(msg_value): 
    message_bytes = io.BytesIO(msg_value) 
    decoder = BinaryDecoder(message_bytes) 
    event_dict = reader.read(decoder) 
    return event_dict 

c = Consumer() 
c.subscribe(topic) 
running = True 
while running: 
    msg = c.poll() 
    if not msg.error(): 
     msg_value = msg.value() 
     event_dict = decode(msg_value) 
     print(event_dict) 
    elif msg.error().code() != KafkaError._PARTITION_EOF: 
     print(msg.error()) 
     running = False 
関連する問題