2

Google BigQueryにプッシュしようとしているログがあります。私はGoogleのデータフローを使用してパイプライン全体を構築しようとしています。ログ構造は異なり、4つの異なるタイプに分類できます。私のパイプラインでは、PubSubからログを読み込み、それを解析してBigQueryテーブルに書き込みます。ログが書き込まれる必要があるテーブルは、ログの1つのパラメータに依存します。問題は、実行時にBigQueryIO.WriteのTableNameを変更する方法のポイントに固執しています。Googleのデータフロー入力に基づいて複数のテーブルに書き込む

+0

は、次のようになります。http://stackoverflow.com/questions/30431840/writing-results-of-google-dataflow-pipeline-into-mulitple-sinks、それはあなたを助けるかもしれない、それをチェックしてみてください。 – robosoul

+0

@nikhil sharma - このソリューションを自分でハンドリングする必要がありますか?代わりにFluentdのようなものを使ってみましたか? http://www.fluentd.org/ –

+0

@Grahamは答えに感謝します。 http://stackoverflow.com/questions/35979421/dynamic-table-name-when-writing-to-bq-from-dataflow-pipelinesとまったく同じように必要ですが、データフローではこれがサポートされていないことにご迷惑をおかけして申し訳ありません。私はFluentdを試してみませんでしたが、私たちはデータフローやその他のものにパイプラインを実装しようとしています。 Googleのパイプラインすべてで非常に必要なので、Googleにこの機能を含める計画があるのか​​どうか教えてください。また、私がこの機能を実装することによって貢献できるなら、私はそれをもっとうれしく思います。できれば教えてください。 –

答えて

3

サイド出力を使用できます。

https://cloud.google.com/dataflow/model/par-do#emitting-to-side-outputs-in-your-dofn

以下のサンプルコードは、BigQueryのテーブルを読み出して、3つの異なるPCollectionsでそれを分割します。それぞれのPCollectionsは別のPub/Subトピック(代わりに別のBigQueryテーブル)に送信されてしまいます。

Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create()); 

PCollection<TableRow> weatherData = p.apply(
     BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations")); 

final TupleTag<String> readings2010 = new TupleTag<String>() { 
}; 
final TupleTag<String> readings2000plus = new TupleTag<String>() { 
}; 
final TupleTag<String> readingsOld = new TupleTag<String>() { 
}; 

PCollectionTuple collectionTuple = weatherData.apply(ParDo.named("tablerow2string") 
     .withOutputTags(readings2010, TupleTagList.of(readings2000plus).and(readingsOld)) 
     .of(new DoFn<TableRow, String>() { 
      @Override 
      public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception { 

       if (c.element().getF().get(2).getV().equals("2010")) { 
        c.output(c.element().toString()); 
       } else if (Integer.parseInt(c.element().getF().get(2).getV().toString()) > 2000) { 
        c.sideOutput(readings2000plus, c.element().toString()); 
       } else { 
        c.sideOutput(readingsOld, c.element().toString()); 
       } 

      } 
     })); 
collectionTuple.get(readings2010) 
     .apply(PubsubIO.Write.named("WriteToPubsub1").topic("projects/fh-dataflow/topics/bq2pubsub-topic1")); 
collectionTuple.get(readings2000plus) 
     .apply(PubsubIO.Write.named("WriteToPubsub2").topic("projects/fh-dataflow/topics/bq2pubsub-topic2")); 
collectionTuple.get(readingsOld) 
     .apply(PubsubIO.Write.named("WriteToPubsub3").topic("projects/fh-dataflow/topics/bq2pubsub-topic3")); 

p.run(); 
+0

サイド出力の場合でも、パイプラインが実行される前に、つまりコンパイル時にパイプラインを定義するときに、シンク名(例:BigQueryIO)を定義する必要はありませんか?私の理解では、OPはBigQueryテーブルに動的に書き込むことを望んでいます。その名前は実行時にしかわかりません。私はこれがどのように可能であるかはわかりません。たぶん私は何かが分かりにくいです!オハイオ州、右。 –

+0

「ログが書き込まれる必要のあるテーブルがログの1つのパラメータに依存している」ということは、その名前が動的である必要があるか、または既知のセットの1つである必要があるかのいずれかです。うまくいけば、@nikhil sharmaはコメントできます。 –

+1

「ログ構造が違っていて、4つの異なるタイプに分類することができる」ため、これはセットだと思います。 - これにより、ログを4つの異なるテーブルに分けることができます。 –

関連する問題