2011-10-10 14 views
3

Celeryのスレッドを使用して非常に簡単な定期コードがあります。 "Pre"と "Post"を印刷し、その間に寝るだけです。それは、このコード'already in use...'決してプリントthis StackOverflow questionthis linked websiteCelery periodic_taskが複数回並列に実行されています

from celery.task import task 
from celery.task import periodic_task 
from django.core.cache import cache 
from time import sleep 
import main 
import cutout_score 
from threading import Lock 

import socket 
from datetime import timedelta 
from celery.decorators import task, periodic_task 

def single_instance_task(timeout): 
    def task_exc(func): 
    def wrapper(*args, **kwargs): 
     lock_id = "celery-single-instance-" + func.__name__ 
     acquire_lock = lambda: cache.add(lock_id, "true", timeout) 
     release_lock = lambda: cache.delete(lock_id) 
     if acquire_lock(): 
      try: 
       func() 
      finally: 
       release_lock() 
    return wrapper 
    return task_exc 

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes 
@periodic_task(run_every = timedelta(seconds=2)) 
def test(): 
    lock_id = "lock" 

    # cache.add fails if if the key already exists 
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE) 
    # memcache delete is very slow, but we have to use it to take 
    # advantage of using add() for atomic locking 
    release_lock = lambda: cache.delete(lock_id) 

    if acquire_lock(): 
     try: 
      print 'pre' 
      sleep(20) 
      print 'post' 
     finally: 
      release_lock() 
     return 
    print 'already in use...' 

から構成されています。デコレータ@single_instance_taskを使用した場合と同じ現象が発生します。

あなたは何が間違っているのか分かりますか?

編集:質問をメモリに書き込まないように簡略化しました(グローバルまたはdjangoキャッシュを使用)。私はまだ編集'already in use...'


を見ることはありません:、私は私のDjangoのsettings.pyファイルに次のコードを追加します(https://docs.djangoproject.com/en/dev/topics/cache/すべてのコードを変更することでが期待通りに動作しますが、私はポート11211を使用する場合にのみ(奇妙なことに、私のサーバーはポート8000​​である)

CACHES = { 
    'default': { 
     'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache', 
     'LOCATION': [ 
      '127.0.0.1:11211' 
     ] 
    } 
} 

答えて

3

はどのようにcelerydを実行している?私は、ねじオプションに精通していないよ。

マルチプロセスを実行している場合、ワーカー間の共有メモリである「グローバル」変数はありません。

カウンタをすべてのワーカー間で共有する場合は、cache.incrを使用することをおすすめします。あなたが眠っていることにより、オーバーラップするように、あなたのタスクを強制場合たとえば、何が起こる

In [1]: from django.core.cache import cache 

In [2]: cache.set('counter',0) 

In [3]: cache.incr('counter') 
Out[3]: 1 

In [4]: cache.incr('counter') 
Out[4]: 2 

更新

print "Task on %r started" % (self,) 
sleep(20) 
print "Task on %r stopped" % (self,) 

あなたが取得しない場合

例: 「既に使用中です...」より頻繁に実行してから20秒間キャッシュが期待どおりに動作していないことがわかります。


別の更新

あなたはDjangoの設定でキャッシュバックエンドをセットアップすることがありますか?例えば。あなたが実際にキャッシングを行いませんDummy Cacheを使用することができるない場合

をmemcachedを、ちょうどあなたの問題の説得力の原因のように鳴っているインタフェースを...実装しています。

+0

+1私の問題に関連しているようです。私はキャッシュを使用しようとしましたが、依然として 'counter 'の値が不正です。また、複数の作業者が 'test'関数に入ることがわかりました。私はdjangoを使ってcelerydを実行しています: 'python manage。py celeryd -v 2 -B -s Celery -E -l INFO' – user

+0

'test'関数が単純に" hello "を出力するように単純化しても、別のワーカーで動作し、あまりにも頻繁に印刷します(たとえ私が'@ single_instance_task'デコレータが定義されています)。 – user

+0

私は(上の)コードを簡略化して、(あなたが示唆したように)印刷するようにしました。 ''すでに使用中です... ''という印字はしません。何とかキャッシュが正常にロックされていません。 – user

関連する問題