0

私はパイプラインを作成してpubsubからストリームを読み込み、apache beamでgoogle cloudデータフローを使用してbigqueryに書き込みます。 私はこのコードを持っている:Apache Beam Dataflow: 'NoneType'オブジェクトには 'parts'属性がありません

import apache_beam as beam 
from apache_beam.transforms.window import FixedWindows 

topic = 'projects/???/topics/???' 
table = '???.???' 

gcs_path = "gs://???" 

with beam.Pipeline(runner="DataflowRunner", argv=[ 
     "--project", "???", 
     "--staging_location", ("%s/staging_location" % gcs_path), 
     "--temp_location", ("%s/temp" % gcs_path), 
     "--output", ("%s/output" % gcs_path) 
    ]) as p: 
    (p 
    | 'winderow' >> beam.WindowInto(FixedWindows(60)) 
    | 'hello' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic) 
    | 'hello2' >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table)) 
    ) 
    p.run().wait_until_finish() 

をしかし、それを実行するときに、私はこのエラーを取得しています:

No handlers could be found for logger "oauth2client.contrib.multistore_file" 
ERROR:root:Error while visiting winderow 
Traceback (most recent call last): 
    File ".\main.py", line 20, in <module> 
    p.run().wait_until_finish() 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 339, in run 
    return self.runner.run(self) 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 296, in run 
    super(DataflowRunner, self).run(pipeline) 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 138, in run 
    pipeline.visit(RunVisitor(self)) 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 367, in visit 
    self._root_transform().visit(visitor, self, visited) 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 710, in visit 
    part.visit(visitor, pipeline, visited) 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 713, in visit 
    visitor.visit_transform(self) 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 133, in visit_transform 
    self.runner.run_transform(transform_node) 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 176, in run_transform 
    return m(transform_node) 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 526, in run_ParDo 
    input_step = self._cache.get_pvalue(transform_node.inputs[0]) 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 252, in get_pvalue 
    self._ensure_pvalue_has_real_producer(pvalue) 
    File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 226, in _ensure_pvalue_has_real_producer 
    while real_producer.parts: 
AttributeError: 'NoneType' object has no attribute 'parts' 

が、これはコードや構成に問題ですか? どうすれば動作させることができますか?

答えて

0

私はウィンドウパイプラインの経験はまだありませんが、コンセプトからわかるように、ウィンドウはパイプライン設定ではなく入力データに適用されるはずです。

このケースであること、あなたのコードは、おそらく次のようになります。

with beam.Pipeline(runner="DataflowRunner", argv=[ 
     "--project", "???", 
     "--staging_location", ("%s/staging_location" % gcs_path), 
     "--temp_location", ("%s/temp" % gcs_path), 
     "--output", ("%s/output" % gcs_path) 
    ]) as p: 
    (p 
    | 'hello' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic) 
    | 'winderow' >> beam.WindowInto(FixedWindows(60)) 
    | 'hello2' >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table)) 
    ) 
    p.run().wait_until_finish() 

公式レポは、同様に、ウィンドウの操作上のいくつかsamplesを持っています。

+0

: 'ValueError:現在、PubSubPayloadSourceはストリーミングパイプラインでのみ使用可能です。 ' –

+0

テストのためだけに' DirectRunner'を使用するとどうなるのでしょうか?それは動作しますか? –

関連する問題