2017-01-30 10 views
4

私は他のAPIのリストを返すAPIを持っています。他のセロリのタスクを実行しているセロリの定期タスク

これらのAPIに15分おきにアクセスし、データをデータベースに戻す必要があります。

以下は、セロリとredisを使ってcelery_worker.pyファイルに書いたものです。しかし、すべてのタスクが開始されません。

list_of_APIs = requests.get(the_api_that_returns_list_of_APIs).json() 

CELERYBEAT_SCHEDULE = { 
    'every-15-minute': { 
     'task': 'fetch_data_of_all_APIs', 
     'schedule': timedelta(minutes=15), 
    }, 
} 

@celery.task 
def access_one_API(one_API): 
    return requests.get(one_API).json() 

@celery.task(name='fetch_data_of_all_APIs') 
def fetch_data_of_all_APIs(): 
    for one_API in list_of_APIs: 
      task = access_one_API.delay(one_API) 
      # some codes to put all task.id into a list_of_task_id 

    for task_id in list_of_task_id: 
      # some codes to get the results of all tasks 
      # some codes to put all the results into a database 

fetch_data_of_all_APIs機能は、セロリサーバが正常にターミナルで起動しますが、どちらもfetch_data_of_all_APIsaccess_one_API開始

access_one_API機能を実行するために、複数の労働者を使用することになって15分ごとに実行する必要があります。

fetch_data_of_all_APIs関数内のコードを引き出すと、access_one_APIが起動し、複数のセラーワーカーによって実行されます。しかし、これらのコードを関数内に入れて@celery.taskで装飾するとすぐに、両方の関数が起動しません。

私はそれがセロリと関係があると信じています。

事前に感謝します。

+0

'@ celery.task()'デコレータが必要であることに注意してください。また、現在のセロリのバージョンでは小文字の設定が使われているので、 'celery-beat'設定パラメータをチェックする必要があります。 –

答えて

0

ここでは、セミナーでサブタスクを使用して定期的なタスクを構成する方法を説明します(デモ用に20秒を設定します)。 tasks.py:

import celery 
from celery.canvas import subtask 
from celery.result import AsyncResult 
# just for example list of integer values 
list_of_APIs = [1, 2, 3, 4] 


@celery.task(name='access_one_API') 
def access_one_API(api): 
    """ 
    Sum of subtask for demonstration 
    :param int api: 
    :return: int 
    """ 
    return api + api 


@celery.task(name='fetch_data_of_all_APIs') 
def fetch_data_of_all_APIs(list_of_APIs): 
    list_task_ids = [] 

    for api in list_of_APIs: 
     # run of celery subtask and collect id's of subtasks 
     task_id = subtask('access_one_API', args=(api,)).apply_async().id 
     list_task_ids.append(task_id) 

    result_sub_tasks = {} 

    for task_id in list_task_ids: 
     while True: 
      task_result = AsyncResult(task_id) 
      if task_result.status == 'SUCCESS': 
       # if subtask is finish add result and check result of next subtask 
       result_sub_tasks[task_id] = task_result.result 

       break 

    print result_sub_tasks 
    # do something with results of subtasks here... 


app = celery.Celery(
    'tasks', 
    broker='redis://localhost:6379/0', 
    backend='redis://localhost:6379/0', 
) 


app.conf.beat_schedule = { 
    'add-every-20-seconds': { 
     'task': 'fetch_data_of_all_APIs', 
     'schedule': 20.0, 
     # args for fetch_data_of_all_APIs 
     'args': (list_of_APIs,) 
    }, 
} 

実行セロリ:端末からcelery worker -A tasks.app --loglevel=info --beat

トレース:

[2017-03-14 10:31:36,361: WARNING/PoolWorker-3] {'929996b3-fc86-4274-b3c3-06c38a6d4edd': 6, 'f44456b4-df93-4a78-9f1d-b2c2d2b05322': 4, '4e44fd57-fbbc-43cd-8616-1eafef559417': 8, '6d943f35-0d74-4319-aa02-30a266aa3cd9': 2} 

・ホープ、このことができます。

関連する問題