2016-08-12 3 views
0

私はkafkaからavroデータを消費しているjavaカフカの消費者を抱えています[トピックx]。コード生成なしでこのデータをHDFSにプッシュすることになっています。アブロdocumentationでは、彼らは次のようなものを使用している:これでAVROデータをHadoopのhdfsに書き込む

GenericRecord e1 = new GenericData.Record(schema);  
e1.put("key", "value"); 

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema); 

DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); 
dataFileWriter.create(schema, new File("<HDFS file path>")); 

dataFileWriter.append(e1); 
dataFileWriter.close(); 

問題は、私はすでにアブロデータを持っている、です。この一連の手順を使用するには、avroパケットをデシリアライズした後に各キーと値のペアを抽出し、それをGenericRecordオブジェクトにプッシュしなければなりません。私は達成しようとしていることの例は見つけませんでした。関連する文書へのヒントやリンクは非常に高く評価されます。

+2

独自のKafka-> HDFS摂取ツールを実装する代わりに、Kafkaの組み込みKafka Connectフレームワークと、[kafka-connect-hdfs](https:// github .com/confluentinc/kafka-connect-hdfs)?リンクされたHDFSシンクコネクタはAvroをそのまま使用できます。 –

+0

いくつかのスキーマレジストリの問題のため、コンフルエントから離れました。したがって、コンフルエントなフレームワークは使用できません。 – Bitswazsky

+0

実行しているスキーマレジストリの問題を精緻化していますか?もちろん、https://github.com/confluentinc/schema-registry/issuesで行うこともできます。 –

答えて

0

あなたが正しく質問していると分かったら、com.twitter.bijection.Injectionとcom.twitter.bijection.avro.GenericAvroCodecsのパッケージを試してみることをお勧めします。

ここをクリックしてくださいhttp://aseigneurin.github.io/2016/03/04/kafka-spark-avro-producing-and-consuming-avro-messages.html Kafkaプロデューサでは、GenericRecordがKafkaトピックに置かれたバイト[]に変換され、次にコンシューマでこのバイトがスキーマに従ってGenericRecordに反転されます。レコードのすべてのフィールドに値を入力する必要はありません。その後、このレコードをファイルに書き込むことができます。

また、ファイルインスタンスを作成できないため、おそらくHDFSでファイルにアクセスする必要があります。

関連する問題