2016-08-31 7 views
3

は、これは私のコードは+ PostgreSQLの

import transaction 

@app.task(name='task_name') 
def task_name_fn(*args, **kwargs): 
    with transaction.manager: 
     try: 
      actual_fn(*args, **kwargs) 
      transaction.commit() 
     except: 
      transaction.abort() 

のように見えるものですが、私のtransaction.abort()はロールバックされていないようです。この作業者の以降のセロリの作業はすべて失敗します。次のエラーが表示されます

このセッションのトランザクションは、前回の例外のためにロールバックされました。このセッションで新しいトランザクションを開始するには、まずSession.rollback()を発行します。

私は間違っていますか?
この問題が発生しないように、task_name_fnをどのように書きますか?

答えて

4

まず、トランザクションを中止するために例外をキャッチする必要はありません。

import transaction 

@app.task(name='task_name') 
def task_name_fn(*args, **kwargs): 
    with transaction.manager: 
     actual_fn(*args, **kwargs) 

例外が発生した場合、トランザクションはアボートされます。

次に、それをタスクデコレータで抽象化することができます。そのようなSomehing(テスト、おそらくあるように動作しない):あなたはトランザクションを使用しているので、

from functools import wraps 
import transaction 

def tm_task(f): 
    @wraps(f) 
    def decorated(*args, **kwargs): 
     with transaction.manager: 
      return f(*args, **kwargs) 
    return app.task()(decorated) 

@tm_task 
def actual_fn(*args, **kwargs): 
    pass # your function code here instead of calling other function 

また、あなたは、トランザクションがコミットされた後、あなたの仕事のキューイングを遅らせることがよいでしょう。たとえば、トランザクションに行を挿入し、その行に何かを実行するジョブをキューイングした場合、最初のトランザクションがコミットされ、その行がトランザクション外で使用可能になる前にワーカーに到着する可能性があるためです。何かのように:

class AfterCommitTask(Task): 
    def apply_async(self, *args, **kw): 
     tx = transaction.get() 
     def hook(status): 
      if status: # Only queue if the transaction was succesfull. 
       super(AfterCommitTask, self).apply_async(*args, **kw) 
     tx.addAfterCommitHook(hook) 

def tm_task(f): 
    @wraps(f) 
    def decorated(*args, **kwargs): 
     with transaction.manager: 
      return f(*args, **kwargs) 
    return app.task(base=AfterCommitTask)(decorated) 

@tm_task 
def actual_fn(*args, **kwargs): 
    pass # your function code here instead of calling other function 
+0

あなた、私の友人は私に数時間の研究を救った!ありがとうございました –

+1

あなたは私に10Kの評判に達しました、ありがとう。真剣に、しかし、私は誰かが少しずつそれを学ぶ代わりに、数年前に私に言ったと思う。私は他の人がこのSOの質問+答えを見つけることを願っています。 –

+0

これは、SQLデータベースに競合が解決された場合にトランザクションリプレイを実行しないという回答です。競合状態を回避するためにSQLで有効なトランザクション分離レベルを使用している場合は、再試行が必要です。その場合は、再試行可能なトランザクションハンドラの例を参照してください。https://websauna.org/docs/api/websauna.system.model.retry.html?highlight=retryable#websauna.system.model.retry.retryableまたはCeleryタスク基本クラス再試行できるhttps://websauna.org/docs/api/websauna.system.task.tasks.html?highlight=retryable#websauna.system.task.tasks.RetryableTransactionTask –