2012-07-16 12 views
13

いくつかのタスクを実行するセロリチェーンがあります。それぞれのタスクは失敗し、再試行できます。簡単な例は以下を参照してください:チェリーを再試行するとチェーンの一部であるタスクが失敗する

from celery import task 

@task(ignore_result=True) 
def add(x, y, fail=True): 
    try: 
     if fail: 
      raise Exception('Ugly exception.') 
     print '%d + %d = %d' % (x, y, x+y) 
    except Exception as e: 
     raise add.retry(args=(x, y, False), exc=e, countdown=10) 

@task(ignore_result=True) 
def mul(x, y): 
    print '%d * %d = %d' % (x, y, x*y) 

とチェーン:

from celery.canvas import chain 
chain(add.si(1, 2), mul.si(3, 4)).apply_async() 

は、2つのタスクを実行する(そして何が失敗していないと仮定して)、あなたが取得/参照、印刷になります。

1 + 2 = 3 
3 * 4 = 12 

しかし、追加タスクが最初に失敗し、その後の再試行コールで成功すると、チェーン内のタスクの残りの部分は実行されません。つまり、追加タスクは失敗し、チェーン内の他のタスクはすべて実行されず、 ew秒になると、addタスクが再度実行され、チェーン内のタスク(この場合はmul.si(3、4))は実行されません。

セロリは、失敗したタスクから失敗したチェーンを継続する方法を提供しますか?そうでない場合は、これを達成し、チェーンのタスクが指定された順序で実行されるように、そしてタスクが数回再試行されても前のタスクが正常に実行された後でのみ、

注1:問題が

add.delay(1, 2).get() 
mul.delay(3, 4).get() 

を行うことによって解決することができますが、私は鎖が失敗したタスクで動作しない理由を理解することに興味があります。

答えて

0

チェーンが失敗したタスクで動作しない理由を理解することにも興味があります。

私はいくつかのセロリコードを掘ると、私はこれまでに発見したことは次のとおりです。

実装がapp.builtins.py

@shared_task 
def add_chain_task(app): 
    from celery.canvas import chord, group, maybe_subtask 
    _app = app 

    class Chain(app.Task): 
     app = _app 
     name = 'celery.chain' 
     accept_magic_kwargs = False 

     def prepare_steps(self, args, tasks): 
      steps = deque(tasks) 
      next_step = prev_task = prev_res = None 
      tasks, results = [], [] 
      i = 0 
      while steps: 
       # First task get partial args from chain. 
       task = maybe_subtask(steps.popleft()) 
       task = task.clone() if i else task.clone(args) 
       i += 1 
       tid = task.options.get('task_id') 
       if tid is None: 
        tid = task.options['task_id'] = uuid() 
       res = task.type.AsyncResult(tid) 

       # automatically upgrade group(..) | s to chord(group, s) 
       if isinstance(task, group): 
        try: 
         next_step = steps.popleft() 
        except IndexError: 
         next_step = None 
       if next_step is not None: 
        task = chord(task, body=next_step, task_id=tid) 
       if prev_task: 
        # link previous task to this task. 
        prev_task.link(task) 
        # set the results parent attribute. 
        res.parent = prev_res 

       results.append(res) 
       tasks.append(task) 
       prev_task, prev_res = task, res 

      return tasks, results 

     def apply_async(self, args=(), kwargs={}, group_id=None, chord=None, 
       task_id=None, **options): 
      if self.app.conf.CELERY_ALWAYS_EAGER: 
       return self.apply(args, kwargs, **options) 
      options.pop('publisher', None) 
      tasks, results = self.prepare_steps(args, kwargs['tasks']) 
      result = results[-1] 
      if group_id: 
       tasks[-1].set(group_id=group_id) 
      if chord: 
       tasks[-1].set(chord=chord) 
      if task_id: 
       tasks[-1].set(task_id=task_id) 
       result = tasks[-1].type.AsyncResult(task_id) 
      tasks[0].apply_async() 
      return result 

     def apply(self, args=(), kwargs={}, **options): 
      tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']] 
      res = prev = None 
      for task in tasks: 
       res = task.apply((prev.get(),) if prev else()) 
       res.parent, prev = prev, res 
      return res 
    return Chain 

であなたが最後にprepare_stepsprev_taskがリンクされていることがわかりますhappends次のタスクに移ります。 prev_taskが失敗した場合、次のタスクは呼び出されません。私は次へ前のタスクからlink_errorを追加することでテストしてる

if prev_task: 
    # link and link_error previous task to this task. 
    prev_task.link(task) 
    prev_task.link_error(task) 
    # set the results parent attribute. 
    res.parent = prev_res 

しかし、その後、不変でするように設定された場合を除き、多分両方のケース(の世話をしなければならない次のタスク、例えばより多くの引数を受け入れない)。

私はチェーンが、いくつかの構文はこれを好きせることによって、これをサポートすることができると思う:

c = chain(t1, (t2, t1e), (t3, t2e))

意味:t3からt1e

t2linkからt2link_error

t1linkとあなたが見つけたt2e

+0

私はチェーン以外のタスクを実行するチェーン状のタスクを使用することにしましたが、他のタスクを開始する前にタスクが終了するのを待ちます。たとえば、 'task1.delay([params])。取得する(); task2.delay([params])。get(); task3.delay([params]).get() 'を実行します。チェーン状のタスクは、いずれかのタスクによって発生した例外をキャッチして再試行できます。 – Andrei

+0

あなたの例から、t1eとt2eはそれぞれt2とt3を呼び出す必要がありますか? – Andrei

+0

チェーンの可能な構文に関する私の考えだけの例です。これは、次の各タスクが実際には一対のタスクであることを意味し、前のステップで例外/エラーが発生しなかった場合、ペアの最初の要素が呼び出され、2番目の要素が前の手順の失敗の例外/エラーハンドラです。 't1e'は' t1エラーハンドラ 'を意味します。 – anh

関連する問題