2017-09-19 5 views
2

最新バージョンのApache airflowを使用しています。 LocalExecutorで始まった。そのモードでは、CeleryExecutorがそれらを使用するために必要だったウェブUIの状態をいくつかのやりとりで除いて、すべてうまくいっていた。 RedisでCeleryエグゼキュータをインストールおよび設定し、RedisをブローカURLおよび結果バックエンドとして設定しました。Apache Airflow Celery Redis DecodeError

タスクは、それが次のエラーを与える、その時点で予定されてまで、最初に動作するように表示されます。

File "/bin/airflow", line 28, in <module> 
    args.func(args) 
    File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 882, in scheduler 
    job.run() 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 201, in run 
    self._execute() 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute 
    self._execute_helper(processor_manager) 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1444, in _execute_helper 
    self.executor.heartbeat() 
    File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 132, in heartbeat 
    self.sync() 
    File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 91, in sync 
    state = async.state 
    File "/usr/lib/python2.7/site-packages/celery/result.py", line 436, in state 
    return self._get_task_meta()['status'] 
    File "/usr/lib/python2.7/site-packages/celery/result.py", line 375, in _get_task_meta 
    return self._maybe_set_cache(self.backend.get_task_meta(self.id)) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 352, in get_task_meta 
    meta = self._get_task_meta_for(task_id) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 668, in _get_task_meta_for 
    return self.decode_result(meta) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 271, in decode_result 
    return self.meta_from_decoded(self.decode(payload)) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 278, in decode 
    accept=self.accept) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads 
    return decode(data) 
    File "/usr/lib64/python2.7/contextlib.py", line 35, in __exit__ 
    self.gen.throw(type, value, traceback) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors 
    reraise(wrapper, wrapper(exc), sys.exc_info()[2]) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors 
    yield 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads 
    return decode(data) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads 
    return load(BytesIO(s)) 
kombu.exceptions.DecodeError: invalid load key, '{'. 

はピクルスのシリアル化エラーしているようだが、私はトレースするかどうかはわかりません原因。助言がありますか?

この問題は、私がサブダグ機能を使用するワークフローに一貫して影響します。問題はそれに関連している可能性があります。

注:私もrabbitMQを使ってテストしましたが、そこに別の問題がありました。クライアントは「ピアによる接続のリセット」を表示し、クラッシュします。 RabbitMQログは「クライアントが予期せずTCPコネクションをクローズしました」と表示します。

答えて

0

私たちのスケジューラログで正確に同じバックトレースを見た後、このつまずい:

File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads 
    return load(BytesIO(s)) 
kombu.exceptions.DecodeError: invalid load key, '{'. 

セロリは、「{」不審に思えたから始まる何かをunpickle化しようとしていたので、私はのtcpdumpのを取っているという事実トラフィックを監視し、Web UIを介してタスクを開始しました。結果のキャプチャがほぼ正確に上記のバックトレースは、スケジューラログに登場したのと同じ瞬間をこの交換が含まれていました。

05:03:49.145849 IP <scheduler-ip-addr>.ec2.internal.45597 > <redis-ip-addr>.ec2.internal.6379: Flags [P.], seq 658:731, ack 46, win 211, options [nop,nop,TS val 654768546 ecr 4219564282], length 73: RESP "GET" "celery-task-meta-b0d3a29e-ac08-4e77-871e-b4d553502cc2" 
05:03:49.146086 IP <redis-ip-addr>.ec2.internal.6379 > <scheduler-ip-addr>.ec2.internal.45597: Flags [P.], seq 46:177, ack 731, win 210, options [nop,nop,TS val 4219564282 ecr 654768546], length 131: RESP "{"status": "SUCCESS", "traceback": null, "result": null, "task_id": "b0d3a29e-ac08-4e77-871e-b4d553502cc2", "children": []}" 

Redisをからの応答のペイロードは明らかにJSONですので、なぜセロリはそれをunpickle化しようとしていますか? Airflow 1.7から1.8への移行中です。展開中に、v1.7を実行するAirflowワーカーと、v1.8を実行するAirflowワーカーがあります。作業者は、作業負荷が掛かっていないキューから取り出すことになっていましたが、DAGのバグのために、Airflow 1.8によってスケジュールされたTaskInstanceが、Airflow 1.7を介して起動されたセロリ作業者によって実行されました。

AIRFLOW-1038はセロリタスクステータスのシリアライザをJSON(デフォルト)からpickleに変更しました。この変更前のバージョンのコードを実行するワーカーはJSONで結果をシリアル化し、スケーラはこれを含むバージョンのコードを実行します変更は上記のエラーの原因となるunpickleによって結果を逆シリアル化しようとします。

0

airflow.cfgで設定したcelery_result_backendの種類を確認してください。そうでない場合は、データベースのバックエンド(mysqlなど)に切り替えてみてください。

ampqバックエンド(Celery 3.1以降でのみ使用可能)、redisおよびrpcバックエンドでは、問題が発生することがあります。

関連する問題