私は、プロジェクト設定のディクショナリリスト(django.conf.settings
からインポートされたもの)に基づいて、定期的にセロリタスクを動的に追加するモジュールを作成しました。 私は設定で指定された特定のuuid
で呼び出される関数add_tasks
そのスケジュール機能使用していることを実行します。Celery add_periodic_taskブロック
def add_tasks(celery):
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
my_task.s(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
ことなどが、私は私のcelery.py
で関数を呼び出すためにon_after_configure.connect
信号を使用hereを示唆した。
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(celery, **kwargs):
from add_tasks_module import add_tasks
add_tasks(celery)
このセットアップはcelery beat
とcelery worker
の両方のために正常に動作しますが、私は私のDjangoのアプリケーションを提供するためにuwsgi
を使用し、私のセットアップを壊します。 Uwsgi
は、ビューコードがセロリの.delay()
メソッドを使用してタスクを送信する最初の時点までスムーズに実行されます。その時点では、セロリはuwsgi
で初期化されていますが、上記のコードでは永遠にブロックされているようです。私は、コマンドラインから手動でこれを実行した場合、それブロックは、私は以下の(短縮)スタックトレースを取得する際の割り込み:ミューテックスを取得に問題があるよう
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'data'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
(SHORTENED HERE. Just contained the trace from the console through my call to this function)
File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
my_task.s(new_task['uuid']),
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
return getattr(self._get_current_object(), name)
File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
return loc(*self.__args, **self.__kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
return app.tasks[
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
self.finalize(auto=True)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
with self._finalize_mutex:
それはそうです。
現在、私はsys.argv[0]
がuwsgi
が含まれ、その後のみbeat
が作業を必要とするよう、定期的なタスクを追加しますが、私はより多くの問題を完全に解決するために、ここで間違って起こっているかを理解したいと思いませんかどうかを検出するために、回避策を使用しています。
この問題は、マルチスレッドまたはマルチプロセスのuwsgiを使用することと関係がありますか?
問題の解決に役立つヒントをいただければ幸いです。ありがとうございました。
私が使用しています:Djangoの1.11.7とセロリを4.1.0
私はこの問題のために、最小限のセットアップを作成している1
編集:
celery.py:
import os
from celery import Celery
from django.conf import settings
from myapp.tasks import my_task
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(
60,
my_task.s(),
name='Testtask'
)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
tasks.py:
from celery import shared_task
@shared_task()
def my_task():
print('ran')
CELERY_TASK_ALWAYS_EAGER = Falseであり、動作中のメッセージキューがあることを確認してください。
ラン:
./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()'
は、上記のエラーを確認するために中断する前に、約10秒待ちます。
なぜあなたはDjangoのprでそれを初期化しますか?それを行うための専用のプロセスを持つのではなく、 –
私は専用の 'celery beat'プロセスを持っていますが、私はdjangoプロセスから' .delay() 'を呼び出す必要があります。それがコードブロックです。 – Tim
add_tasksはDjangoプロセスから呼び出されませんか? –