2017-11-13 7 views
3

私は、プロジェクト設定のディクショナリリスト(django.conf.settingsからインポートされたもの)に基づいて、定期的にセロリタスクを動的に追加するモジュールを作成しました。 私は設定で指定された特定のuuidで呼び出される関数add_tasksそのスケジュール機能使用していることを実行します。Celery add_periodic_taskブロック

def add_tasks(celery): 
    for new_task in settings.NEW_TASKS: 
     celery.add_periodic_task(
      new_task['interval'], 
      my_task.s(new_task['uuid']), 
      name='My Task %s' % new_task['uuid'], 
     ) 

ことなどが、私は私のcelery.pyで関数を呼び出すためにon_after_configure.connect信号を使用hereを示唆した。

app = Celery('my_app') 

@app.on_after_configure.connect 
def setup_periodic_tasks(celery, **kwargs): 
    from add_tasks_module import add_tasks 
    add_tasks(celery) 

このセットアップはcelery beatcelery workerの両方のために正常に動作しますが、私は私のDjangoのアプリケーションを提供するためにuwsgiを使用し、私のセットアップを壊します。 Uwsgiは、ビューコードがセロリの.delay()メソッドを使用してタスクを送信する最初の時点までスムーズに実行されます。その時点では、セロリはuwsgiで初期化されていますが、上記のコードでは永遠にブロックされているようです。私は、コマンドラインから手動でこれを実行した場合、それブロックは、私は以下の(短縮)スタックトレースを取得する際の割り込み:ミューテックスを取得に問題があるよう

Traceback (most recent call last): 
    File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ 
    return obj.__dict__[self.__name__] 
KeyError: 'tasks' 

During handling of the above exception, another exception occurred: 

Traceback (most recent call last): 
    File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ 
    return obj.__dict__[self.__name__] 
KeyError: 'data' 

During handling of the above exception, another exception occurred: 

Traceback (most recent call last): 
    File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ 
    return obj.__dict__[self.__name__] 
KeyError: 'tasks' 

