2017-12-01 5 views
0

JSONを読み込み、処理してJSONを出力する非常に単純なジョブを実行しています。しかし、何らかの理由で、このジョブは常に非常に奇妙な「pickle化」エラーで失敗します。データの最初の行を処理する段階で、ほとんどすぐに失敗します。
(質問タイトル:Apache Beam PythonをDataflowで実行すると、奇妙なpickle化エラーが発生する)

PicklingError: Can't pickle <type 'generator'>: attribute lookup __builtin__.generator failed [while running 'map to user_activity'] 

前のステージでは、(String, []) のタプルが出力されます。'map to user_activity' ステージが実行され、[] を反復しようとすると失敗します。

この種のpickle化エラーの一般的な原因と思われるラムダは使っていません。原因は、入力タプルの [] を反復する処理にあるところまで絞り込みました。反復しなければ、ジョブは「動作」します。しかし、次のコードを追加した途端に:

for entry in input_tuple: 
    pass 

すぐにジョブが失敗します。

**更新** 入力タプルを反復することが原因ではないことが判明しました。map関数の中にどんなforループがあっても、クラッシュの原因となります。たとえば次のようなものでも:

q=[1,2,3,4,5,6] 
for a in q: 
    pass 

以下が、エラーの完全なスタックトレースです:

message: "An exception was raised when trying to execute the workitem 4985068250752295797 : Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work work_executor.execute() File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute op.start() File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start (dataflow_worker/native_operations.c:3175) def start(self): File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start (dataflow_worker/native_operations.c:3079) with self.scoped_start_state: File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start (dataflow_worker/native_operations.c:2994) with self.spec.source.reader() as reader: File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start (dataflow_worker/native_operations.c:2938) self.output(windowed_value) File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output (apache_beam/runners/worker/operations.c:5783) cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive (apache_beam/runners/worker/operations.c:3622) cython.cast(Operation, consumer).process(windowed_value) File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process (apache_beam/runners/worker/operations.c:11089) with self.scoped_process_state: File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process (apache_beam/runners/worker/operations.c:11043) self.dofn_receiver.receive(o) File 
"apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:10156) self.process(windowed_value) File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10458) self._reraise_augmented(exn) File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented (apache_beam/runners/common.c:11363) raise File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10371) self.do_fn_invoker.invoke_process(windowed_value) File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process (apache_beam/runners/common.c:6270) self.output_processor.process_outputs( File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs (apache_beam/runners/common.c:12500) self.main_receivers.receive(windowed_value) File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive (apache_beam/runners/worker/operations.c:3622) cython.cast(Operation, consumer).process(windowed_value) File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process (apache_beam/runners/worker/operations.c:11089) with self.scoped_process_state: File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process (apache_beam/runners/worker/operations.c:11043) self.dofn_receiver.receive(o) File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:10156) self.process(windowed_value) File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10458) self._reraise_augmented(exn) File 
"apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented (apache_beam/runners/common.c:11673) raise new_exn, None, original_traceback File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10371) self.do_fn_invoker.invoke_process(windowed_value) File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process (apache_beam/runners/common.c:6270) self.output_processor.process_outputs( File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs (apache_beam/runners/common.c:12500) self.main_receivers.receive(windowed_value) File "apache_beam/runners/worker/operations.py", line 84, in apache_beam.runners.worker.operations.ConsumerSet.receive (apache_beam/runners/worker/operations.c:3588) self.update_counters_start(windowed_value) File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start (apache_beam/runners/worker/operations.c:3808) self.opcounter.update_from(windowed_value) File "apache_beam/runners/worker/opcounters.py", line 62, in apache_beam.runners.worker.opcounters.OperationCounters.update_from (apache_beam/runners/worker/opcounters.c:2396) self.do_sample(windowed_value) File "apache_beam/runners/worker/opcounters.py", line 80, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample (apache_beam/runners/worker/opcounters.c:3017) self.coder_impl.get_estimated_size_and_observables(windowed_value)) File "apache_beam/coders/coder_impl.py", line 730, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables (apache_beam/coders/coder_impl.c:22968) def get_estimated_size_and_observables(self, value, nested=False): File "apache_beam/coders/coder_impl.py", line 739, in 
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables (apache_beam/coders/coder_impl.c:22687) self._value_coder.get_estimated_size_and_observables( File "apache_beam/coders/coder_impl.py", line 260, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables (apache_beam/coders/coder_impl.c:9578) self.encode_to_stream(value, out, nested) File "apache_beam/coders/coder_impl.py", line 298, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream (apache_beam/coders/coder_impl.c:10416) self.fallback_coder_impl.encode_to_stream(value, stream, nested) File "apache_beam/coders/coder_impl.py", line 154, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream (apache_beam/coders/coder_impl.c:5883) return stream.write(self._encoder(value), nested) File "/usr/local/lib/python2.7/dist-packages/apache_beam/coders/coders.py", line 437, in <lambda> lambda x: dumps(x, HIGHEST_PROTOCOL), pickle.loads) PicklingError: Can't pickle <type 'generator'>: attribute lookup __builtin__.generator failed [while running 'map to user_activity']

+0

入力タプルを反復することが原因ではないことが分かりました。map関数内のどんなforループでも、たとえ次のようなものであっても、クラッシュを引き起こします。 `q = [1,2,3,4,5,6]` `for a in q:` `pass` –

+0

Beamのどのバージョンを使用していますか?より完全なコード断片を載せていただけますか? – jkff

回答

0

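おそらくこれは、複数の要素を返したい関数に対して beam.FlatMap ではなく beam.Map を使用していることが原因です。beam.Map は関数の戻り値をそのまま1つの要素として出力するため、関数がジェネレータ(yield を含む関数)の場合、ジェネレータオブジェクト自体をpickle化しようとして失敗します。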

関連する問題