Google Cloud StorageからCSVファイルを取り込み、それらをマージしてBigQueryに書き込む予定のETLジョブを作成しようとしています。Google Cloud Dataflow CSVファイルをマージしてBigQueryに書き込む例
私はCSVの読み取り部分を見つけ出すことができました。データフローのドキュメントはマージオプションを理解するのに役立たないので、私はマージに渋滞しています。
PCollection<String> File1 = p.apply(TextIO.Read.from("gs://**/DataFile1.csv"));
PCollection<String> File2 = p.apply(TextIO.Read.from("gs://**/DataFile2.csv"));
すでに定義されているBigQueryテーブルにfile1とfile2の内容をマージして書き込みます。
ファイル1例:
Order,Status,Follow,substatus Order1, open, Yes, staged Order2, InProcess,No, withbackoffice
ファイル2例:
Order,Status,Follow,substatus Order3, open, Yes, staged Order4, InProcess,No, withbackoffice
BigQueryのテーブルには、私がマージする方法を知っている列
Order,Status,Follow,substatus - Order1, open, Yes, staged - Order2, InProcess,No, withbackoffice - Order3, open, Yes, staged - Order4, InProcess,No, withbackoffice
とのことを持っている必要がありますプレーンなJavaですが、クラウドデータフローでこれを行うのに役立つ適切なPTransformを削除してください。親切に助けてください!ありがとう。
マージするとはどういう意味ですか?クロスジョイント/カルテシアンの商品ですか? CoGroupByKeyトランスフォーム(https://cloud.google.com/dataflow/model/group-by-key#join)を使用して調べることができます。 –
お返事ありがとうございます。私は連合を意味する。上記の例で説明したとおりです。ファイル1はn行、ファイル2はm行です。 CSVヘッダーは同じなので、スキーマは同じです。 BigQueryへの出力は、CSVヘッダーと行n + mの列です。シンプルユニオン。また、以前もリンクを見ました。それはこのトピックに関する例や助けを提供しません。 – Raju
ここでは、bigqueryに2つのpcollectionを書き込むことができます。https://cloud.google.com/dataflow/model/bigquery-io#writing-to-bigquery(BigQueryIO.Write.WriteDisposition.WRITE_APPENDを使用している可能性があります) )。 –