0
カフカメッセージを解析しようとしていますが、暗号化されたAVROフォーマットです。今、私はJSON形式のデータを取得するために、次のコードを書かれている復号化カフカアブロメッセージ
{
"type": "record",
"namespace": "kafka.events",
"name": "AvroSchema",
"fields": [
{ "name": "product_id", "type": "string" },
{ "name": "available_to_promise_quantity", "type": "double" },
{ "name": "online_available_to_promise_quantity", "type": "double" },
{ "name": "stores_available_to_promise_quantity", "type": "double" },
{ "name": "is_infinite_inventory", "type": "boolean", "default" : false },
{ "name": "event_timestamp", "type": "long" },
{ "name": "previous_event", "type": "AvroSchema" }
]
}
:
for (final KafkaStream<byte[], byte[]> stream : streams){
ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();
byte[] consumedEncryptedMessage;
MessageAndMetadata<byte[], byte[]> consumedEntry;
while(consumerIterator.hasNext()){
consumedEntry = consumerIterator.next();
if(null != consumedEntry){
consumedEncryptedMessage = consumedEntry.message();
try {
Schema schema = null;
schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc"));
DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(consumedEncryptedMessage , null);
GenericRecord decryptedmsg = null;
decryptedmsg = reader.read(null, decoder);
System.out.println(decryptedmsg);
}
catch(Exception e) {
e.printStackTrace();
System.out.println(e);
}
がどのようにメッセージを復号化するために私を助けてください、私は次のようAvroSchema.avscアブロスキーマファイルを持っています。
暗号化されたバイトのメッセージは、このタイプのものであり:080-21-0001 :�Aw�@@��A�ǐ�U :�Aw�@@��A
示唆したように、私は変更を加え、そして今私は、コードの一部、次ています」と
while(consumerIterator.hasNext()){
consumedEntry = consumerIterator.next();
if(null != consumedEntry){
consumedEncryptedMessage = consumedEntry.message();
try {
Schema schema = new Schema.Parser().parse(new File("src/AVROSchema.avsc"));
File myfile = new File("/Users/z001ldc/Desktop/myfile.txt");
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
FileUtils.writeByteArrayToFile(myfile, consumedEncryptedMessage);
@SuppressWarnings("resource")
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(myfile, reader);
while (dataFileReader.hasNext()) {
decryptedMessage = dataFileReader.next(decryptedMessage);
System.out.println(decryptedMessage.get("product_id").toString());
}
}
catch(Exception e) {
e.printStackTrace();
System.out.println(e);
}
をしかし、まだ、私はエラーを取得していますデータファイルではありません "。その後
DataFileReader dataFileReader =新しいDataFileReader (consumedEncryptedMessage、リーダー)。 この行は、consumedEntryMessageがファイルではなく、byte []型であるため、エラーを表示しています。メッセージをファイルとして使用する必要がありますか? –
Ohhh ..スキンATPEventで指定されたような一連のイベントが発生しているのを確認できませんでした。テストのためにスキーマから削除してみてください。System.out.println(result.get( "product_id")。toString())を試してみてください。うまくいけば、あなたのコードの残りの部分は正常に見えます。 – tesnik03
私はバイト配列としてしか得ていないので、ファイルの代わりにバイト配列でconsumedentrymsgを使う方法はありますか? –