2015-12-14 13 views
5

RabbitMqBundleでSymfony2を使用して、ElasticSearchにドキュメントを送信するワーカーを作成しました。 ElasticSearch一括APIを使用するよりも、1つずつの割合でドキュメントのインデックスを作成する方がはるかに遅くなります。したがって、私は1000個のグループで文書をESにフラッシュするバッファを作成しました。コードルックス(ビット簡体字)としては、以下:PHP CLIスクリプトの非アクティブ期間後の実行機能

class SearchIndexator 
{ 
    protected $elasticaService; 
    protected $buffer = []; 
    protected $bufferSize = 0; 

    // The maximum number of documents to keep in the buffer. 
    // If the buffer reaches this amount of documents, then the buffers content 
    // is send to elasticsearch for indexation. 
    const MAX_BUFFER_SIZE = 1000; 

    public function __construct(ElasticaService $elasticaService) 
    { 
     $this->elasticaService = $elasticaService; 
    } 

    /** 
    * Destructor 
    * 
    * Flush any documents that remain in the buffer. 
    */ 
    public function __destruct() 
    { 
     $this->flush(); 
    } 

    /** 
    * Add a document to the indexation buffer. 
    */ 
    public function onMessage(array $document) 
    { 
     // Prepare the document for indexation. 
     $this->doHeavyWeightStuff($document); 

     // Create an Elastica document 
     $document = new \Elastica\Document(
      $document['key'], 
      $document 
     ); 

     // Add the document to the buffer. 
     $this->buffer[] = $document; 

     // Flush the buffer when max buffersize has been reached. 
     if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) { 
      $this->flush(); 
     } 
    } 

    /** 
    * Send the current buffer to ElasticSearch for indexation. 
    */ 
    public function flush() 
    { 
     // Send documents to ElasticSearch for indexation. 
     if (1 <= $this->bufferSize) { 
      $this->elasticaService->addDocuments($this->buffer); 
     } 

     // Clear buffer 
     $this->buffer = []; 
     $this->bufferSize = 0; 
    } 
} 

このすべてはかなり良い動作しますが、若干の問題があります。キューは予測不能な速度でメッセージで満たされます。 5分で100000になることもありますが、時には1時間に1回であることもあります。たとえば、82671のドキュメントがキューに入れられた場合、最後の671ドキュメントは、別の329ドキュメントを受信する前に索引付けされず、時間がかかることがあります。次のことができます:

警告:Sci-Fiコード!これは明らかに動作しません:

class SearchIndexator 
{ 
    protected $elasticaService; 
    protected $buffer = []; 
    protected $bufferSize = 0; 
    protected $flushTimer; 

    // The maximum number of documents to keep in the buffer. 
    // If the buffer reaches this amount of documents, then the buffers content 
    // is send to elasticsearch for indexation. 
    const MAX_BUFFER_SIZE = 1000; 

    public function __construct(ElasticaService $elasticaService) 
    { 
     $this->elasticaService = $elasticaService; 

     // Highly Sci-fi code 
     $this->flushTimer = new Timer(); 
     // Flush buffer after 5 minutes of inactivity. 
     $this->flushTimer->setTimeout(5 * 60); 
     $this->flushTimer->setCallback([$this, 'flush']); 
    } 

    /** 
    * Destructor 
    * 
    * Flush any documents that remain in the buffer. 
    */ 
    public function __destruct() 
    { 
     $this->flush(); 
    } 

    /** 
    * Add a document to the indexation buffer. 
    */ 
    public function onMessage(array $document) 
    { 
     // Prepare the document for indexation. 
     $this->doHeavyWeightStuff($document); 

     // Create an Elastica document 
     $document = new \Elastica\Document(
      $document['key'], 
      $document 
     ); 

     // Add the document to the buffer. 
     $this->buffer[] = $document; 

     // Flush the buffer when max buffersize has been reached. 
     if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) { 
      $this->flush(); 
     } else { 
      // Start a timer that will flush the buffer after a timeout. 
      $this->initTimer(); 
     } 
    } 

    /** 
    * Send the current buffer to ElasticSearch for indexation. 
    */ 
    public function flush() 
    { 
     // Send documents to ElasticSearch for indexation. 
     if (1 <= $this->bufferSize) { 
      $this->elasticaService->addDocuments($this->buffer); 
     } 

     // Clear buffer 
     $this->buffer = []; 
     $this->bufferSize = 0; 

     // There are no longer messages to be send, stop the timer. 
     $this->flushTimer->stop(); 
    } 

    protected function initTimer() 
    { 
     // Start or restart timer 
     $this->flushTimer->isRunning() 
      ? $this->flushTimer->reset() 
      : $this->flushTimer->start(); 
    } 
} 

