2016-09-07 4 views
0

私はウサギのキューにいくつかのデータが頻繁に埋め込まれていることを前提としています(例えば、ユーザーが後で分析する必要があるアクション)。 30秒から50秒まで、毎秒新しい項目が追加されます。 私が必要とするのは、このキューを見渡し、そのデータに対していくつかのタスクを実行するワーカーを作成することです。私は次のようにすることができます:php rabbit workerを作成する最善の方法

class Worker 
{ 

    public function run() 
    { 
     $queue = new Queue('exchange', 'queue'); 
     while (true) 
     { 
      $queue->processQueue(); 
     } 
    } 
} 

さらに、サーバー上でworker.phpを実行するだけで、動作しているようです。

しかし、この無限ループが私のウサギのインスタンスに余分な負荷を加えてしまうのではないかと思います。たぶん、より良い方法は、だから私の労働者がしばらくの間、継続的にウサギが、スリープからデータを取得しません

class Worker 
{ 
    CONST IDLE = 5; 

    private $start = 0; 

    public function run() 
    { 
     $this->start = time(); 

     $queue = new Queue('exchange', 'queue'); 
     while (true) 
     { 
      $queue->processQueue(); 

      //don't allow this worker to be working a lot 
      if (time() - $this->start >= 60 * 60 - self::IDLE) 
      { 
       break; 
      } 

      sleep(self::IDLE); 
     } 

     $queue->close(); 
    } 
} 

のようになめらかに行うことです。そして1時間の仕事の後、それはただ仕事を止めるでしょうし、別の人のインスタンスはcrontabの仕事か他の仕事によって呼び出されますか?私は次のライブラリを使用するのRabbitMQと私の労働者を管理するために

答えて

2

https://github.com/php-amqplib/php-amqplib

それから私は私の労働者が作品(すべてのRabbitMQのロジックが含まれている)、それは私を与える必要がありますどのように定義したクラスを作成しますそのようなものは:

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 

abstract class QueueAMQPConsumer 
{  
    protected $connection; 

    protected $debug; 

    protected $queueName; 

    protected $exchange; 

    public function __construct(AMQPStreamConnection $AMQPConnection, $queueName, $exchange = null) 
    { 
     $this->connection = $AMQPConnection; 
     $this->queueName = $queueName; 
     $this->exchange = $exchange; 
    } 

    public function run($debug = false) 
    { 
     $this->debug = $debug; 
     $channel = $this->connection->channel(); 
     if ($this->exchange !== null) { 
      $channel->exchange_declare($this->exchange, "topic", false, true, false); 
     } 

     $channel->queue_declare($this->queueName, false, true, false, false); 
     if ($this->exchange !== null) { 
      $channel->queue_bind($this->queueName, $this->exchange); 
     } 

     $channel->basic_qos(null, 1, null); 
     $channel->basic_consume($this->queueName, '', false, false, false, false, [$this, 'callback']); 

     while (count($channel->callbacks)) { 
      $channel->wait(); 
     } 

     $channel->close(); 
     $this->connection->close(); 
    } 


    final function callback(AMQPMessage $message) 
    { 
     $result = $this->process($message); 

     if (false === $result) { 
      $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, true); 
     } else { 
      $message->delivery_info['channel']->basic_ack($message-> delivery_info['delivery_tag']); 
     } 
    } 

    /** 
    * @param AMQPMessage $message 
    * 
    * @return bool 
    */ 
    abstract protected function process(AMQPMessage $message); 
} 

このクラスは、セットアップにキュー、交換機(この場合はトピック)、QoSを可能にする(することができますカスタムこれらすべてのパラメータ、そのわずか一例)など。

次に、コールバックをループします。ここでコールバックは、キューを処理する必要がある異なるワーカーに実装される抽象メソッドのプロセス(...)です。 $channel->wait();

それから私は、キュー内のメッセージを処理する必要が子クラスを作成します。:

class MyWorker extends QueueAMQPConsumer 
{ 
    protected function process(AMQPMessage $message) 
    { 
     // .... process your message here 
    } 
} 

だから労働者はあなたを聞いてされますので、のresponsabilityチャンネルである「ループ/リスニング」常にキューに入れ、キューに到着した時点でメッセージを処理します。 process(...)がfalse以外を返すと、メッセージが確認されます。

あなたはちょうどそのようなあなたのクラスを起動する必要があります。

$consumer = new MyWorker(....);  
$consumer->run(); 
関連する問題