2016-11-05 7 views
1

SymfonyとRabbitMQバンドルをアプリケーション用に使用していて、次の問題が発生しました:コンシューマサービスがキャッチされていない例外/エラー(メモリ不足など)リジェクト信号またはACK信号が得られるまで何度も消費されます。この動作を変更して、メッセージが最初に消費されたときにキャッチされない例外/エラーが発生した場合にメッセージが破棄されるようにします。エラーが発生して処理が終了すると、RabbitMQの拒否メッセージが返される

これは可能ですか?ありがとう!

答えて

1

はい、メッセージをACKする必要があります。これを行うには、auto-ackフラグをtrueに設定するか(使用している言語/ API /ライブラリによって異なる)、メッセージを手動または明示的に確認します。 message is republished and consumed again and againと言ったように、処理できないメッセージを確認するのは完全に正常です。

場合もあります。set requeue to false。私はRabbitMQを扱うためにPHPを使用しないので、APIの同等性はわかりません。つまり、nackが実装されています - その場合(そうではありません)、それは良いアイデアかもしれませんdead letter exchange(リンクからの引用)を構成してください。

キューからのメッセージは「デッドレター」にすることができます。つまり、以下のいずれかのイベントが発生したとき に別の交換を再発行:

のメッセージは... =偽 REQUEUE、
と(basic.reject又はbasic.nack)を拒否され

0

あなたの問題を解決すべき下記の例を見てください。この場合:

  • コンシューマーは何も起こっていないかのように動作します。
  • メッセージはキューから破棄されます。

ソリューション

class OrderCreateConsumer implements ConsumerInterface 
{ 
    public function execute(AMQPMessage $message) 
    { 
     $body = json_decode($message->body, true); 

     try { 
      // Do whatever you want with $body 
     } catch (Exception $e) { 
      return ConsumerInterface::MSG_REJECT; 
     } 
    } 
} 

またはフルsymfonyの+のRabbitMQ例の詳細はここにある:The proper ways of handling errors in symfony RabbitMQ consumer。オプション2,3,4があなたに当てはまるように見えます。これはあなたの質問の音によって避けようとしているものです。

+0

ありがとう、BentCoder!私はConsumerInterface定数に精通していますが、 "メモリ不足"や "最大実行時間を超過しました"などの\ Throwableであっても、捕まえられないエラーを処理することが問題です。このような場合、メッセージは再公開されます。それはどのように変えることができますか?そのようなオプションが存在する場合、PHPスクリプトが停止しているので、RabbitMQの設定に属していなければなりません...私はauto-ackフラグについて読んでいます。 RabbitMQバンドルでこれを行う方法はありませんでした。 – medowlock

+0

symfony DLEのこのレポを見ましたか? [無限ループの問題](https://github.com/cmodijk/RabbitMQBundleBridge/blob/patch-3/doc/dead_letter_exchange.md) – BentCoder

+0

@medowlock私の答えを確認しましたか、*設定の再入力オプションはfalse *部分ですか? – cantSleepNow

0

これまでのもっとも近い解決策は、これまでRabbitMqバンドルを拡張することでした。 basic_consume方法の第四引数は$no_ackと呼ばれ、falseに設定されている

protected function setupConsumer() 
{ 
    if ($this->autoSetupFabric) { 
     $this->setupFabric(); 
    } 
    $this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, false, false, false, array($this, 'processMessage')); 
} 

BaseConsumerクラス(名前空間OldSound\RabbitMqBundle\RabbitMq)では、このようになりますsetupConsumerと呼ばれる方法があります。このパラメータをtrueに設定すると、エラーが発生したか、例外がスローされたか、またはすべて正常に処理されたかにかかわらず、メッセージは処理された後に破棄されます。したがって、メッセージはいずれかの方法で破棄されます。メッセージはまだ破棄されます - 覚えておいてください

はとても効果がありませんでしょうConsumerInterface::MSG_REJECT_REQUEUEを返し、$no_ackパラメータがtrueに設定されている場合、それはどのような状況のような消費者を返す関係ないと思いました。

0

AMQPプロトコルは、メッセージredelivered propertyを提供します。

私はRabbitMQBundleでそれを行う方法がわかりませんが(私は完全に可能であると確信しています)。私はエンキューLIBでそれを行う方法をお見せすることができますが:

<?php 

use Enqueue\Psr\PsrContext; 
use Enqueue\Psr\PsrMessage; 
use Enqueue\Psr\PsrProcessor; 

class FooProcessor implements PsrProcessor 
{ 
    public function process(PsrMessage $message, PsrContext $context) 
    { 
     if ($message->isRedelivered()) { 
      // we already tried to process this message and failed. 

      return self::REJECT; 
     } 

     // this is the new message we've never seen before. 

     // do the job 

     return self::ACK; 
    } 
} 

このような問題を解決する別の方法は、再配信メッセージを遅延させることで、再びそれはRabbitMQBundleで行うことは可能ですが、手動ですべての部分を設定する必要があると思いますエンキューバンドルはそれを行います。遅延プラグインをセットアップしてconfigオプションをオンにするだけです。 post

キャッチは常に信頼できるものではありません。致命的なエラーなど、取得できないエラーが発生する可能性があります。

通知や警告や回復機能なしに、失敗したメッセージが失われる可能性があるため、自動応答モードはあまり良くありません。

関連する問題