1

pubサブトピックへのログエクスポートを有効にしました。私はこれらのログを処理し、関連する列をBigQueryに格納するためにデータフローを使用しています。誰かがLogEntryオブジェクトにpubsubメッセージのペイロードの変換を助けてもらえますか? 私は次のコードを試してみました:ログエクスポートでpubsubペイロードをLogEntryオブジェクトに変換する方法

@ProcessElement 
public void processElement(ProcessContext c) throws Exception { 
    PubsubMessage pubsubMessage = c.element(); 

    ObjectMapper mapper = new ObjectMapper(); 

    byte[] payload = pubsubMessage.getPayload(); 
    String s = new String(payload, "UTF8"); 
    LogEntry logEntry = mapper.readValue(s, LogEntry.class); 
} 

をしかし、私は、次のエラーを得た:

com.fasterxml.jackson.databind.JsonMappingException: Can not find a (Map) Key deserializer for type [simple type, class com.google.protobuf.Descriptors$FieldDescriptor] 

編集: 私は次のコードを試してみました:

try { 
     ByteArrayInputStream stream = new ByteArrayInputStream(Base64.decodeBase64(pubsubMessage.getPayload())); 
     LogEntry logEntry = LogEntry.parseDelimitedFrom(stream); 
     System.out.println("Log Entry = " + logEntry); 
    } catch (InvalidProtocolBufferException e) { 
     e.printStackTrace(); 
    } 

をしかし、私は、次の取得今エラー:

com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag

答えて

0

JSON format parserはこれを行う必要があります。 Javaは私の強みではありませんが、あなたは次のようなものを探しています:

@ProcessElement 
public void processElement(ProcessContext c) throws Exception { 
    LogEntry.Builder entryBuilder = LogEntry.newBuilder(); 
    JsonFormat.Parser.usingTypeRegistry(
     JsonFormat.TypeRegistry.newBuilder() 
      .add(LogEntry.getDescriptor()) 
      .build()) 
     .ignoringUnknownFields() 
     .merge(c.element(), entryBuilder); 
    LogEntry entry = entryBuilder.build(); 
    ... 
} 

タイプを登録せずに立ち去ることができるかもしれません。私はC++でproto型がグローバルレジストリにリンクされていると思います。

サービスが新しいフィールドを追加してエクスポートし、protoディスクリプタのコピーを更新していない場合は、「ignoringUnknownFields」が必要になります。エクスポートされたJSONの "@type"フィールドも問題を引き起こします。

ペイロードの特別な処理が必要な場合があります(JSONの場合はストリップしてから別々に解析します)。それがJSONなら、私はパーサーが存在しないサブメッセージにデータを挿入しようとします。それがprotoの場合... Anyタイプも登録すれば実際に動作するかもしれません。

関連する問題