During handling of the above exception, another exception occurred: 
Traceback (most recent call last): 

    (SHORTENED HERE. Just contained the trace from the console through my call to this function) 

    File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks 
    my_task.s(new_task['uuid']), 
    File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__ 
    return getattr(self._get_current_object(), name) 
    File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object 
    return loc(*self.__args, **self.__kwargs) 
    File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons 
    return app.tasks[ 
    File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__ 
    value = obj.__dict__[self.__name__] = self.__get(obj) 
    File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks 
    self.finalize(auto=True) 
    File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize 
    with self._finalize_mutex: 

それはそうです。

現在、私はsys.argv[0]uwsgiが含まれ、その後のみbeatが作業を必要とするよう、定期的なタスクを追加しますが、私はより多くの問題を完全に解決するために、ここで間違って起こっているかを理解したいと思いませんかどうかを検出するために、回避策を使用しています。

この問題は、マルチスレッドまたはマルチプロセスのuwsgiを使用することと関係がありますか?

問題の解決に役立つヒントをいただければ幸いです。ありがとうございました。

私が使用しています:Djangoの1.11.7とセロリを4.1.0

私はこの問題のために、最小限のセットアップを作成している1

編集:

celery.py:

import os 
from celery import Celery 
from django.conf import settings 
from myapp.tasks import my_task 

# set the default Django settings module for the 'celery' program. 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings') 

app = Celery('my_app') 

@app.on_after_configure.connect 
def setup_periodic_tasks(sender, **kwargs): 
    sender.add_periodic_task(
     60, 
     my_task.s(), 
     name='Testtask' 
    ) 

app.config_from_object('django.conf:settings', namespace='CELERY') 
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 

tasks.py:

from celery import shared_task 
@shared_task() 
def my_task(): 
    print('ran') 

CELERY_TASK_ALWAYS_EAGER = Falseであり、動作中のメッセージキューがあることを確認してください。

ラン:

./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()' 

は、上記のエラーを確認するために中断する前に、約10秒待ちます。

+1

なぜあなたはDjangoのprでそれを初期化しますか?それを行うための専用のプロセスを持つのではなく、 –

+0

私は専用の 'celery beat'プロセスを持っていますが、私はdjangoプロセスから' .delay() 'を呼び出す必要があります。それがコードブロックです。 – Tim

+0

add_tasksはDjangoプロセスから呼び出されませんか? –

答えて

0

は、あなたがその信号@app.on_after_finalize.connectを試してみるんでした:

プロジェクトcelery==4.1.0Django==2.0django-celery-beat==1.1.0

@app.on_after_finalize.connect 
def setup_periodic_tasks(sender, **kwargs): 
    """ setup of periodic task :py:func:shopify_data_fetcher.celery.fetch_shopify 
    based on the schedule defined in: settings.CELERY_BEAT_SCHEDULE 
    """ 
    for task_name, task_config in settings.CELERY_BEAT_SCHEDULE.items(): 
     sender.add_periodic_task(
      task_config['schedule'], 
      fetch_shopify.s(**task_config['kwargs']['resource_name']), 
      name=task_name 
     ) 

作品CELERY_BEAT_SCHEDULEdjango-celery-results==1.0.1作業からいくつかの高速スニペット:だから

CELERY_BEAT_SCHEDULE = { 
    'fetch_shopify_orders': { 
     'task': 'shopify.tasks.fetch_shopify', 
     'schedule': crontab(hour="*/3", minute=0), 
     'kwargs': { 
      'resource_name': shopify_constants.SHOPIFY_API_RESOURCES_ORDERS 
     } 
    } 
} 
+0

ありがとうございます。私はこれを試してみましたが、それは助けにはなりませんでしたが、正しい方向に私を指摘しました。@shared_taskデコレータの使い方が問題であるようです。私は私の質問を編集します。 – Tim

+0

あなたの 'fetch_shopify'セロリタスクを定義するのにどのデコレータを使用しますか? – Tim

0

を、私が見つけたのは、@shared_taskデコレータ問題を作り出します。私はそうのような信号によって呼び出される関数でタスクを右に宣言するとき、私はこの問題を回避することができます

def add_tasks(celery): 
    @celery.task 
    def my_task(uuid): 
     print(uuid) 

    for new_task in settings.NEW_TASKS: 
     celery.add_periodic_task(
      new_task['interval'], 
      my_task.s(new_task['uuid']), 
      name='My Task %s' % new_task['uuid'], 
     ) 

このソリューションは、実際に私のために働いているが、私はこれで1つのより多くの問題を抱えている:私はこのコードを使用プラグイン可能なアプリなので、シグナルハンドラの外でセロリのアプリに直接アクセスすることはできませんが、他のコード内からmy_task関数を呼び出すこともできます。関数内で定義することで、関数外では利用できないため、どこにでもインポートすることはできません。

ここでは、信号関数の外でタスク関数を定義し、ここで別のデコレータとtasks.pyで使用することで、この問題を回避できます。問題が発生しないtasks.pyで使用できる@shared_taskデコレータとは別に、デコレータがあると私は思っています。

現在の最善の解決策は次のようになります。

task_app.__init__.py:

def my_task(uuid): 
    # do stuff 
    print(uuid) 

def add_tasks(celery): 
    celery_my_task = celery.task(my_task) 
    for new_task in settings.NEW_TASKS: 
     celery.add_periodic_task(
      new_task['interval'], 
      celery_my_task(new_task['uuid']), 
      name='My Task %s' % new_task['uuid'], 
     ) 

task_app.tasks.py:

from celery import shared_task 
from task_app import my_task 
shared_my_task = shared_task(my_task) 

myapp.celery.py:

import os 
from celery import Celery 
from django.conf import settings 


# set the default Django settings module for the 'celery' program. 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings') 

app = Celery('my_app') 

@app.on_after_configure.connect 
def setup_periodic_tasks(sender, **kwargs): 
    from task_app import add_tasks 
    add_tasks(sender) 


app.config_from_object('django.conf:settings', namespace='CELERY') 
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 
関連する問題