私はまだApache Beam/Cloud Dataflowが新しくなっているので、私の理解が間違っているとお詫び申し上げます。Python Apache Beam Side入力アサーションエラー
私はパイプラインを通じて〜30,000行のデータファイルを読み込もうとしています。私のシンプルなパイプラインは、まずGCSからcsvを開き、データからヘッダを取り除き、ParDo/DoFn関数でデータを実行した後、すべての出力をcsvに書き戻してGCSに戻しました。このパイプラインは機能し、私の最初のテストでした。
次に、パイプラインを編集してcsvを読み込み、ヘッダーを取り出し、ヘッダーをデータから削除し、ヘッダーをサイド入力としてParDo/DoFn関数を使用してデータを実行し、すべての出力を書き込みますcsvに唯一の新しいコードは、ヘッダーをサイド入力として渡し、データからそれをフィルターに掛けることでした。
パルド/ DoFn機能は、私は私の側の入力が働いていたことを確認することができるようにちょうどcontext.elementを得build_rows。
私が手にエラーが以下の通りです:
は、私は問題が何であるかを正確にわからないが、私はそれがメモリ制限が原因かもしれないと思います。サンプルデータを30,000行から100行にトリミングして、コードが最終的に機能しました。
サイド入力のないパイプラインは30,000行すべてを読み書きしますが、最終的にはデータの変換を行うためにサイド入力が必要になります。
パイプラインを修正して、GCSから大規模なcsvファイルを処理することができますが、ファイルの疑似グローバル変数としてサイド入力を使用することはできますか?
*注:これはローカルでテストされています。私はコードを追加するにつれ、インクリメンタルテストを行ってきました。ローカルで動作している場合は、Google Cloud Dataflowで実行して、そこにも実行されていることを確認します。 Cloud Dataflowで動作する場合は、コードを追加します。 –