2016-05-03 14 views
0

カフカメッセージを解析しようとしていますが、暗号化されたAVROフォーマットです。今、私はJSON形式のデータを取得するために、次のコードを書かれている復号化カフカアブロメッセージ

{ 
    "type": "record", 
    "namespace": "kafka.events", 
    "name": "AvroSchema", 
     "fields": [ 
      { "name": "product_id", "type": "string" }, 
      { "name": "available_to_promise_quantity", "type": "double" }, 
      { "name": "online_available_to_promise_quantity", "type": "double" }, 
      { "name": "stores_available_to_promise_quantity", "type": "double" }, 
      { "name": "is_infinite_inventory", "type": "boolean", "default" : false }, 
      { "name": "event_timestamp", "type": "long" }, 
      { "name": "previous_event", "type": "AvroSchema" } 
     ] 
} 

for (final KafkaStream<byte[], byte[]> stream : streams){ 
    ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator(); 
    byte[] consumedEncryptedMessage; 
    MessageAndMetadata<byte[], byte[]> consumedEntry; 
    while(consumerIterator.hasNext()){ 
     consumedEntry = consumerIterator.next(); 
      if(null != consumedEntry){ 
       consumedEncryptedMessage = consumedEntry.message(); 
        try { 
          Schema schema = null; 
          schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc")); 
          DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema); 
          Decoder decoder = DecoderFactory.get().binaryDecoder(consumedEncryptedMessage , null); 
          GenericRecord decryptedmsg = null; 
          decryptedmsg = reader.read(null, decoder); 
          System.out.println(decryptedmsg); 
         } 
         catch(Exception e) { 
          e.printStackTrace(); 
          System.out.println(e); 
         } 

がどのようにメッセージを復号化するために私を助けてください、私は次のようAvroSchema.avscアブロスキーマファイルを持っています。

暗号化されたバイトのメッセージは、このタイプのものであり:080-21-0001 :�Aw�@@��A�ǐ�U :�Aw�@@��A

示唆したように、私は変更を加え、そして今私は、コードの一部、次ています」と

while(consumerIterator.hasNext()){ 
    consumedEntry = consumerIterator.next(); 
     if(null != consumedEntry){ 
      consumedEncryptedMessage = consumedEntry.message(); 
       try { 
        Schema schema = new Schema.Parser().parse(new File("src/AVROSchema.avsc")); 
        File myfile = new File("/Users/z001ldc/Desktop/myfile.txt"); 
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 
        FileUtils.writeByteArrayToFile(myfile, consumedEncryptedMessage); 
        @SuppressWarnings("resource") 
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(myfile, reader); 
        while (dataFileReader.hasNext()) { 
         decryptedMessage = dataFileReader.next(decryptedMessage); 
         System.out.println(decryptedMessage.get("product_id").toString()); 
        } 
       } 
       catch(Exception e) { 
        e.printStackTrace(); 
        System.out.println(e); 
       } 

をしかし、まだ、私はエラーを取得していますデータファイルではありません "。その後

答えて

0

デシリアライズは文句を言わない復号

まず必要あなたのスキーマラインが

schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc")); 
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 

取得

DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(consumedEncryptedMessage, reader); 
GenericRecord user = null; 
while (dataFileReader.hasNext()) { 
// Reuse user object by passing it to next(). This saves us from 
// allocating and garbage collecting many objects for files with 
// many items. 
user = dataFileReader.next(user); 
System.out.println(user); 
+0

DataFileReader dataFileReader =新しいDataFileReader (consumedEncryptedMessage、リーダー)。 この行は、consumedEntryMessageがファイルではなく、byte []型であるため、エラーを表示しています。メッセージをファイルとして使用する必要がありますか? –

+0

Ohhh ..スキンATPEventで指定されたような一連のイベントが発生しているのを確認できませんでした。テストのためにスキーマから削除してみてください。System.out.println(result.get( "product_id")。toString())を試してみてください。うまくいけば、あなたのコードの残りの部分は正常に見えます。 – tesnik03

+0

私はバイト配列としてしか得ていないので、ファイルの代わりにバイト配列でconsumedentrymsgを使う方法はありますか? –