2016-12-06 7 views
1

セロリタスクをクラスとして作成しようとしていますが、難しかったです。クラスは次のとおりです。セロリ:カスタムベースクラス/子クラスベースのタスクがapp.tasksの下に表示されない

class BaseCeleryTask(app.Task): 

def is_complete(self): 
    """ default method for checking if celery task has completed. """ 
    # simply return result (since by default tasks return boolean indicating completion) 

    try: 
     return self.result 
    except AttributeError: 
     logger.error('Result not defined. Make sure task has run!') 
     return False 


class MacroReportTask(BaseCeleryTask): 

def run(self, params): 
    """ Override the default run method with signal factory run""" 
    # hold on to the factory 
    process = MacroCountryReport(params) 
    self.result = process.run() 
    return self.result 

が、私はアプリを初期化し、app.tasks(または実行ワーカー)を確認する場合、アプリはそのレジストリにこれらの上記のタスクを持っていないようです。他の機能ベースのタスク(using app.task() decorator)はうまく登録されているようです。次のメッセージと

process = SignalFactoryTask() 
process.delay(params) 

セロリワーカーエラー: Received unregistered task of type None

は、私は上記のようなタスクを実行します。

私は問題があると思います。通常の関数ベースのタスクと同様に、カスタムレジストリにカスタムクラスを追加するにはどうすればいいですか?

答えて

2

私は90%がバグだと確信しています。あなたのクラスタスクでは、以下を試してください

class BaseCeleryTask(app.Task): 

    def __init__(self): 
     self.name = "[modulename].BaseCeleryTask" 

class MacroReportTask(app.Task): 

    def __init__(self): 
     self.name = "[modulename].MacroReportTask" 

これは、アプリケーションに登録すると、名前が自動的に設定されないバグが残っているようです。そのことが分かれば教えてください。

+0

助けてくれてありがとう!私はテストする必要がありますが、それが動作するかどうかをお知らせします! – kri

関連する問題