2012-11-06 6 views
5

私はtask_postrunシグナルのraisin SystemExit()によってメインセロリプロセスをシャットダウンしようとしています。信号はちょうどうまく発射され、例外は発生しますが、ワーカーは決して完全に終了せず、ただそこにハングします。セロリはtask_postrunシグナルでSystemExitを発生させてシャットダウンしようとしていますが、常にハングアップしてメインプロセスは終了しません

この作業を行うにはどうすればよいですか?

どこかの設定が忘れていますか?

以下は、私が労働者のために使っているコード(worker.py)です:

from celery import Celery 
from celery import signals 

app = Celery('tasks', 
    set_as_current = True, 
    broker='amqp://[email protected]//', 
    backend="mongodb://localhost//", 
) 

app.config_from_object({ 
    "CELERYD_MAX_TASKS_PER_CHILD": 1, 
    "CELERYD_POOL": "solo", 
    "CELERY_SEND_EVENTS": True, 
    "CELERYD_CONCURRENCY": 1, 
    "CELERYD_PREFETCH_MULTIPLIER": 1, 
}) 

def shutdown_worker(**kwargs): 
    print("SHUTTING DOWN WORKER (RAISING SystemExit)") 
    raise SystemExit() 

import tasks 

signals.task_postrun.connect(shutdown_worker) 

print("STARTING WORKER") 
app.worker_main() 
print("WORKER EXITED!") 

以下tasks.pyのためのコードです:

from celery import Celery,task 
from celery.signals import task_postrun 
import time 

from celery.task import Task 

class Blah(Task): 
    track_started = True 
    acks_late = False 

    def run(config, kwargs): 
     time.sleep(5) 
     return "SUCCESS FROM BLAH" 

def get_app(): 
    celery = Celery('tasks', 
     broker='amqp://[email protected]//', 
     backend="mongodb://localhost//" 
    ) 
    return celery 

はこれをテストするには、

>>> import tasks 
>>> results = [] 
>>> results.append(tasks.Blah.delay({})) 

出力:私が最初にすることは、労働者のコード(python worker.py)を実行され、その後、私はそうのようなタスクをキュー私は労働者から見るこれです:

STARTING WORKER 

-------------- [email protected] v3.0.9 (Chiastic Slide) 
---- **** ----- 
--- * *** * -- [Configuration] 
-- * - **** --- . broker:  amqp://[email protected]:5672// 
- ** ---------- . app:   tasks:0x26f3050 
- ** ---------- . concurrency: 1 (solo) 
- ** ---------- . events:  ON 
- ** ---------- 
- *** --- * --- [Queues] 
-- ******* ---- . celery:  exchange:celery(direct) binding:celery 
--- ***** ----- 

[2012-11-06 15:59:16,761: WARNING/MainProcess] [email protected] has started. 
[2012-11-06 15:59:21,785: WARNING/MainProcess] SHUTTING DOWN WORKER (RAISING SystemExit) 

私は、Pythonのコードが完全に終了するプロセスのために、その後app.worker_main()への呼び出しから復帰した後、WORKER EXITEDを印刷すると期待していました。 。

私が作るにはどうすればよい:これが起こることはありませんし、ワーカープロセスは、それが離れて行くのkill -KILL {PID} EDなければならない(それがいずれかの他のタスクを消費することはできません

私は私の主な質問は以下のようになり推測しますapp.worker_main()からコードRETURN?

私は、タスクのX数が実行された後(プロセスCOMPLETELYの出口を持つことで)完全にワーカープロセスを再起動することができるようにしたいと思います。

UPDATE私は、労働者がぶら下がっているものを考え出した - それはSystemExit例外をキャッチした後、作業員(WorkControllerは)self.stopへの呼び出しにぶら下がっています。

答えて

2

私自身の質問に答えると嫌いです。

Anywhooは、それがWorkController内部Mediator成分に参加コールでブロッキングした(Mediator成分、内部絞り、それjoin S上stop()を呼び出します)。

すべてのレート制限を無効にすることでMediatorコンポーネントを削除しました(これはデフォルトではあるはずですが、何らかの理由ではありません)。

あなたは設定で、すべてのレート制限を無効にすることができます

CELERY_DISABLE_ALL_RATE_LIMITS: True 

希望、これはあまりにも道を他の誰かに役立ちます。

平和

+1

現在のバージョンのCeleryでは、オプションがCELERY_DISABLE_RATE_LIMITSに変更されました。デフォルト値はCELERY_DEFAULT_RATE_LIMITで定義されています。変更しない場合は制限はありません。 –

関連する問題