2012-04-08 9 views
1

"eta"引数を使用してセラーでタスクをスケジュールするリマインダータイプのアプリケーションがあります。リマインダーオブジェクト内のパラメータが変更された場合(例えばリマインダーの時間)、以前に送信されたタスクを取り消し、新しいタスクをキューに入れる。複数のcelerydプロセス間で失効したタスクを追跡する方法

Celerydの再起動時に取り消されたタスクを追跡する良い方法があるのだろうかと思っていました。私はその場でcelerydプロセスを上下に拡大する機能を持っていたいと思います。取り消しコマンドを送信した後に開始されたcelerydプロセスはそのタスクを実行するようです。

これを行う1つの方法は、取り消されたタスクIDのリストを保持することですが、この方法ではリストが任意に増加します。このリストをプルーニングするには、タスクがもはやRabbitMQキューに存在しないことが保証されている必要があります。

私はまた、各celerydワーカーに対して--statedbファイルを共有しようとしましたが、指定されたファイルはワーカーの終了時にのみ更新されるため、達成したいものには適していないようです。

ありがとうございます!

答えて

0

プロジェクトで同様のことをしなければならず、celerycamdjango-admin-monitorとしました。モニターはタスクのスナップショットを取得し、定期的にデータベースに保存します。また、すべてのタスクの状態を閲覧して確認するうえで便利なユーザーインターフェイスがあります。そして、それを使用することもできますeven if your project is not Django based

0

私はこれまでにこれに似た何かを実装しました。私が思いついた解決策は、あなたと非常に似ていました。

私がこの問題を解決したのは、ジョブが実行されたときに(文書が推奨するように主キーを渡すことによって)ワーカーにTaskオブジェクトをフェッチさせることでした。あなたのケースでは、リマインダーが送信される前に、作業者はタスクが実行可能であることを確認するためのチェックを実行する必要があります。そうでない場合は、ETAが変更され、別のワーカーが新しい仕事を引き受けると仮定して、何の仕事もせずに単に戻るべきです。

+0

:私はあなたがこのためにworker_ready 信号を使用することができると思います以前に取り消されたタスクを実行しないでください。 –

+0

私はあなたが既に何らかの種類のデータベースモデルを設定していると仮定していますが、必要に応じてタスクを取り消すことができるように、タスクIDも保存していますか?もしそうなら、 'completed'フラグをこのモデルに追加することができます。 –

+0

私はちょうど思い付きました。取り消しタスクIDのリストを保持し、celerydプロセスがスピンアップまたは再起動されるたびに、スクリプトはリスト全体をループして取り消しコマンドを再送します。その方法では、最後のスクリプトの実行以降に取り消されたタスクIDを保持するだけです。あなたはこの実装の欠陥を見ることができますか? –

1

興味深い問題は、ブロードキャストコマンドを使用して解決するのが簡単だと思います。 新しいワーカーが起動すると、他のすべてのワーカーに、取り消された タスクを新しいワーカーにダンプするよう要求された場合。モジュールcontrol.pyは

、あなたは簡単に@Panel.registerを使用して、新しいコマンドを追加することができ 、二つの新しいリモート制御コマンドを

追加:

from celery.worker import state 
from celery.worker.control import Panel 

@Panel.register 
def bulk_revoke(panel, ids): 
    state.revoked.update(ids) 

@Panel.register 
def broadcast_revokes(panel, destination): 
    panel.app.control.broadcast("bulk_revoke", arguments={ 
     "ids": list(state.revoked)}, 
     destination=destination) 

CELERY_IMPORTSにそれを追加します。

CELERY_IMPORTS = ("control",) 

だけ不足している問題を新しい作業者 が起動時にbroadcast_revokesをトリガーするように接続することです。任意の剪定は、新たに再スタートワーカープロセスという保証の損失につながるので、これはまだ理論的には、私は、データベース内のすべての前のタスクの結果を保持することを必要としない

from celery import current_app as celery 
from celery.signals import worker_ready 

def request_revokes_at_startup(sender=None, **kwargs): 
    celery.control.broadcast("broadcast_revokes", 
          destination=sender.hostname) 
関連する問題