2016-12-15 5 views
1

Avro &カフカを使用してオブジェクトのリストを送信しようとしています。私の主な問題はリストのサイズが頻繁に変更される可能性があるためです動的なAvroスキーマは、Avroスキーマがよく知られている構造を意味すると理解しているからです。avro&kafkaを使用してオブジェクトのリストを送信する方法

これを行う方法は誰でも知っていますか?

+0

アブロは、マップをアラスのようないくつかの複雑なタイプをサポートしています。これにより、柔軟性が向上します:http://stackoverflow.com/questions/39232395/kafka-kstreams-processing-timeouts/39237089#39237089 –

答えて

0

私は最も簡単な方法は、リストで例えば をするクラスを作る にあると思う:

public class AvroObj { 

    private List<TestObj> list; 

    public List<TestObj> getList() { 
     return list; 
    } 

    public void setList(List<TestObj> list) { 
     this.list = list; 
    } 
} 

は、スキーマを作成します。

Schema schema = ReflectData.get().getSchema(AvroObj.class); 

はReflectDatumWriterを使用してバイトに、それをシリアライズ(私が提供他のデータライターがClassCastExceptionをキャッチする可能性があります)

try(ByteArrayOutputStream out = new ByteArrayOutputStream()) { 
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
      DatumWriter<AvroObj> writer = new ReflectDatumWriter<>(schema); 
      writer.write(avroObj, encoder); 
      encoder.flush(); 
      byte[] bytes = out.toByteArray(); 
     } 

その後、kafkaプロデューサでバイトを送信します。

deserialiseは、消費者が受け取ったことバイト:

DatumReader<AvroObj> reader1 = new ReflectDatumReader<AvroObj>(schema); 
     Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); 
     AvroObj decodedAvroObj = reader1.read(null, decoder); 
関連する問題