2017-04-12 6 views
0

RabbitMQとpika(python)を使用して、ノード(非同期コンシューマ)にタスクを提供するジョブキューイングシステムを実行しています。タスクを定義する各メッセージは、そのタスクが完了するとただちに承認されます。RabbitMQ pika非同期消費者のハートビートの問題

これらのノードで更新を実行する必要があり、ノードがタスクの完了を待ってから正常に終了する終了モードを作成しました。私は私のメンテナンス作業を行うことができます。

この終了モードでノードがRabbitMQからメッセージをさらに受け取らないように、私はジョブが終了するのを待つ前にbasic_cancelメソッドを呼び出します。

この方法のこの効果は、ナキウサギのマニュアルに記載されて:あなたが読んであれば

This method cancels a consumer. This does not affect already 
delivered messages, but it does mean the server will not send any more 
messages for that consumer. The client may receive an arbitrary number 
of messages in between sending the cancel method and receiving the 
cancel-ok reply. It may also be sent from the server to the client in 
the event of the consumer being unexpectedly cancelled (i.e. cancelled 
for any reason other than the server receiving the corresponding 
basic.cancel from the client). This allows clients to be notified of 
the loss of consumers due to events such as queue deletion. 

メッセージを既に受け取ったが、必ずしも認められないとして、「すでにメッセージを配信」、退出モードが待機することを可能にするタスクそれを実行するコンシューマノードがキューイングシステムから自身を取り消しても、キューを再キューに入れてはいけません。

(ナキウサギの例から取った)私の非同期消費者クラスの停止機能のための私のコードはこの1つのようになります。

def stop(self): 
    """Cleanly shutdown the connection to RabbitMQ by stopping the consumer 
    with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok 
    will be invoked by pika, which will then closing the channel and 
    connection. The IOLoop is started again because this method is invoked 
    when CTRL-C is pressed raising a KeyboardInterrupt exception. This 
    exception stops the IOLoop which needs to be running for pika to 
    communicate with RabbitMQ. All of the commands issued prior to starting 
    the IOLoop will be buffered but not processed. 

    """ 
    LOGGER.info('Stopping') 
    self._closing = True 
    self.stop_consuming() 
    LOGGER.info('Waiting for all running jobs to complete') 
    for index, thread in enumerate(self.threads): 
     if thread.is_alive(): 
      thread.join() 
      # also tried with a while loop that waits 10s as long as the 
      # thread is still alive 
      LOGGER.info('Thread {} has finished'.format(index)) 

    # also tried moving the call to stop consuming up to this point 
    if self._connection!=None: 
     self._connection.ioloop.start() 
     LOGGER.info('Closing connection') 
     self.close_connection() 

私の問題は、消費者の解除後、非同期コンシューマがないように見えることです私の仕事(スレッド)が終了するのを待っているループの後でキャンセルを実行しても、もうハートビートを送信する必要があります。

BlockingConnectionsのprocess_data_events関数について読んだことがありますが、そのような関数は見つかりませんでした。 SelectConnectionクラスのioloopはasyncコンシューマと同等ですか?

終了モードのノードがもうハートビートを送信しないため、現在実行中のタスクは最大ハートビートに達するとRabbitMQによって再キューされます。私はこの心拍をそのままにしておきたいと思います。なぜなら、私が出口モードにいないときには問題ではないからです(私の心拍は約100秒で、私の仕事は完了するのに2時間ほどかかるかもしれません)。

=ERROR REPORT==== 12-Apr-2017::19:24:23 === 
closing AMQP connection (.....) : 
missed heartbeats from client, timeout: 100s 

私は考えることができる唯一の回避策はまだと、終了モードで実行中のタスクに対応するメッセージを認めており、これらのことを期待:RabbitMQのログを見ると

、ハートビートは確かな理由ですタスクは失敗しません...

待機中に手動でハートビートを送信するために使用できるチャネルまたは接続の方法はありますか?

(pythonスレッドパッケージからの)time.sleep()またはthread.join()メソッドが完全にブロックされ、他のスレッドが必要とするものを実行できないという問題がありますか?私は他のアプリケーションで使用しており、そのように動作するようには見えません。

この問題は、終了モードでのみ表示されるので、コンシューマにハートビートの送信を停止させる停止機能があると思いますが、(成功なしで)try_consumingメソッドを呼び出した後でwait-on-running-tasksのループ、私はこの問題の根源になるものは見当たりません。

ありがとうございました!

答えて

0

これは、channel_close()関数のコールバックで非同期的にbasic_cancelを呼び出していたため、私のアプリケーションでRabbitMQのやりとりを停止し、RabbitMQがunackesdmessagesを再キューイングしていました。実際には、チャンネルがNoneに設定されているので、残りのタスクを後で認識しようとしているスレッドがエラーを起こしていたため、もうackメソッドを持たなかったことに気付きました。

誰かを助けることを願っています!

関連する問題