2017-10-03 3 views
0

私はRabbitMQのコンシューマとしてナキウサギねじれた接続を使用して、ここに私のコードです:消費者をピカのためのねじれた接続で再接続するには?

@defer.inlineCallbacks 
def run(connection): 
    queue_name = 'aaa' 
    channel = yield connection.channel() 
    queue = yield channel.queue_declare(queue=queue_name, auto_delete=False, exclusive=False) 
    yield channel.queue_bind(exchange='amq.direct',queue=queue_name,routing_key=queue_name) 
    yield channel.basic_qos(prefetch_count=1) 
    queue_object, consumer_tag = yield channel.basic_consume(queue=queue_name,no_ack=False) 
    logger.info('[room server]start consume queue %s', queue_name) 

    l = task.LoopingCall(read, queue_object) 
    l.start(0.1) 


@defer.inlineCallbacks 
def read(queue_object): 
    ch,method,properties,body = yield queue_object.get() 
    try: 
     data = json.loads(body) 
     head_code = data['head_code'] 
     openid = data['openid'] 
     message_content = data['message_content'] 
     conn_id = -1 
     try: 
      conn_id = data['conn_id'] 
     except: 
      pass 
     message_dispatcher(head_code, openid, message_content, conn_id) 
     yield ch.basic_ack(delivery_tag=method.delivery_tag) 
    except ValueError as e: 
     logger.error('[error!]error body %s' % body) 
     yield ch.basic_ack(delivery_tag=method.delivery_tag) 

credentials = pika.PlainCredentials(config.RABBITMQ_USERNAME, config.RABBITMQ_PASSWD) 
parameters = pika.ConnectionParameters(credentials=credentials) 
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters) 

def got_error(failure, d): 
    logger.error(failure) 
    d = cc.connectTCP(config.RABBITMQ_HOST, config.RABBITMQ_PORT) 


def start(): 
    d = cc.connectTCP(config.RABBITMQ_HOST, config.RABBITMQ_PORT) 
    d.addCallback(lambda protocol: protocol.ready) 
    d.addCallback(run) 
    d.addErrback(got_error, d) 

私の問題は、とき接続が切断、再接続プロセスが動作しないです。enter image description here

にする方法再接続作業ですか?

答えて

0

TwistedProtocolConnection docstringによれば、接続終了を処理するためにon_close_callback機能を提供することができる。この関数では、reason_code and reason_textはargsでなければなりません。ですから、接続端子を扱う別のon_closeコールバックを作成し、それが起こった理由は、その後のRabbitMQに接続するために必要なロジックを実行します。あなたは、あなたのClientCreatorコードを例に従ってください、代わりにそのコードを持っていたら

def connection_termination(reason_code, reason_text): 
    """ 
    Log the reasons why the connection terminates and then reconnect 
    """ 
    # put your connection code here 
    # incrementally space out your reconnections, eg. 2 seconds, if fail, 5 seconds, if fail 10 seconds, etc... 

以下:

cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters, connection_termination) 

残念ながら、私は現時点でこれをテストすることはできませんが、うまくいくはずです。あなたはすでにロジックの大部分を持っているので、私はエクササイズのために残しておきます;)問題がある場合はコメントしてください。解決策がある場合は、他者の最終結果を示してください。

+0

私はあなたのコードを試してみましたが、例外が発生:スタートで D = cc.connectTCP(config.RABBITMQ_HOST、config.RABBITMQ_PORT) ファイル "/Library/Python/2.7/site-packages/twisted/internet/protocol.py" 、行292、connectTCP bindAddress = bindAddress) ファイル "/Library/Python/2.7/site-packages/twisted/internet/protocol.py"、行274、_connect self.reactor、self.protocolClass(* self .args、** self.kwargs)、d) TypeError:__init __()は正確に2つの引数をとります(3つは指定されています) –

関連する問題