2017-10-13 3 views
0

比較的長時間にわたるタスクは、別のサーバー上で別々に実行されているセラーワーカーに委任されます。Flaskでコミットが成功した後にセロリタスクを実行しますか?

しかし、結果はリレーショナルデータベースに戻されます(task_descr.idに従って更新されたテーブル、以下を参照)。ワーカーはignore_resultを使用します。フラスコアプリケーションから要求された

タスク:

task = app.celery.send_task('tasks.mytask', [task_descr.id, attachments]) 

問題は、トランザクションがまだフラスコ側で閉じられていない状態での作業が要求されていることです。これは競合状態を引き起こします。これは、Flaskアプリでトランザクションが終了する前にセルリーワーカーがタスクを完了することがあるためです。

トランザクションが成功した後にタスクを送信する適切な方法は何ですか?

また、条件を試行する前に、作業者がtask_descr.idの使用可能性をチェックし、タスクを再試行する必要があります(これは複雑すぎます)。

回答はRun function after a certain type of model is committedと似ていますが、ここでのタスク送信は明示的なので、一部のモデルでは更新/挿入を聞く必要はありません。

答えて

0

な方法の一つはPer-Request After-Request Callbacksで、アーミンRonacherに感謝:

@after_this_request 
    def send_mytask(response): 
     if response.status_code in {200, 302}: 
      task = app.celery.send_task('tasks.mytask', [task_descr.id, attachments]) 
     return response 

ない理想的な、しかし、動作しているようです。私の場合は

from flask import g 

def after_this_request(func): 
    if not hasattr(g, 'call_after_request'): 
     g.call_after_request = [] 
    g.call_after_request.append(func) 
    return func 


@app.after_request 
def per_request_callbacks(response): 
    for func in getattr(g, 'call_after_request',()): 
     response = func(response) 
    return response 

は使用量があります。私のタスクは、正常に処理された要求のためのものなので、私は500やその他のエラー状態には気をつけません。

関連する問題