2017-03-01 4 views
0

PikaClientが消費するキューを手動で削除すると、何も起こりません。私は同じ名前のキューを再作成することができますが、チャネルはキューを消費するために停止しています(削除したので正常です)。しかし、消費されたキューが削除されたときにイベントを受信したいと考えています。キューを検出する方法が削除されましたか?

チャンネルは自動的に閉じられるが、«on_channel_close_callback»は決して呼び出されないと予想した。 «basic_consume»はクローズ時にコールバックを提供しません。 さらに重要な点は、TornadoConnectionを使用する必要があることです。

ピカ:0.10.0 パイソン:2.7 トルネード:4.3

はあなたの助けのために、ありがとうございます。

class PikaClient(object): 

    def __init__(self): 
     # init everything here 

    def connect(self): 
     pika.adapters.tornado_connection.TornadoConnection(connection_param, on_open_callback=self.on_connected) 

    def on_connected(self, connection): 
     self.logger.info('PikaClient: connected to RabbitMQ') 
     self.connected = True 
     self.connection = connection 
     self.connection.channel(self.on_channel_open) 

    def on_open_error_callback(self, *args): 
     self.logger.error("on_open_error_callback") 

    def on_channel_open(self, channel): 
     channel.add_on_close_callback(self.on_channel_close_callback) 

     channel.basic_consume(self.on_message, queue=self.queue_name, no_ack=True) 

    def on_channel_close_callback(self, reply_code, reply_text): 
     self.logger.error("Consumer was cancelled remotely, shutting down", reply_code=reply_code, reply_text=reply_text) 

答えて

0

解決方法が見つかりました。 PikaClientがメッセージを消費した場合は、毎秒チェックします。そうでなければ、自動的にキューを作成するアプリケーションを再起動します。

もっと良い解決策がある場合は、私はまだ提案を受けています。

def __init__(self): 
    ... 
    self.have_messages_been_consumed = False 

def on_connected(self, connection): 
    self.logger.info('PikaClient: connected to RabbitMQ') 
    self.connected = True 
    self.connection = connection 
    self.connection.add_timeout(X, self.check_if_messages_have_been_consumed) 
    self.connection.channel(self.on_channel_open) 

def check_if_messages_have_been_consumed(self): 
    if self.have_messages_been_consumed: 
     self.have_messages_been_consumed = False 
     self.connection.add_timeout(X, self.check_if_messages_have_been_consumed) 
    else: 
     # close_and_restart will set to False have_messages_been_consumed 
     self.close_and_restart() 

def on_message(self, channel, basic_deliver, header, body): 
    self.have_messages_been_consumed = True 
    ... 
関連する問題