0

Dimensionテーブルを読み込むためのETLを構築しようとしています。私はApache Bea、PythonとDataFlow、BigQueryを使っています。Apache-BeamがPCollectionにシーケンス番号を追加する

BigQueryにロードするために、pcollectionの各要素にシーケンス番号を割り当てる必要がありますが、これを行う方法はありません。

私は前回の集計を行うためにDataFlowが必要だと思って、最終的なpcollectionを取得してシーケンス番号を追加する必要があると思いますが、この瞬間に並列処理を中止してpcollectionをリストにキャストする必要があります.collect()を使用して)簡単なループを作成してシーケンス番号を割り当てます。それは正しい?

これは私がコード化されてきたパイプラインです:

p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy) 

私はpcollectionからリストを取得する方法はありません読んだ: How to get a list of elements out of a PCollection in Google Dataflow and use it in the pipeline to loop Write Transforms?

どのように私はそれを達成することができますか?どんな助け?

+0

はあなたがこれまでにしようとしているものを投稿することができます参照してください、とコードについての詳細を学ぶために? –

+0

これはBeamを使用した私の最初のアプローチです。私はコードを追加するつもりですが、私は決して見つけません。 –

+0

シーケンス番号を追加する必要があると思われる理由について詳しく説明できますか?このシーケンス番号が必要なBigQueryで行うことは何ですか? –

答えて

1

PCollectionの各要素のリストを取得する場合は、サイド入力を使用できます。これにより、結果からすべての並列処理が削除され、パイプラインが遅くなる可能性があることに注意してください。

それでも、その後、これをしたい場合:

side_input_coll = beam.pvalue.AsIterable(my_collection) 

(p 
| beam.Create([0]) 
| beam.FlatMap(lambda _, my_seq: [(elem, i) for i, elem in enumerate(my_seq)], 
       my_seq=side_input_coll)) 

しかし、それは単にランダムなIDを生成するために最善であってもよいし、並列性を維持するためにそれを忘れないでください。 PCollectionsは、本質的に順序付けられていないことに注意してください。

側入力、Beam Programming Guide on Side Inputs

関連する問題