2016-04-22 15 views
1

私は、Python 2.7とApache Avro(Pythonクライアント)を使用して、kafkaブローカーを通してシリアル化されたメッセージを交換しようとしています。以前にスキーマを作成せずにメッセージをやりとりする方法があるかどうかを知りたい。Python - スキーマレスApache Avroデータのシリアル化

これはコード(スキーマを使用して、sensor.avsc、私は避けたいもの)である:

from kafka import SimpleProducer, KafkaClient 
import avro.schema 
import io, random 
from avro.io import DatumWriter 

# To send messages synchronously 
kafka = KafkaClient('localhost:9092') 
producer = SimpleProducer(kafka, async = False) 

# Kafka topic 
topic = "sensor_network_01" 

# Path to user.avsc avro schema that i don't want 
schema_path="sensor.avsc" 
schema = avro.schema.parse(open(schema_path).read()) 


for i in xrange(100): 
    writer = avro.io.DatumWriter(schema) 
    bytes_writer = io.BytesIO() 
    encoder = avro.io.BinaryEncoder(bytes_writer) 
    # creation of random data 
    writer.write({"sensor_network_name": "Sensor_1", "value": random.randint(0,10), "threshold_value":10 }, encoder) 

    raw_bytes = bytes_writer.getvalue() 
    producer.send_messages(topic, raw_bytes) 

これはsensor.avscファイルです:

{ 
    "namespace": "sensors.avro", 
    "type": "record", 
    "name": "Sensor", 
    "fields": [ 
     {"name": "sensor_network_name", "type": "string"}, 
     {"name": "value", "type": ["int", "null"]}, 
     {"name": "threshold_value", "type": ["int", "null"]} 
    ] 
} 

答えて

0

I避難所誰もこれをやっているのを見たことはないが、それを自分で望んでいた。あなた自身で書く必要があるかもしれませんが、あまりにも悪くあってはいけません。直列化するオブジェクトが単純であると仮定すると、あなたがしなければならないことは、フィールドをループし、Python型からavro型へのマップを持つことだけです。ネストされたフィールドには、各オブジェクトを掘るために再帰のようなものが必要です。

1

このコード:

import avro.schema 
import io, random 
from avro.io import DatumWriter, DatumReader 
import avro.io 

# Path to user.avsc avro schema 
schema_path="user.avsc" 
schema = avro.schema.Parse(open(schema_path).read()) 


for i in xrange(1): 
    writer = avro.io.DatumWriter(schema) 
    bytes_writer = io.BytesIO() 
    encoder = avro.io.BinaryEncoder(bytes_writer) 
    writer.write({"name": "123", "favorite_color": "111", "favorite_number": random.randint(0,10)}, encoder) 
    raw_bytes = bytes_writer.getvalue() 

    print(raw_bytes) 

    bytes_reader = io.BytesIO(raw_bytes) 
    decoder = avro.io.BinaryDecoder(bytes_reader) 
    reader = avro.io.DatumReader(schema) 
    user1 = reader.read(decoder) 
    print(" USER = {}".format(user1)) 

このスキーマ

{"namespace": "example.avro", 
"type": "record", 
"name": "User", 
"fields": [ 
    {"name": "name", "type": "string"}, 
    {"name": "favorite_number", "type": ["int", "null"]}, 
    {"name": "favorite_color", "type": ["string", "null"]} 
] 
} 

に対処するためには何が必要です。

クレジットはthis gist

関連する問題