2016-10-04 2 views

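ReturnCallback gets stuck while declaring a queue. To keep messages from being dropped again, my publisher uses a return callback to declare the queue and binding when no route is available (RabbitMQ NO_ROUTE, reply code 312). My queue is auto-delete, so it is removed when the consumer goes down.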

However, the return callback thread gets stuck inside returnedMessage() at admin.declareQueue(queue). Debugging further, I confirmed that it is stuck in RabbitAdmin.declareQueue() at:

DeclareOk[] declared = declareQueues(channel, queue);

This call is stuck, yet the queue has been declared (I checked from the management console). Also, subsequent sends never invoke returnedMessage, possibly because the first returnedMessage call has not returned yet.

Am I doing something wrong here? Is it correct to declare the queue/binding in the return callback?

Any help is much appreciated. Thanks.

Below is my ReturnCallback:

public class MyReturnCallback implements ReturnCallback { 
    // constructor, member initialization goes here 

    @Override 
    public void returnedMessage(Message message, int replyCode, 
      String replyText, String exchangeName, String routingKey) { 
     if (replyCode == 312) { 
      if (this.exchangeName.equals(exchangeName) && this.routingKey.equals(routingKey)) { 
       RabbitAdmin admin = new RabbitAdmin(connectionFactory); 
       Exchange exchange = new DirectExchange(exchangeName, true, false); 
       Queue queue = new Queue(queueName, true, false, true); 
       admin.declareQueue(queue); 
       Binding binding = BindingBuilder.bind(queue).to((DirectExchange)exchange).with(routingKey); 
       admin.declareBinding(binding); 
       if (null != binding) { 
        RabbitTemplate rabbitmqTemplate = new RabbitTemplate(connectionFactory); 
        logger.debug("Sending to [exchange:" + exchange.getName() + ", routing-key:" + routingKey + "]:" + message.toString()); 
        rabbitmqTemplate.send(exchangeName, routingKey, message); 
       } 
      } 
     } 
    } 
} 

And my test producer looks something like this:

public class TestProducer { 
    // constructor, member initialization goes here 

    void initialize() {
     rabbitAdmin = new RabbitAdmin(connectionFactory); 
     rabbitTemplate = new RabbitTemplate(connectionFactory); 
     rabbitTemplate.setExchange(exchangeName); 
     rabbitTemplate.setMessageConverter(messageConverter); 
     rabbitTemplate.setRoutingKey(routingKey); 
     rabbitTemplate.setMandatory(true); 
     rabbitTemplate.setReturnCallback(new RabbitReturnCallback()); 

     Exchange exchange = new DirectExchange(exchangeName, true, false); 
     rabbitAdmin.declareExchange(exchange); 
     Queue queue = new Queue(queueName, true, false, true); 
     rabbitAdmin.declareQueue(queue); 
     Binding binding = BindingBuilder.bind(queue).to((DirectExchange)exchange).with(routingKey); 
     rabbitAdmin.declareBinding(binding); 
    } 

    void send() { 
     rabbitTemplate.convertAndSend(message); 
    } 
} 

The ConnectionFactory bean is:

<rabbit:connection-factory id="rabbitmqConnectionFactory" 
    host="${rabbitmq.host:localhost}" 
    port="${rabbitmq.port:5672}" 
    username="${rabbitmq.username}" 
    password="${rabbitmq.password}" 
    virtual-host="${rabbitmq.vhost:/}" 
    cache-mode="CHANNEL" 
    channel-cache-size="${rabbitmq.channel-cache-size:25}" 
    publisher-returns="true"/> 

Debug logs attached for reference:

2016-10-04 17:58:58,881 [main] INFO CachingConnectionFactory:291 - Created new connection: [email protected] [delegate=amqp://[email protected]:5672/] 
2016-10-04 17:58:58,883 [main] DEBUG RabbitAdmin:399 - Initializing declarations 
2016-10-04 17:58:58,883 [main] DEBUG DefaultListableBeanFactory:250 - Returning cached instance of singleton bean 'org.springframework.context.annotation.ConfigurationClassPostProcessor.importRegistry' 
2016-10-04 17:58:58,935 [main] DEBUG CachingConnectionFactory:453 - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1) 
2016-10-04 17:58:58,975 [main] DEBUG PublisherCallbackChannelImpl:694 - Added listener [email protected] 
2016-10-04 17:58:58,976 [main] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] to map, size now 1 
2016-10-04 17:58:58,976 [main] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] 
2016-10-04 17:58:58,979 [main] TRACE CachingConnectionFactory:906 - Returning cached Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1) 
2016-10-04 17:58:58,979 [main] DEBUG RabbitAdmin:460 - Declarations finished 
2016-10-04 17:58:58,980 [main] TRACE CachingConnectionFactory:402 - Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] retrieved from cache 
2016-10-04 17:58:58,981 [main] TRACE CachingConnectionFactory:439 - Found cached Rabbit Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] 
2016-10-04 17:58:58,981 [main] DEBUG PublisherCallbackChannelImpl:694 - Added listener [email protected] 
2016-10-04 17:58:58,982 [main] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] to map, size now 1 
2016-10-04 17:58:58,982 [main] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] 
2016-10-04 17:58:58,982 [main] DEBUG RabbitAdmin:487 - declaring Exchange 'myexchange' 
2016-10-04 17:58:59,006 [main] TRACE CachingConnectionFactory:906 - Returning cached Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1) 
2016-10-04 17:58:59,007 [main] INFO TestProducer:59 - Declared/Declare-confirmed for direct exchange: myexchange 
2016-10-04 17:58:59,009 [main] DEBUG TestProducer:74 - Sending to [exchange:myexchange, routing-key:mykey]:[testpayload] 
2016-10-04 17:58:59,090 [main] TRACE CachingConnectionFactory:402 - Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] retrieved from cache 
2016-10-04 17:58:59,091 [main] TRACE CachingConnectionFactory:439 - Found cached Rabbit Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] 
2016-10-04 17:58:59,091 [main] DEBUG PublisherCallbackChannelImpl:694 - Added listener [email protected] 
2016-10-04 17:58:59,091 [main] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] to map, size now 1 
2016-10-04 17:58:59,091 [main] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] 
2016-10-04 17:58:59,098 [main] DEBUG RabbitTemplate:1325 - Publishing message on exchange [myexchange], routingKey = [mykey] 
2016-10-04 17:58:59,104 [main] TRACE CachingConnectionFactory:906 - Returning cached Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1) 
2016-10-04 17:58:59,134 [AMQP Connection ip-address:5672] DEBUG TestProducer:103 - returnedMessage, replyCode: 312, replyText: NO_ROUTE 
2016-10-04 17:58:59,135 [AMQP Connection ip-address:5672] TRACE CachingConnectionFactory:402 - Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] retrieved from cache 
2016-10-04 17:58:59,136 [AMQP Connection ip-address:5672] TRACE CachingConnectionFactory:439 - Found cached Rabbit Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] 
2016-10-04 17:58:59,163 [AMQP Connection ip-address:5672] DEBUG PublisherCallbackChannelImpl:694 - Added listener [email protected] 
2016-10-04 17:58:59,163 [AMQP Connection ip-address:5672] DEBUG RabbitTemplate:1451 - Added pending confirms for Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] to map, size now 1 
2016-10-04 17:58:59,163 [AMQP Connection ip-address:5672] DEBUG RabbitTemplate:1296 - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://[email protected]ress:5672/,1), conn: [email protected] Shared Rabbit Connection: [email protected] [delegate=amqp://[email protected]:5672/] 
2016-10-04 17:58:59,164 [AMQP Connection ip-address:5672] DEBUG RabbitAdmin:515 - declaring Queue 'myqueue' 
2016-10-04 17:59:29,104 [main] DEBUG TestProducer:74 - Sending to [exchange:myexchange, routing-key:mykey]:[testpayload] 

Nothing happens after this. 

Interesting; I have reproduced it... investigating...

Answer

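Interesting problem. Because the channel is returned to the cache after the first send, when the return is delivered on that channel, the operations inside the callback pick up the same channel and there is a deadlock down in the channel: we are doing a declare while still processing the return.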

We can't really defer returning the channel to the cache until a return arrives, because we don't know whether we will get a return at all.

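I will give some thought to whether there is a way to detect and avoid this condition but, in the meantime, the safest approach is to do the declaration and the republish on a different thread: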

private final Executor executor = Executors.newCachedThreadPool(); 

@Override 
public void returnedMessage(Message message, int replyCode, String replyText, String exchangeName, 
     String routingKey) { 
    if (replyCode == 312) { 
     executor.execute(() -> { 
      RabbitAdmin admin = new RabbitAdmin(connectionFactory); 
      Exchange exchange = new DirectExchange(exchangeName, true, false); 
      Queue queue = new Queue("foo", true, false, true); 
      admin.declareQueue(queue); 
      Binding binding = BindingBuilder.bind(queue).to((DirectExchange) exchange).with(routingKey); 
      admin.declareBinding(binding); 
      if (null != binding) { 
       RabbitTemplate rabbitmqTemplate = new RabbitTemplate(connectionFactory); 
       System.out.println("Sending to [exchange:" + exchange.getName() + ", routing-key:" + routingKey 
         + "]:" + message.toString()); 
       rabbitmqTemplate.send(exchangeName, routingKey, message); 
      } 
     }); 
    } 
} 
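
For readers who want to see the blocking pattern in isolation, here is a minimal sketch that uses only java.util.concurrent and no broker. It is a simplified model, not Spring AMQP's actual internals: a single-threaded executor stands in for the AMQP connection thread, and the hypothetical declareQueue helper is a blocking call whose reply can only be delivered by that same thread. All class and method names below are made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ReturnCallbackHandoffDemo {

    // Daemon threads so the JVM can exit without an explicit shutdown.
    static ThreadFactory daemon(String name) {
        return r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        };
    }

    // Assumption: one thread delivers both returned messages and replies
    // to synchronous operations such as a queue declaration.
    static final ExecutorService connectionThread =
            Executors.newSingleThreadExecutor(daemon("amqp-connection"));

    // Separate pool used to run recovery work off the connection thread.
    static final ExecutorService workers =
            Executors.newCachedThreadPool(daemon("worker"));

    // Hypothetical blocking RPC: waits for a reply that only the connection
    // thread can deliver. Returns false if the reply does not arrive in time.
    static boolean declareQueue() throws Exception {
        Future<String> reply = connectionThread.submit(() -> "declare-ok");
        try {
            reply.get(500, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // the thread that should deliver the reply is busy
        }
    }

    // Callback that declares inline, i.e. blocks the connection thread itself.
    static boolean declareInline() throws Exception {
        Callable<Boolean> callback = () -> declareQueue();
        return connectionThread.submit(callback).get();
    }

    // Callback that hands the declaration to a worker and returns at once.
    static boolean declareOnWorker() throws Exception {
        Callable<Boolean> work = () -> declareQueue();
        Callable<Future<Boolean>> callback = () -> workers.submit(work);
        Future<Boolean> handedOff = connectionThread.submit(callback).get();
        return handedOff.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("inline: " + declareInline());
        System.out.println("worker: " + declareOnWorker());
    }
}
```

In this model the inline variant times out and reports false, because the reply is queued behind the callback that is waiting for it, while the handed-off variant reports true. The real code has no timeout on the declare, which is why the callback in the question hangs instead of failing.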

Thanks @GaryRussell for the very quick response. I will try the approach you suggested. Thanks. – Rashida


I tried the approach you suggested and it solved my problem. Thanks! – Rashida