2012-10-23 17 views
9

消費者/労働者がある場合は、を消費することを確認したいと思います。私はお送りしようとしています。PikaまたはRabbitMQでは、消費者が現在消費しているかどうかを確認するにはどうすればよいですか?

任意のワーカーがない場合、私はいくつかの労働者を開始(消費者や出版社の両方が単一のマシン上にある)、その後メッセージを公開して行くでしょう。

connection.check_if_has_consumersのような機能があれば、私は多少このようにそれを実装するだろう -

import pika 
import workers 

# code for publishing to worker queue 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
channel = connection.channel() 

# if there are no consumers running (would be nice to have such a function) 
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""): 
    # start the workers in other processes, using python's `multiprocessing` 
    workers.start_workers() 

# now, publish with no fear of your queues getting filled up 
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True) 
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin", 
          properties=pika.BasicProperties(delivery_mode=2)) 
connection.close() 

をしかし、私はナキウサギcheck_if_has_consumers機能を持つ任意の関数を見つけることができません。

pikaを使用してこれを達成する方法はありますか?またはおそらく話すことによってうさぎで直接ですか?

私はちょうど私が完全にわからないが、私は実際にそれが彼らにメッセージをディスパッチないのでは、消費者の数を認識しているであろうのRabbitMQは、異なるキューに加入考えるとのACK

を受け入れますその任意のヘルプ場合は、ここで

は私が書いたworkers.pyコードは、ある...任意のヘルプは大歓迎です... 3時間前のRabbitMQで始まった....

import multiprocessing 
import pika 


def start_workers(num=3): 
    """start workers as non-daemon processes""" 
    for i in xrange(num):  
     process = WorkerProcess() 
     process.start() 


class WorkerProcess(multiprocessing.Process): 
    """ 
    worker process that waits infinitly for task msgs and calls 
    the `callback` whenever it gets a msg 
    """ 
    def __init__(self): 
     multiprocessing.Process.__init__(self) 
     self.stop_working = multiprocessing.Event() 

    def run(self): 
     """ 
     worker method, open a channel through a pika connection and 
     start consuming 
     """ 
     connection = pika.BlockingConnection(
           pika.ConnectionParameters(host='localhost') 
        ) 
     channel = connection.channel() 
     channel.queue_declare(queue='worker_queue', auto_delete=False, 
                durable=True) 

     # don't give work to one worker guy until he's finished 
     channel.basic_qos(prefetch_count=1) 
     channel.basic_consume(callback, queue='worker_queue') 

     # do what `channel.start_consuming()` does but with stopping signal 
     while len(channel._consumers) and not self.stop_working.is_set(): 
      channel.transport.connection.process_data_events() 

     channel.stop_consuming() 
     connection.close() 
     return 0 

    def signal_exit(self): 
     """exit when finished with current loop""" 
     self.stop_working.set() 

    def exit(self): 
     """exit worker, blocks until worker is finished and dead""" 
     self.signal_exit() 
     while self.is_alive(): # checking `is_alive()` on zombies kills them 
      time.sleep(1) 

    def kill(self): 
     """kill now! should not use this, might create problems""" 
     self.terminate() 
     self.join() 


def callback(channel, method, properties, body): 
    """pika basic consume callback""" 
    print 'GOT:', body 
    # do some heavy lifting here 
    result = save_to_database(body) 
    print 'DONE:', result 
    channel.basic_ack(delivery_tag=method.delivery_tag) 

EDIT:より良いアプローチは、一緒に来ていない限り、私はので、ここで前方に移動する必要が

が、私は取るつもりな回避策で、

ので、RabbitMQのはこれらを持っていますHTTP management apisは、management pluginをオンにした後、HTTPのapisページの途中にあります。

/api/connections - 開いているすべての接続のリスト。

/api/connections/name - 個々の接続。それを削除すると接続が切断されます。私は私の労働者を接続して、私のが異なる接続名/ユーザーによっての両方を生成する場合ワーカー接続が開いている場合

ので、私はそこ(...チェックできるようになります労働者が死ぬと問題になるかもしれません...)

より良い解決策を待っています...

EDIT:

がちょうどRabbitMQのドキュメントでこれを見つけましたが、これはPythonで行うのはハックのようになります。

[email protected]:~$ sudo rabbitmqctl -p vhostname list_queues name consumers 
Listing queues ... 
worker_queue 0 
...done. 

ので、私のような何かを行うことができ、

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'") 

ハッキー...まだピカはこれを行うためのいくつかのpython関数を持っています...

ありがとう、

答えて

7

私はこれも調べていました。ソースとドキュメントを読んだ後、私はchannel.pyで次の項目を見つけました:

@property 
def consumer_tags(self): 
    """Property method that returns a list of currently active consumers 

    :rtype: list 

    """ 
    return self._consumers.keys() 

私自身のテストは成功しました。自分のチャンネルオブジェクトがself._channelの場所で次のように使用しました:

0

実際にこの問題が発生しましたが、別の問題がありましたが、Basic_Publish関数に役立つものが1つあります。デフォルトはFalseです。

イミディエイトフラグをTrueに設定すると、キューに置かれる代わりに消費者がすぐに消費する必要があります。ワーカーがメッセージを消費することができない場合は、別のワーカーを開始するようにエラーを返すでしょう。

システムのスループットによっては、多くの余分なワーカーを産んだり、死んだ労働者を置き換えるためにワーカーを産んだりすることになります。以前の問題では、制御キューを介して単に従業員を追跡する管理者様のシステムを作成することができます。ここでは、「ランナー」プロセスに、もはや不要になった作業者のプロセスを終了させるよう指示できます。

関連する問題