2017-02-22 9 views
2

私はまだApache Beam/Cloud Dataflowが新しくなっているので、私の理解が間違っているとお詫び申し上げます。Python Apache Beam Side入力アサーションエラー

私はパイプラインを通じて〜30,000行のデータファイルを読み込もうとしています。私のシンプルなパイプラインは、まずGCSからcsvを開き、データからヘッダを取り除き、ParDo/DoFn関数でデータを実行した後、すべての出力をcsvに書き戻してGCSに戻しました。このパイプラインは機能し、私の最初のテストでした。

次に、パイプラインを編集してcsvを読み込み、ヘッダーを取り出し、ヘッダーをデータから削除し、ヘッダーをサイド入力としてParDo/DoFn関数を使用してデータを実行し、すべての出力を書き込みますcsvに唯一の新しいコードは、ヘッダーをサイド入力として渡し、データからそれをフィルターに掛けることでした。

enter image description here enter image description here

パルド/ DoFn機能は、私は私の側の入力が働いていたことを確認することができるようにちょうどcontext.elementを得build_rows。

私が手にエラーが以下の通りです: enter image description here
は、私は問題が何であるかを正確にわからないが、私はそれがメモリ制限が原因かもしれないと思います。サンプルデータを30,000行から100行にトリミングして、コードが最終的に機能しました。

サイド入力のないパイプラインは30,000行すべてを読み書きしますが、最終的にはデータの変換を行うためにサイド入力が必要になります。

パイプラインを修正して、GCSから大規模なcsvファイルを処理することができますが、ファイルの疑似グローバル変数としてサイド入力を使用することはできますか?

+0

*注:これはローカルでテストされています。私はコードを追加するにつれ、インクリメンタルテストを行ってきました。ローカルで動作している場合は、Google Cloud Dataflowで実行して、そこにも実行されていることを確認します。 Cloud Dataflowで動作する場合は、コードを追加します。 –

答えて

1

私は最近、Apache BeamのCSV file sourceをコード化しました。これをbeam_utils PiPyパッケージに追加しました。具体的には以下のように、あなたはそれを使用することができます。pip install beam_utils

  • インポート:from beam_utils.sources import CsvFileSource

    1. がビームutilsのをインストールします。
    2. ソースとして使用します。beam.io.Read(CsvFileSource(input_file))

    デフォルトの動作では、CsvFileSourceはヘッダーで索引付けされた辞書を返しますが、ドキュメントを参照して使用するオプションを決定することができます。

    独自のカスタムCsvFileSourceを実装したい場合は、余分なとして、あなたは、ビームのFileBasedSourceをサブクラス化する必要があります。

    import csv 
    class CsvFileSource(beam.io.filebasedsource.FileBasedSource): 
        def read_records(self, file_name, range_tracker): 
        self._file = self.open_file(file_name) 
        reader = csv.reader(self._file) 
        for i, rec in enumerate(reader): 
         yield res 
    

    そして、あなたは、ヘッダーやその他の特殊な動作のために解析するために、このロジックを拡張することができます。

    また、このソースは、順次解析する必要があるため、分割することはできないため、データ処理時にボトルネックになる可能性があります(ただし、問題ありません)。

  • +0

    こんにちはパブロ、私の質問の別の1つを見ていただきありがとうございます。あなたが書いたbeam_utils CsvFileSourceを使用するようにコードを変更しました。私は長い間、私に問題を起こしていたサイド入力を使用する必要があることを知っていますが、あなたは私の問題が何であったか教えてくれますか?ちょうど私は何が起こっていたのか理解することができます。 –

    +0

    なぜアサーションが起こったのかを調べてください。 – Pablo

    +0

    分割可能なものについて明示的に__init__を追加する必要があります。私はsuper(CsvFileSource、s).__ init __(ファイル名、splittable = False)。そうでない場合、read_recordsのrange_tracker引数が尊重されていると信じて、何人かの作業者が同じ内容を何度も何度も読んでしまう危険性があります。 – innohead

    関連する問題