2012-03-20 14 views
5

トピック交換を設定するために私の設定がどのようになっているのか混乱しています。セロリとRabbitMQを使ったトピック交換

http://www.rabbitmq.com/tutorials/tutorial-five-python.html

これは私が達成したいものです。その後、

Task1 -> send to QueueOne and QueueFirehose 
Task2 -> sent to QueueTwo and QueueFirehose 

Task1 -> consume from QueueOne 
Task2 -> consume from QueueTwo 
TaskFirehose -> consume from QueueFirehose 

私は唯一のタスク1がQueueTwoから消費するQueueOneとTask2のから消費したいです。

この問題は、Task1と2が実行されると、QueueFirehoseも使い果たされ、TaskFirehoseタスクが決して実行されないということです。

私の設定に何か問題がありますか、何か誤解していますか?

Task1 -> send to QueueOne 
Task2 -> sent to QueueTwo 
TaskFirehose -> send to QueueFirehose 

その後:

Worker1 -> consume from QueueOne, QueueFirehose 
Worker2 -> consume from QueueTwo, QueueFirehose 
WorkerFirehose -> consume from QueueFirehose 

これは、あなたが何を意味するのか、正確ではないかもしれませんが、私はそれは多くのシナリオをカバーすべきだと思うと

CELERY_QUEUES = { 
    "QueueOne": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.one", 
    }, 
    "QueueTwo": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.two", 
    }, 
    "QueueFirehose": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.#", 
    }, 
} 

CELERY_ROUTES = { 
     "tasks.task1": { 
      "queue": 'QueueOne', 
      "routing_key": 'pipeline.one', 
     }, 
     "tasks.task2": { 
      "queue": 'QueueTwo', 
      "routing_key": 'pipeline.two', 
     }, 
     "tasks.firehose": { 
      'queue': 'QueueFirehose', 
      "routing_key": 'pipeline.#', 
     }, 
} 
+0

これは説明のための用語に過ぎないかもしれませんが、あなたの説明は、あなたがタスクと労働者を融合させているように思えます。たとえば、「Task2がQueue2に送信されました」と後で「Queue2から消費するTask2」と言います。タスクは消費されません。彼らは(労働者によって)消費される。また、 "TaskFirehoseタスクは実行されません"と言っていますが、あなたの説明では、TaskFirehoseがキューに送られていません。基本的な考え方は次のとおりです。タスクはキューに送られます。作業者は割り当てられたキューからタスクを実行します。タスク!=それらを実行するワーカー。 –

答えて

0

あなたが実際にこのような何かを意味すると仮定するとうまくいけばあなたも。このような 何か作業をする必要があります:

# Advanced example starting 10 workers in the background: 
# * Three of the workers processes the images and video queue 
# * Two of the workers processes the data queue with loglevel DEBUG 
# * the rest processes the default' queue. 

$ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data 
-Q default -L:4,5 DEBUG 

をより多くのオプションと参考のために:http://celery.readthedocs.org/en/latest/reference/celery.bin.multi.html

は、これはドキュメントからまっすぐでした。

私も同様の状況で、少し違った方法で対応しました。私はsupervisordとセロリマルチを使用することができませんでした。 代わりに私はスーパーバイザで複数のプログラムを作成しました。とにかく労働者はいろいろなプロセスをとるので、スーパーバイザーがあなたのためにすべてを世話するようにしてください。 設定ファイルのようなものになります -

同様
; ================================== 
; celery worker supervisor example 
; ================================== 

[program:Worker1] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueOne, QueueFirehose 

directory=/path/to/project 
user=nobody 
numprocs=1 
stdout_logfile=/var/log/celery/worker1.log 
stderr_logfile=/var/log/celery/worker1.log 
autostart=true 
autorestart=true 
startsecs=10 

; Need to wait for currently executing tasks to finish at shutdown. 
; Increase this if you have very long running tasks. 
stopwaitsecs = 600 

; When resorting to send SIGKILL to the program to terminate it 
; send SIGKILL to its whole process group instead, 
; taking care of its children as well. 
killasgroup=true 

; if rabbitmq is supervised, set its priority higher 
; so it starts first 
priority=998 

は、Worker2とWorkerFirehoseのために、作るために対応する行を編集します。

[program:Worker2] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueTwo, QueueFirehose 

[program:WorkerFirehose] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueFirehose 

がsupervisordでそれらのすべてが含まれます.confファイルとそれはそれを行う必要があります。

関連する問題