2016-12-08 45 views
1

Google Cloud StorageからCSVファイルを取り込み、それらをマージしてBigQueryに書き込む予定のETLジョブを作成しようとしています。Google Cloud Dataflow CSVファイルをマージしてBigQueryに書き込む例

私はCSVの読み取り部分を見つけ出すことができました。データフローのドキュメントはマージオプションを理解するのに役立たないので、私はマージに渋滞しています。

PCollection<String> File1 = p.apply(TextIO.Read.from("gs://**/DataFile1.csv")); 
PCollection<String> File2 = p.apply(TextIO.Read.from("gs://**/DataFile2.csv")); 

すでに定義されているBigQueryテーブルにfile1とfile2の内容をマージして書き込みます。

ファイル1例:

Order,Status,Follow,substatus Order1, open, Yes, staged Order2, InProcess,No, withbackoffice

ファイル2例:

Order,Status,Follow,substatus Order3, open, Yes, staged Order4, InProcess,No, withbackoffice BigQueryのテーブルには、私がマージする方法を知っている列

Order,Status,Follow,substatus - Order1, open, Yes, staged - Order2, InProcess,No, withbackoffice - Order3, open, Yes, staged - Order4, InProcess,No, withbackoffice

とのことを持っている必要がありますプレーンなJavaですが、クラウドデータフローでこれを行うのに役立つ適切なPTransformを削除してください。親切に助けてください!ありがとう。

+0

マージするとはどういう意味ですか?クロスジョイント/カルテシアンの商品ですか? CoGroupByKeyトランスフォーム(https://cloud.google.com/dataflow/model/group-by-key#join)を使用して調べることができます。 –

+0

お返事ありがとうございます。私は連合を意味する。上記の例で説明したとおりです。ファイル1はn行、ファイル2はm行です。 CSVヘッダーは同じなので、スキーマは同じです。 BigQueryへの出力は、CSVヘッダーと行n + mの列です。シンプルユニオン。また、以前もリンクを見ました。それはこのトピックに関する例や助けを提供しません。 – Raju

+0

ここでは、bigqueryに2つのpcollectionを書き込むことができます。https://cloud.google.com/dataflow/model/bigquery-io#writing-to-bigquery(BigQueryIO.Write.WriteDisposition.WRITE_APPENDを使用している可能性があります) )。 –

答えて

0

2つのPCollectionを1つに「連結」する方法を尋ねているようです。その答えはFlatten transformです。 BigQueryに連結されたコレクションを通常の方法で書くことができます。

+0

ありがとう!フラットトランスフォームはトリックをしたものです! – Raju

関連する問題