2015-10-17 10 views
6

次のPHPアプリケーションがあります。これにより、ユーザーのサインアップがメッセージキューにパブリッシュされます。 Javaアプリケーションはそのキューから読み込み、それをインポートします。うまくいけば、以下のダイアグラムがそれを討論するでしょう。私は物事のJava側でのみ作業しています。 jsonメッセージはすでにキューに存在します。RabbitMQ - Apache Camel読んでいるメッセージ失敗したメッセージの処理方法

enter image description here

ルート(Javaはサイドを消費)。

@Component 
public class SignUpRouting { 

    errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage()); 

    from("rabbitmq://phpSignUp.exchange?username=etc....") 
      .routeId("signUpRoute") 
      .processRef("signUpProcessor") 
      .end(); 
    //.... 

プロセッサ..

@Component 
public class SignupProcessor implements Processor { 

    private ObjectMapper mapper = new ObjectMapper(); 

    @Override 
    public void process(Exchange exchange) throws Exception { 

     String json = exchange.getIn().getBody(String.class); 
     SignUpDto dto = mapper.readValue(json, SignUpDto.class); 

     SignUp signUp = new SignUp(); 
     signUp.setWhatever(dto.getWhatever()); 
     //etc.... 

     // save record 
     signUpDao.save(signUp); 
    } 
} 

私の質問は、この..です私は、プロセッサがメッセージをインポートするために失敗したときに私は何をすべき。

たとえば、DAO例外があったとします。データフィールドがツールインされているか、インポートが間違った形式であった可能性があります。私はメッセージを失いたくない。私はエラーを見て、インポートを再試行したいと思います。しかし、私は30秒ごとにメッセージを再試行し続けたくありません。

私は別のキューを作成する必要があると思っています。デッド・レター・キューで、6時間ごとにメッセージを無期限に再試行していますか?それから、ログにエラーが表示され、修正プログラムとメッセージ再処理されるだろうか?

どうすれば実装できますか?または私は間違ったトラックにいますか?

私は正しい方向に物事を得るでしょうかどうかを確認するためにdeadLetterExchangeを設定しようとしたEDIT ...しかし、エラーやキューはあなたがonExceptionを使用することができ

rabbitmq://phpSignUp.exchange?username=etc...&deadLetterExchange=signUpDeadLetter.exchange 
+0

別のキューを使用している場合は、例外のスタックトレースとともに正確に失敗したメッセージを格納し、そのキューのデータを処理しないでください。 –

+0

わかりません。あなたは例を挙げることができますか? –

+0

サポートチームの贅沢をお持ちの場合は、別のキューにメッセージを送信するか、データベーステーブルに書き込んだり、サポートスタッフに電子メールを送信したりしてください。サポートスタッフがメッセージのテキストを変更してサインアッププロセッサに再投入できるようにする別のインターフェイスを作成します。あなたがしていることは、手動による介入を必要とすることです。それに従って設計する。彼がまれなイベントであるように、PHPアプリケーションの良い検証が必要です。 – Sammy

答えて

2

はデッド・レターヘッダーを使用する例です。

 <from uri="rabbitmq://localhost/youexchange?queue=yourq1&amp; 
      exchangeType=topic&amp; 
      routingKey=user.reg.*&amp; 
      deadLetterExchange=dead.msgs&amp; 
      deadLetterExchangeType=topic&amp; 
      deadLetterRoutingKey=dead.letters&amp; 
      deadLetterQueue=dead.letters&amp; 
      autoAck=false&amp; 
      autoDelete=false"/> 

      <!--We can use onException to make camel to retry, and after that, dead letter queue are the fallback--> 
     <onException useOriginalMessage="true"> 
      <exception>java.lang.Exception</exception> 
      <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="3" redeliveryDelay="5000"/> 
     </onException> 

我々はAUTOACKをオフにする必要があるとdeadLetterQueueを設定し、スロー例外がある場合は、その後、メッセージは死んだになります文字待ち行列。 onExceptionを使用するには、メッセージをデッド・レター・キューにドロップする前に再試行を制御できます。

+0

質問のいくつか。なぜautoAckを有効にする必要がありますか? deadLetterExchangeType = topicとは何ですか? –

+0

autoAckがオンの場合、camelはメッセージを受信して​​いる間にbasic.ackを送信します。キャメルが免除時にbasic.refuseまたはbasic.nackを送ることはできないので、デッドレターの属性は役に立たないでしょう。 – sanigo

+0

deadRetterExchangeXXX属性は、デッド文字を指定されたExchangeおよびキューにルーティングするために使用されます。デモ設定ではトピックを使用しましたが、他の交換タイプを使用することもできます。 – sanigo

1

非nullにすることはできませんと言います例外がある場合、メッセージはデッドレター交換へのルートになります。ここではSpring DSLの例です:

<onException useOriginalMessage="true"> 
      <exception>java.sql.SQLException</exception> 
      <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="1" redeliveryDelay="1000"/> 

      <inOnly uri="rabbitmq://localhost/dead.msgs?exchangeType=fanout&amp; 
        autoDelete=false&amp; 
        bridgeEndpoint=true"/> 
</onException> 
0ここで
+0

これは良い答えであり、実行可能な解決策です+1。しかし、それはrabbitmq特有のデッド・ヘッダーと設定を使用しないように私に感じます。 http://camel.apache.org/rabbitmq.html –

+0

のように、なぜbridgeEndpoint = trueに設定する必要がありますか?それは何のためにあるのです? –

+0

属性を追加しないと、ターゲットキューはメッセージを受信しません。 – sanigo

関連する問題