2017-01-03 2 views
2

私はPython SDKでGoogle Cloud Dataflowを使用しています。Google DataflowのPCollectionから要素のリストを取得し、それをパイプラインで使用してWrite Transformをループする方法はありますか?

私はしたいと思います:

  • は、ループフィルタPCollections(ユニーク日付でそれぞれ)を作成し、書くことがそのリスト内の日付によって
  • マスターPCollectionのうち、ユニークな日付のリストを取得します。各フィルタリングされたPCollectionは、BigQueryの時分割テーブルでパーティションに分割されます。

このリストはどのように入手できますか?次の結合変換の後、ListPCollectionViewオブジェクトを作成しましたが、そのオブジェクトを反復できません:

class ToUniqueList(beam.CombineFn): 

    def create_accumulator(self): 
     return [] 

    def add_input(self, accumulator, element): 
     if element not in accumulator: 
      accumulator.append(element) 
     return accumulator 

    def merge_accumulators(self, accumulators): 
     return list(set(accumulators)) 

    def extract_output(self, accumulator): 
     return accumulator 


def get_list_of_dates(pcoll): 

    return (pcoll 
      | 'get the list of dates' >> beam.CombineGlobally(ToUniqueList())) 

私はすべて間違っていますか?それをする最善の方法は何ですか?

ありがとうございました。

答えて

3

PCollectionの内容を直接取得することはできません。Apache BeamまたはDataflowパイプラインは、実行する処理のクエリプランによく似ています。PCollectionは、プラン内の論理中間ノードです。データ。メインプログラムはプラン(パイプライン)を組み立て、それを放つ。

しかし最終的には、日付別のBigQueryテーブルにデータを書き込もうとしています。このユースケースは、現在のところin the Java SDKのみサポートされており、ストリーミングパイプラインの場合にのみサポートされています。

データに応じて複数の宛先にデータを書き込むより一般的な処理については、BEAM-92に従ってください。

も参照してください。Creating/Writing to Parititoned BigQuery table via Google Cloud Dataflow

関連する問題