2016-05-12 11 views
1

私はコンシューマにいくつかのドロップアウトが発生しています。私はrabbitmqのpikaライブラリを使用してセットアップしています。 pikaと一緒に、私は非同期のコンシューマを設定するためにねじれ実装を使用しています。私はなぜこれが起こっているのか分かりませんが、消費者が脱落してこれをやり遂げる方法がわからない場合は、再接続を実装したいと思います。ここに私の現在の実装はコンシューマを再起動する方法rabbitmq pika python

class Consumer(object): 
def __init__(self, queue, exchange, routingKey, medium, signalRcallbackFunc): 
    self._queue_name = queue 
    self.exchange = exchange 
    self.routingKey = routingKey 
    self.medium = medium 
    print "client on" 
    self.channel = None 
    self.medium.client.on(signalRcallbackFunc, self.callback) 

def on_connected(self, connection): 
    d = connection.channel() 
    d.addCallback(self.got_channel) 
    d.addCallback(self.queue_declared) 
    d.addCallback(self.queue_bound) 
    d.addCallback(self.handle_deliveries) 
    d.addErrback(log.err) 

def got_channel(self, channel): 
    self.channel = channel 
    self.channel.basic_qos(prefetch_count=500) 
    return self.channel.queue_declare(queue=self._queue_name, durable=True) 

def queue_declared(self, queue): 
    self.channel.queue_bind(queue=self._queue_name, 
          exchange=self.exchange, 
          routing_key=self.routingKey) 

def queue_bound(self, ignored): 
    return self.channel.basic_consume(queue=self._queue_name) 

def handle_deliveries(self, queue_and_consumer_tag): 
    queue, consumer_tag = queue_and_consumer_tag 
    self.looping_call = task.LoopingCall(self.consume_from_queue, queue) 

    return self.looping_call.start(0) 

def consume_from_queue(self, queue): 
    d = queue.get() 
    return d.addCallback(lambda result: self.handle_payload(*result)) 

def handle_payload(self, channel, method, properties, body): 
    print(body) 
    print(properties.headers) 
    channel.basic_ack(method.delivery_tag) 
    print "#####################################" + method.delivery_tag + "###################################" 

def callback(self, data): 
    #self.channel.basic_ack(data, multiple=True) 
    pass 

答えて

1

on_connectedコールバック内の接続で 'on-close'ハンドラを登録することができます。これは、接続が失われたときに呼び出されます。ここで、新しい接続を再確立することができます。

次の例では、比較的便利であり、それは私が良い効果に使用されてきた戦略だ...(私がテストしていないが)add_on_close_callback方法は、おそらくかなり遠くあなたを取得しますツイストナキウサギライブラリの http://pika.readthedocs.io/en/latest/examples/asynchronous_consumer_example.html

https://pika.readthedocs.io/en/0.10.0/modules/adapters/twisted.html

+0

は、どのように私はナキウサギライブラリのねじれた実装は、接続クローズイベントのリスナを登録するために利用できる同様の方法を持っているようだツイストプロトコル – Johnathon64

+0

を使用して、これをやって行くだろう。あなたは 'add_on_close_callback'メソッドを使ってみることができます。 [link] https://pika.readthedocs.io/en/0.10.0/modules/adapters/twisted.html – nkuro

+0

を参照してください。私はこれを読んだことがありますが、私のソリューションでこれをどのように実装するのかは分かりません – Johnathon64

0

接続を閉じてもう一度開くことができない理由はありますか?

@contextmanager 
def with_pika_connection(): 
    credentials = pika.PlainCredentials(worker_config.username, worker_config.password) 
    connection = pika.BlockingConnection(pika.ConnectionParameters(
     host=worker_config.host, 
     credentials=credentials, 
     port=worker_config.port, 
    )) 

    try: 
     yield connection 
    finally: 
     connection.close() 


@contextmanager 
def with_pika_channel(queuename): 
    with with_pika_connection() as connection: 
     channel = connection.channel() 


while True: 
    while not stopping: 
     try: 
       with with_pika_channel(queuename) as (connection, channel): 
        consumer_tag = channel.basic_consume(
         callback, 
         queue=queuename, 
        ) 
        channel.start_consuming() 
     except Exception as e: 
       reportException(e) 
       # Continue 
関連する問題