Dimensionテーブルを読み込むためのETLを構築しようとしています。私はApache Bea、PythonとDataFlow、BigQueryを使っています。Apache-BeamがPCollectionにシーケンス番号を追加する
BigQueryにロードするために、pcollectionの各要素にシーケンス番号を割り当てる必要がありますが、これを行う方法はありません。
私は前回の集計を行うためにDataFlowが必要だと思って、最終的なpcollectionを取得してシーケンス番号を追加する必要があると思いますが、この瞬間に並列処理を中止してpcollectionをリストにキャストする必要があります.collect()を使用して)簡単なループを作成してシーケンス番号を割り当てます。それは正しい?
これは私がコード化されてきたパイプラインです:
p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy)
私はpcollectionからリストを取得する方法はありません読んだ: How to get a list of elements out of a PCollection in Google Dataflow and use it in the pipeline to loop Write Transforms?
どのように私はそれを達成することができますか?どんな助け?
はあなたがこれまでにしようとしているものを投稿することができます参照してください、とコードについての詳細を学ぶために? –
これはBeamを使用した私の最初のアプローチです。私はコードを追加するつもりですが、私は決して見つけません。 –
シーケンス番号を追加する必要があると思われる理由について詳しく説明できますか?このシーケンス番号が必要なBigQueryで行うことは何ですか? –