RabbitMqBundleでSymfony2を使用して、ElasticSearchにドキュメントを送信するワーカーを作成しました。 ElasticSearch一括APIを使用するよりも、1つずつの割合でドキュメントのインデックスを作成する方がはるかに遅くなります。したがって、私は1000個のグループで文書をESにフラッシュするバッファを作成しました。コードルックス(ビット簡体字)としては、以下:PHP CLIスクリプトの非アクティブ期間後の実行機能
class SearchIndexator
{
protected $elasticaService;
protected $buffer = [];
protected $bufferSize = 0;
// The maximum number of documents to keep in the buffer.
// If the buffer reaches this amount of documents, then the buffers content
// is send to elasticsearch for indexation.
const MAX_BUFFER_SIZE = 1000;
public function __construct(ElasticaService $elasticaService)
{
$this->elasticaService = $elasticaService;
}
/**
* Destructor
*
* Flush any documents that remain in the buffer.
*/
public function __destruct()
{
$this->flush();
}
/**
* Add a document to the indexation buffer.
*/
public function onMessage(array $document)
{
// Prepare the document for indexation.
$this->doHeavyWeightStuff($document);
// Create an Elastica document
$document = new \Elastica\Document(
$document['key'],
$document
);
// Add the document to the buffer.
$this->buffer[] = $document;
// Flush the buffer when max buffersize has been reached.
if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
$this->flush();
}
}
/**
* Send the current buffer to ElasticSearch for indexation.
*/
public function flush()
{
// Send documents to ElasticSearch for indexation.
if (1 <= $this->bufferSize) {
$this->elasticaService->addDocuments($this->buffer);
}
// Clear buffer
$this->buffer = [];
$this->bufferSize = 0;
}
}
このすべてはかなり良い動作しますが、若干の問題があります。キューは予測不能な速度でメッセージで満たされます。 5分で100000になることもありますが、時には1時間に1回であることもあります。たとえば、82671のドキュメントがキューに入れられた場合、最後の671ドキュメントは、別の329ドキュメントを受信する前に索引付けされず、時間がかかることがあります。次のことができます:
警告:Sci-Fiコード!これは明らかに動作しません:今
class SearchIndexator
{
protected $elasticaService;
protected $buffer = [];
protected $bufferSize = 0;
protected $flushTimer;
// The maximum number of documents to keep in the buffer.
// If the buffer reaches this amount of documents, then the buffers content
// is send to elasticsearch for indexation.
const MAX_BUFFER_SIZE = 1000;
public function __construct(ElasticaService $elasticaService)
{
$this->elasticaService = $elasticaService;
// Highly Sci-fi code
$this->flushTimer = new Timer();
// Flush buffer after 5 minutes of inactivity.
$this->flushTimer->setTimeout(5 * 60);
$this->flushTimer->setCallback([$this, 'flush']);
}
/**
* Destructor
*
* Flush any documents that remain in the buffer.
*/
public function __destruct()
{
$this->flush();
}
/**
* Add a document to the indexation buffer.
*/
public function onMessage(array $document)
{
// Prepare the document for indexation.
$this->doHeavyWeightStuff($document);
// Create an Elastica document
$document = new \Elastica\Document(
$document['key'],
$document
);
// Add the document to the buffer.
$this->buffer[] = $document;
// Flush the buffer when max buffersize has been reached.
if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
$this->flush();
} else {
// Start a timer that will flush the buffer after a timeout.
$this->initTimer();
}
}
/**
* Send the current buffer to ElasticSearch for indexation.
*/
public function flush()
{
// Send documents to ElasticSearch for indexation.
if (1 <= $this->bufferSize) {
$this->elasticaService->addDocuments($this->buffer);
}
// Clear buffer
$this->buffer = [];
$this->bufferSize = 0;
// There are no longer messages to be send, stop the timer.
$this->flushTimer->stop();
}
protected function initTimer()
{
// Start or restart timer
$this->flushTimer->isRunning()
? $this->flushTimer->reset()
: $this->flushTimer->start();
}
}
を、私は、イベントが駆動されていないPHPの限界を知っています。しかし、これは2015年であり、ReactPHPのような解決策があるので、これは可能でしょうか? ØMQにはthis functionがあります。 RabbitMQやメッセージキューエクステンションとは独立して動作するソリューションは何でしょうか?私は懐疑的だ
ソリューション:は
- crysalead/codeがあります。それは
declare(ticks = 1);
を使用してタイマーをシミュレートします。私はこれが実践的かつ堅実なアプローチであるかどうかはわかりません。何か案は? - 5分ごとに 'FLUSH'メッセージを同じキューに発行し、このメッセージを受け取ったときに明示的にバッファをフラッシュするが、それは不正行為になるcronジョブを実行できます。
あなたが探しているものは完全ではありませんが、解決策の間には良いかもしれません。 'flush'コマンドを最後に実行した時刻を保存し、ドキュメントを追加するときにも時刻を確認します。とにかく5分以上過ぎていたら。 2番目に良いオプションはcronjob IMHO –
です。長い間何もメッセージが届かない場合は、時間をチェックすることができないため、バッファがフラッシュされません。 cronjobはPHPを別のプロセスで実行するため、バッファにアクセスできません。 – Xatoo
長時間実行されているPHPプロセスでこのコードを実行していますか?この場合、おそらく信号を使用することができます(数字の1つのオプションと同じように)見た目は[こちら]です(http://www.hackingwithphp.com/16/1/1/taking-control-of-php-pcntl_signal )と[ここ](http://www.hackingwithphp。com/16/1/2/timing-your-signals)を使用します。これらのシグナルはノンブロッキングであり、まだそれ自身を使用していませんが、あなたのユースケースに必要なものかもしれません。 –