2016-11-16 6 views
0

最近Celeryインストールを4.0にアップグレードしました。アップグレードプロセスに取り組んで数日後、ついにそれが動作するようになった。いくつかのタスクは戻ってきますが、最終的なタスクは戻りません。Celeryを3.1から4.0にアップグレードした後、Redisが結果を返さない

私が取り込んでファイルを解析し、クラス、SFFを、持っている:

# Constructor with I/O file 
def __init__(self, file): 

    # File data that's gonna get used a lot 
    sffDescriptor = file.fileno() 
    fileName = abspath(file.name) 

    # Get the pointer to the file 
    filePtr = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ) 

    # Get the header info 
    hdr = filePtr.read(HEADER_SIZE) 
    self.header = SFFHeader._make(unpack(HEADER_FMT, hdr)) 

    # Read in the palette maps 
    print self.header.onDemandDataSize 
    print self.header.onLoadDataSize 
    palMapsResult = getPalettes.delay(fileName, self.header.palBankOff - HEADER_SIZE, self.header.onDemandDataSize, self.header.numPals) 

    # Read the sprite list nodes 
    nodesStart = self.header.sprListOff 
    nodesEnd = self.header.palBankOff 
    print nodesEnd - nodesStart 
    sprNodesResult = getSprNodes.delay(fileName, nodesStart, nodesEnd, self.header.numSprites) 

    # Get palette data 
    self.palettes = palMapsResult.get() 

    # Get sprite data 
    spriteNodes = sprNodesResult.get() 

    # TESTING 
    spritesResultSet = ResultSet([]) 
    numSpriteNodes = len(spriteNodes) 
    # Split the nodes into chunks of size 32 elements 
    for x in xrange(0, numSpriteNodes, 32): 
     spritesResult = getSprites.delay(spriteNodes, x, x+32, fileName, self.palettes, self.header.palBankOff, self.header.onDemandDataSizeTotal) 
     spritesResultSet.add(spritesResult) 
     break # REMEMBER TO REMOVE FOR ENTIRE SFF 

    self.sprites = spritesResultSet.join_native() 

それが全体spritesResultを返しシングルタスクかどうかは問題ではありませんか、私はResultSetを使用して、それを分割している場合その結果はいつも同じです。私が使用しているPythonコンソールは、spritesResultSet.join_native()またはspritesResult.get()のいずれかでハングします(どのようにフォーマットするかによって異なります)。

@task 
def getSprites(nodes, start, end, fileName, palettes, palBankOff, onDemandDataSizeTotal): 
sprites = [] 

with open(fileName, "rb") as file: 
    sffDescriptor = file.fileno() 
    sffData = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ) 

    for node in nodes[start:end]: 
     sprListNode = dict(SprListNode._make(node)._asdict()) # Need to convert it to a dict since values may change. 
     #print node 
     #print sprListNode 

     # If it's a linked sprite, the data length is 0, so get the linked index. 
     if sprListNode['dataLen'] == 0: 
      sprListNodeTemp = SprListNode._make(nodes[sprListNode['index']]) 
      sprListNode['dataLen'] = sprListNodeTemp.dataLen 
      sprListNode['dataOffset'] = sprListNodeTemp.dataOffset 
      sprListNode['compression'] = sprListNodeTemp.compression 

     # What does the offset need to be? 
     dataOffset = sprListNode['dataOffset'] 
     if sprListNode['loadMode'] == 0: 
      dataOffset += palBankOff #- HEADER_SIZE 
     elif sprListNode['loadMode'] == 1: 
      dataOffset += onDemandDataSizeTotal #- HEADER_SIZE 

     #print sprListNode 

     # Seek to the data location and "read" it in. First 4 bytes are just the image length 
     start = dataOffset + 4 
     end = dataOffset + sprListNode['dataLen'] 
     #sffData.seek(start) 

     compressedSprite = sffData[start:end] 

     # Create the sprite 
     sprite = Sprite(sprListNode, palettes[sprListNode['palNo']], np.fromstring(compressedSprite, dtype=np.uint8)) 
     sprites.append(sprite) 

return json.dumps(sprites, cls=SpriteJSONEncoder) 

私は右のそれの上に印刷を置けば、それはセロリウィンドウで印刷されますので、それは、return文に到達した知っている:ここで

は、当該作業です。

[2016年11月16日00:03:33639:INFO/PoolWorker-4]タスクframedatabase.tasks.getSpritesまた、私は労働者から以下のメッセージが表示されますので、タスクが完了するまで実行されていることを知っています[285ac9b1-09b4-4cf1-a251-da6212863832]は0.137236133218sで成功しました: '[{"width":120、 "palNo":30、 "group":9000、 "xAxis":0、 "yAxis":0、データ ":...」

ここでは、settings.pyの私のセロリの設定です:

# Celery settings 
BROKER_URL='redis://localhost:1717/1' 
CELERY_RESULT_BACKEND='redis://localhost:1717/0' 
CELERY_IGNORE_RESULT=False 
CELERY_IMPORTS = ("framedatabase.tasks",) 

...と私のcelery.py:

from __future__ import absolute_import 

import os 

from celery import Celery 

# set the default Django settings module for the 'celery' program. 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'framedatabase.settings') 

from django.conf import settings # noqa 

app = Celery('framedatabase', backend='redis://localhost:1717/1', broker="redis://localhost:1717/0", 
    include=['framedatabase.tasks']) 

# Using a string here means the worker will not have to 
# pickle the object when using Windows. 
app.config_from_object('django.conf:settings', namespace='CELERY') 
app.autodiscover_tasks() 


@app.task(bind=True) 
def debug_task(self): 
    print('Request: {0!r}'.format(self.request)) 

答えて

0

問題が見つかりました。どうやらそれはここセロリのマニュアルの「同期サブタスクを起動しないでください」の項で述べたように、デッドロックにつながるました:http://docs.celeryproject.org/en/latest/userguide/tasks.html#tips-and-best-practices

だから私はラインを処分した:

sprNodesResult.get() 

とに最終結果を変更しましたチェーン:

self.sprites = chain(getSprNodes.s(fileName, nodesStart, nodesEnd, self.header.numSprites), 
    getSprites.s(0,32,fileName,self.palettes,self.header.palBankOff,self.header.onDemandDataSizeTotal))().get() 

これは機能します。今私はちょうど私が望むようにこれを分割する方法を見つける必要があります!

関連する問題