2016-11-04 6 views
0

しかし、私はコンピュータを開いたままにして、50秒ごとにタスクを実行するセロリを実行し、1時間のスキップをいくつか見ました。予期しないスキップを除いて、実際にはniceを実行しています。なぜこうなった?これを解決するには?ここでセロリは1時間にタスクをスキップします

は私の労働者-l情報にスキップされたログの例です

2016-11-03 10:13:36,264: INFO/MainProcess] Task core.tasks.sample[8efcedc5-1e08-41c4-80b9-1f82a9ddbaad] succeeded in 1.062010367s: None 
[2016-11-03 11:14:19,751: INFO/MainProcess] Received task: core.tasks.sample[ca9d6ef4-2cdc-4546-a9fb-c413541a80ee] 

ここでここで

[2016-11-03 10:13:35,199: INFO/MainProcess] Scheduler: Sending due task core.tasks.sample (core.tasks.sample) 
[2016-11-03 11:14:19,748: INFO/MainProcess] Scheduler: Sending due task core.tasks.sample (core.tasks.sample) 

私のビート-l情報にスキップされたログの例です私のタスクのコードは次のとおりです。ここで

# 50 seconds 
@periodic_task(run_every=timedelta(**settings.XXX_XML_PERIODIC_TASK)) 
def sample(): 
    global GLOBAL_CURRENT_DATE 
    if cache.get('XXX_xml_today_saved_data') is None: 
     cache.set('XXX_xml_today_saved_data', []) 
    saved_data = cache.get('XXX_xml_today_saved_data') 
    ftp = FTP('xxxxx') 
    ftp.login(user='xxxxx', passwd='xxxxx') 
    ftp.cwd('XXX') 
    date_dir = GLOBAL_CURRENT_DATE.replace("-", "") 
    try: 
     ftp.cwd(date_dir) 
    except: 
     ftp.cwd(str(int(date_dir) - 1)) 
    _str = StringIO() 
    files = ftp.nlst() 
    if (GLOBAL_CURRENT_DATE != datetime.now().strftime("%Y-%m-%d") and 
      files == saved_data): 
     GLOBAL_CURRENT_DATE = datetime.now().strftime("%Y-%m-%d") 
     cache.delete('XXX_xml_today_saved_data') 
     return 
    print files 
    print "-----" 
    print saved_data 
    unsaved = list(set(files) - set(saved_data)) 
    print "-----" 
    print unsaved 
    if unsaved: 
     file = min(unsaved) 
     # modified_time = ftp.sendcmd('MDTM '+ file) 
     print file 
     ftp.retrbinary('RETR ' + file, _str.write) 
     xml = '<root>' 
     xml += _str.getvalue() 
     xml += '</root>' 
     if cache.get('XXX_provider_id') is None: 
      cache.set('XXX_provider_id', Provider.objects.get(code="XXX").id) 
     _id = cache.get('XXX_provider_id') 
     _dict = xmltodict.parse(xml, process_namespaces=True, 
           dict_constructor=dict, attr_prefix="") 
     row = _dict['root']['row'] 
     if type(_dict['root']['row']) == dict: 
      _dict['root']['row'] = [] 
      _dict['root']['row'].append(row) 
      row = _dict['root']['row'] 
     for x in row: 
      if cache.get('XXX_data_type_' + x['dataType']) is None: 
       obj, created = DataType.objects.get_or_create(code=x['dataType']) 
       obj, created = ProviderDataType.objects.get_or_create(provider_id=_id, data_type=obj) 
       if created: 
        cache.set('XXX_data_type_' + x['dataType'], obj.id) 
      _id = cache.get('XXX_data_type_' + x['dataType']) 
      obj, created = Transaction.objects.get_or_create(data=x, file_name=file, 
             provider_data_type_id=_id) 
      if created: 
       if x['dataType'] == "BR": 
        print "Transact" 
        br_transfer(**x) 
      else: 
       print "Not transacting" 

     saved_data.append(file) 
     cache.set('XXX_xml_today_saved_data', saved_data) 
    ftp.close() 

はsettings.pyの私のセロリのCONFIGSです:

BROKER_URL = 'redis://localhost:6379' 
CELERY_RESULT_BACKEND = 'redis://localhost:6379' 
CELERY_ACCEPT_CONTENT = ['application/json'] 
CELERY_TASK_SERIALIZER = 'json' 
CELERY_RESULT_SERIALIZER = 'json' 
CELERY_TIMEZONE = 'Africa/Nairobi' 
XXX_XML_PERIODIC_TASK = {'seconds': 50} 

CACHES = { 
    'default': { 
     'BACKEND': 'redis_cache.RedisCache', 
     'LOCATION': 'localhost:6379', 
     'TIMEOUT': None, 
    }, 
} 

説明や提案はありますか?

私のpython 2.7.10を使用してとDjangoいます1.10

+0

は、あなたがより多くの労働者を追加しようとしたことがありますか?タスクが起動する時点で利用可能なものがない場合は、タスクが使用可能になるまで待機する必要があります。 – rrauenza

+0

労働者を追加するには?私はこの作業を初めて実行しています –

+0

http://docs.celeryproject.org/en/latest/userguide/workers.html - 最初に試してみてください - コンカレンシー – rrauenza

答えて

1

問題が2つある可能性があります。最も可能性が高いのは、あなたの仕事が誘発されたときにあなたの労働者が忙しいということです。あなたはより多くの労働者を持つことでこれを防ぐことができます。 docsは、単一のワーカー用のオプション--concurrencyと、複数のワーカープロセスを実行するためのオプションを説明しています。

特定のタスクが特定のプロジェクトに割り当てられるように、異なるプロジェクトにアタッチされた別のワーカーを実行することもできます。つまり特定のタスクの専用キュー:Starting worker with dynamic routing_key?

私が見てきたことは、作業者がタスクをプリフェッチして保持できることです。しかし、現在実行中のタスクがカウントダウンを過ぎて実行されると、タスクが遅れることがあります。あなたがCELERYD_PREFETCH_MULTIPLIER上に読みたいと思うでしょう

+0

ありがとう!私は並行性のために行く必要があります、私たちのインターネットは断続的な方法で、時にはタスクに失敗する、それはまた理由かもしれない、私は正しい? –

+0

ロギングを追加して調べますか? – rrauenza

+0

私は実際にセロリのログを使用します –

1

セロリの労働者は、彼らは準備ができているキューからタスクをポップが、タスクはカウントダウンを持っている場合には、その間に他のタスクをポップして待ちます他のことをすることによって期限が切れる時間。その時点でタスクが実行されることは保証されません。

+0

私の解決策は何ですか?設定されている期間のタスク時間に常に従う方法はありますか? –

+0

保証のためにcronを使用 – theWanderer4865

+0

申し訳ありませんが、私はこれらのことに本当に慣れていませんか?私と別のコードを提案したり、コードを作ることはできますか? –

関連する問題