2012-10-10 9 views
11

map/reduceワークフローでセロリGroupプリミティブを傘のタスクとして使用できますか?map/reduceワークフローで使用するセロリグループタスク

さらに具体的には、グループ内のサブタスクを、複数の従業員が複数のサーバー上で実行することはできますか??ドキュメントから

すべて1つのワーカーに送信されたタスクを意味すると思わ
However, if you call apply_async on the group it will send a special 
grouping task, so that the action of calling the tasks happens in a worker 
instead of the current process 

...

3.0(まだ)1は、タスクセットでサブタスクをオフに解雇ことができる前にいます複数のサーバーで実行されます。問題は、すべてのタスクの実行が終了したかどうかを判断することです。これは通常、本当にエレガントではないすべてのサブタスクをポーリングすることによって行われます。 この問題を緩和するためにグループプリミティブを使用できるかどうかは疑問です。

+0

は、少なくともセロリ3.1では、通常の 'group'コマンドで完璧にタスクを配布しますが、上記のステートメントがドキュメントから削除されたようです – Grozz

答えて

23

このようなマップのような問題を減らすためにコードを使用することが可能であることが分かりました。

@celery.task(name='ic.mapper') 
def mapper(): 
    #split your problem in embarrassingly parallel maps 
    maps = [map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s()] 
    #and put them in a chord that executes them in parallel and after they finish calls 'reduce' 
    mapreduce = celery.chord(maps)(reduce.s())  
    return "{0} mapper ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname) 

@celery.task(name='ic.map') 
def map(): 
    #do something useful here 
    import time 
    time.sleep(10.0) 
    return "{0} map ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname) 

@celery.task(name='ic.reduce') 
def reduce(results): 
    #put the maps together and do something with the results 
    return "{0} reduce ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname) 

マッパーが、それは最初にあなたの問題を分割し再びブローカーに提出されている新しいサブタスクを作成マッパーを実行3労働者/サーバのクラスタ上で実行されます。キューはすべてのブローカーによって消費されるため、これらは並行して実行されます。また、すべてのマップをポーリングして終了したかどうかを確認するコードタスクが作成されます。終了すると、結果を結合することができるreduceタスクが実行されます。

すべてで可能です。野菜のおかげで、ありがとう!

関連する問題