SymfonyとRabbitMQバンドルをアプリケーション用に使用していて、次の問題が発生しました:コンシューマサービスがキャッチされていない例外/エラー(メモリ不足など)リジェクト信号またはACK信号が得られるまで何度も消費されます。この動作を変更して、メッセージが最初に消費されたときにキャッチされない例外/エラーが発生した場合にメッセージが破棄されるようにします。エラーが発生して処理が終了すると、RabbitMQの拒否メッセージが返される
これは可能ですか?ありがとう!
SymfonyとRabbitMQバンドルをアプリケーション用に使用していて、次の問題が発生しました:コンシューマサービスがキャッチされていない例外/エラー(メモリ不足など)リジェクト信号またはACK信号が得られるまで何度も消費されます。この動作を変更して、メッセージが最初に消費されたときにキャッチされない例外/エラーが発生した場合にメッセージが破棄されるようにします。エラーが発生して処理が終了すると、RabbitMQの拒否メッセージが返される
これは可能ですか?ありがとう!
はい、メッセージをACKする必要があります。これを行うには、auto-ackフラグをtrueに設定するか(使用している言語/ API /ライブラリによって異なる)、メッセージを手動または明示的に確認します。 message is republished and consumed again and again
と言ったように、処理できないメッセージを確認するのは完全に正常です。
場合もあります。set requeue
to false
。私はRabbitMQを扱うためにPHPを使用しないので、APIの同等性はわかりません。つまり、nackが実装されています - その場合(そうではありません)、それは良いアイデアかもしれませんdead letter exchange(リンクからの引用)を構成してください。
キューからのメッセージは「デッドレター」にすることができます。つまり、以下のいずれかのイベントが発生したとき に別の交換を再発行:
のメッセージは... =偽 REQUEUE、
と(basic.reject又はbasic.nack)を拒否され
あなたの問題を解決すべき下記の例を見てください。この場合:
ソリューション
class OrderCreateConsumer implements ConsumerInterface
{
public function execute(AMQPMessage $message)
{
$body = json_decode($message->body, true);
try {
// Do whatever you want with $body
} catch (Exception $e) {
return ConsumerInterface::MSG_REJECT;
}
}
}
またはフルsymfonyの+のRabbitMQ例の詳細はここにある:The proper ways of handling errors in symfony RabbitMQ consumer。オプション2,3,4があなたに当てはまるように見えます。これはあなたの質問の音によって避けようとしているものです。
これまでのもっとも近い解決策は、これまでRabbitMqバンドルを拡張することでした。 basic_consume
方法の第四引数は$no_ack
と呼ばれ、false
に設定されている
protected function setupConsumer()
{
if ($this->autoSetupFabric) {
$this->setupFabric();
}
$this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, false, false, false, array($this, 'processMessage'));
}
:BaseConsumer
クラス(名前空間OldSound\RabbitMqBundle\RabbitMq
)では、このようになりますsetupConsumer
と呼ばれる方法があります。このパラメータをtrue
に設定すると、エラーが発生したか、例外がスローされたか、またはすべて正常に処理されたかにかかわらず、メッセージは処理された後に破棄されます。したがって、メッセージはいずれかの方法で破棄されます。メッセージはまだ破棄されます - 覚えておいてください
はとても効果がありませんでしょうConsumerInterface::MSG_REJECT_REQUEUE
を返し、$no_ack
パラメータがtrue
に設定されている場合、それはどのような状況のような消費者を返す関係ないと思いました。
AMQPプロトコルは、メッセージredelivered propertyを提供します。
私はRabbitMQBundleでそれを行う方法がわかりませんが(私は完全に可能であると確信しています)。私はエンキューLIBでそれを行う方法をお見せすることができますが:
<?php
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
class FooProcessor implements PsrProcessor
{
public function process(PsrMessage $message, PsrContext $context)
{
if ($message->isRedelivered()) {
// we already tried to process this message and failed.
return self::REJECT;
}
// this is the new message we've never seen before.
// do the job
return self::ACK;
}
}
このような問題を解決する別の方法は、再配信メッセージを遅延させることで、再びそれはRabbitMQBundleで行うことは可能ですが、手動ですべての部分を設定する必要があると思いますエンキューバンドルはそれを行います。遅延プラグインをセットアップしてconfigオプションをオンにするだけです。 post
キャッチは常に信頼できるものではありません。致命的なエラーなど、取得できないエラーが発生する可能性があります。
通知や警告や回復機能なしに、失敗したメッセージが失われる可能性があるため、自動応答モードはあまり良くありません。
ありがとう、BentCoder!私はConsumerInterface定数に精通していますが、 "メモリ不足"や "最大実行時間を超過しました"などの\ Throwableであっても、捕まえられないエラーを処理することが問題です。このような場合、メッセージは再公開されます。それはどのように変えることができますか?そのようなオプションが存在する場合、PHPスクリプトが停止しているので、RabbitMQの設定に属していなければなりません...私はauto-ackフラグについて読んでいます。 RabbitMQバンドルでこれを行う方法はありませんでした。 – medowlock
symfony DLEのこのレポを見ましたか? [無限ループの問題](https://github.com/cmodijk/RabbitMQBundleBridge/blob/patch-3/doc/dead_letter_exchange.md) – BentCoder
@medowlock私の答えを確認しましたか、*設定の再入力オプションはfalse *部分ですか? – cantSleepNow