を、私は、イベントが駆動されていないPHPの限界を知っています。しかし、これは2015年であり、ReactPHPのような解決策があるので、これは可能でしょうか? ØMQにはthis functionがあります。 RabbitMQやメッセージキューエクステンションとは独立して動作するソリューションは何でしょうか?私は懐疑的だ

ソリューション:

  1. crysalead/codeがあります。それはdeclare(ticks = 1);を使用してタイマーをシミュレートします。私はこれが実践的かつ堅実なアプローチであるかどうかはわかりません。何か案は?
  2. 5分ごとに 'FLUSH'メッセージを同じキューに発行し、このメッセージを受け取ったときに明示的にバッファをフラッシュするが、それは不正行為になるcronジョブを実行できます。
+0

あなたが探しているものは完全ではありませんが、解決策の間には良いかもしれません。 'flush'コマンドを最後に実行した時刻を保存し、ドキュメントを追加するときにも時刻を確認します。とにかく5分以上過ぎていたら。 2番目に良いオプションはcronjob IMHO –

+0

です。長い間何もメッセージが届かない場合は、時間をチェックすることができないため、バッファがフラッシュされません。 cronjobはPHPを別のプロセスで実行するため、バッファにアクセスできません。 – Xatoo

+0

長時間実行されているPHPプロセスでこのコードを実行していますか?この場合、おそらく信号を使用することができます(数字の1つのオプションと同じように)見た目は[こちら]です(http://www.hackingwithphp.com/16/1/1/taking-control-of-php-pcntl_signal )と[ここ](http://www.hackingwithphp。com/16/1/2/timing-your-signals)を使用します。これらのシグナルはノンブロッキングであり、まだそれ自身を使用していませんが、あなたのユースケースに必要なものかもしれません。 –

答えて

0

私のコメントで述べたように、信号を使うことができます。 PHPではシグナルハンドラをスクリプトシグナル(SIGINT、SIGKILLなど)に登録することができます。

SIGALRMシグナルを使用することができます。この信号は、(設定可能な)一定時間が経過した後にあなたのスクリプトを警告します。これらの信号の正の側は、それらが非ブロッキングであることである。言い換えれば、あなたのスクリプトの通常の操作は妨げられません。

調整液(ダニはPHP 5.3以降推奨されません):

function signal_handler($signal) { 
    // You would flush here 
    print "Caught SIGALRM\n"; 
    // Set the SIGALRM timer again or it won't trigger again 
    pcntl_alarm(300); 
} 

// register your handler with the SIGALRM signal 
pcntl_signal(SIGALRM, "signal_handler", true); 
// set the timeout for the SIGALRM signal to 300 seconds 
pcntl_alarm(300); 

// start loop and check for pending signals 
while(pcntl_signal_dispatch() && your_loop_condition) { 
    //Execute your code here 
} 

注:pcntl_alarmとあなたのためのタイマーをあなたの信号の時間を設定する場合にのみ、スクリプトで1つのSIGALRM信号を使用することができますアラームは、(信号を発射することなく)新たに設定された値にリセットされる。

+0

はい、私の質問で言及した '' crysalead/code''プロジェクトと同じです。しかし、これは "ticksを宣言する"を使用しており、各文が実行可能な解決策である場合にPHPの実行を中断するかどうかは疑いの余地があります。あなたはそれについて何か経験がありますか? – Xatoo

+0

また、提供したリンクは、ダニを使用することは推奨されていないことを説明するページへのリンクになります。多くの場合、ダニを使用すると、ほとんどの場合アンチパターンであることが、ダニの言及に記載されています。したがって、代替案があるかどうかに関心があります。 – Xatoo

+0

あなたの権利、私はダニが推奨されなかった部分を見落としました。私はいくつかの掘り出しを行い、廃止されていない代替品を見つけました。 'pcntl_signal_dispatch()'を使うことで、保留中のシグナルi.s.oをチェックするタイミングを決めることができます。 tickごとにハンドラを実行します。調整されたソリューションは、より多くのperformantになります..これが役立つことを願っています。 –

関連する問題