2016-11-25 11 views
0

私はSpring AMQPを持つAMQPプロジェクトを持っています。 RabbitMQ Serverは私のものではないので、私はそれを制御できません。Spring AMQPとShutdownSignalException

@Bean(name="myAnonymousResponseQueue") 
public Queue myAnonymousResponseQueue() 
{  
    Queue q = myAmqpAdmin().declareQueue(); 
    return q; 
} 

そして、私はこのようなSimpleMessageListenerContariner持っている:私のアプリケーションが起動すると、それは民間の応答キュー、次のようになります私は最近、接続の問題を持っていた

@Bean 
    public SimpleMessageListenerContainer myResponseMessageListenerContainer() 
    { 
     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(myConnectionFactory()); 
     container.setQueues(myAnonymousResponseQueue()); 
     container.setMessageListener(myRabbitTemplate()); 
     container.setAcknowledgeMode(AcknowledgeMode.AUTO); 
     container.setMessageConverter(myMessageConverter()); 
     container.setErrorHandler(myResponseErrorHandler()); 
     container.setAutoStartup(true); 
     container.setRabbitAdmin(myAmqpAdmin()); 
     return container; 
    } 

(ShutdownSignalExceptionを)。問題は、プライベートキューを再生成できないことです。 まず、これは接続エラーです:

com.rabbitmq.client.ShutdownSignalException: connection error 
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:739) 
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:729) 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.net.SocketException: Connection reset 
    at java.net.SocketInputStream.read(Unknown Source) 
    at java.net.SocketInputStream.read(Unknown Source) 
    at sun.security.ssl.InputRecord.readFully(Unknown Source) 
    at sun.security.ssl.InputRecord.read(Unknown Source) 
    at sun.security.ssl.SSLSocketImpl.readRecord(Unknown Source) 
    at sun.security.ssl.SSLSocketImpl.readDataRecord(Unknown Source) 
    at sun.security.ssl.AppInputStream.read(Unknown Source) 
    at java.io.BufferedInputStream.fill(Unknown Source) 
    at java.io.BufferedInputStream.read(Unknown Source) 
    at java.io.DataInputStream.readUnsignedByte(Unknown Source) 
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95) 
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139) 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:538) 
    ... 1 more 

しかし、接続が復元された際に、専用キューを再作成することができません。

AbstractConnectionFactory.java|291||Created new connection: [email protected] [delegate=amqp://[email protected]:50310/sob] 

RabbitAdmin.java|442||Auto-declaring a non-durable, auto-delete, or exclusive Queue (amq.gen-SLigrYFVMvllGTS5m_3AzQ) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost. 

com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-SLigrYFVMvllGTS5m_3AzQ' in vhost 'sob', class-id=50, method-id=10) 

BlockingQueueConsumer.java|565||Failed to declare queue:amq.gen-SLigrYFVMvllGTS5m_3AzQ 

BlockingQueueConsumer.java|479||Queue declaration failed; retries left=3 
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[amq.gen-SLigrYFVMvllGTS5m_3AzQ] 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571) 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1171) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.io.IOException 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) 
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) 
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885) 
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61) 
    at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.queueDeclarePassive(PublisherCallbackChannelImpl.java:383) 
    at sun.reflect.GeneratedMethodAccessor110.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
    at java.lang.reflect.Method.invoke(Unknown Source) 
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:835) 
    at com.sun.proxy.$Proxy94.queueDeclarePassive(Unknown Source) 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550) 
    ... 3 more 
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-SLigrYFVMvllGTS5m_3AzQ' in vhost 'sob', class-id=50, method-id=10) 
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) 
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) 
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) 
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) 
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) 
    ... 12 more 
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-SLigrYFVMvllGTS5m_3AzQ' in vhost 'sob', class-id=50, method-id=10) 
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484) 
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321) 
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) 
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) 
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:556) 
    ... 1 more 

そして最後に、私はこれがあります。

com.rabbitmq.client.ShutdownSignalException: clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0) 
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:554) 
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:509) 
    at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:503) 
    at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.close(PublisherCallbackChannelImpl.java:642) 
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler$1.run(CachingConnectionFactory.java:946) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 

AMQP接続はOKですが、プライベートキューは無効です。

なぜこのようなことが起こるのか分かりません。どのようにしてより多くの情報を入手できますか?このエラーから回復することは可能ですか?

おかげ

----------------(28/11/2016)-----------

理由I

@Bean(name="myAnonymousResponseQueue") 
    public Queue myAnonymousResponseQueue() 
    {  
     return new AnonymousQueue(new Base64UrlNamingStrategy("amq.gen-")); 
    } 

サーバーは私がそれを作成することはできませんし、私は次のエラーを取得:

|28-11-2016 08:43:13.203|INFO |org.springframework.amqp.rabbit.core.RabbitAdmin|initialize|RabbitAdmin.java|493||Auto-declaring a non-durable, auto-delete, or exclusive Queue (amq.gen-JUYnhLX2SpqNoJT6ioB8GA) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost. 

をこのように定義された匿名のキューを持っている、とのことです
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-JUYnhLX2SpqNoJT6ioB8GA' in vhost 'sob', class-id=50, method-id=10) 

|28-11-2016 08:43:13.486|WARN |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|attemptPassiveDeclarations|BlockingQueueConsumer.java|581||Failed to declare queue:amq.gen-JUYnhLX2SpqNoJT6ioB8GA 
|28-11-2016 08:43:13.486|WARN |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|start|BlockingQueueConsumer.java|495||Queue declaration failed; retries left=3 
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[amq.gen-JUYnhLX2SpqNoJT6ioB8GA] 

|28-11-2016 08:43:28.868|WARN |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|attemptPassiveDeclarations|BlockingQueueConsumer.java|581||Failed to declare queue:amq.gen-JUYnhLX2SpqNoJT6ioB8GA 
|28-11-2016 08:43:28.869|ERROR|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|run|SimpleMessageListenerContainer.java|1372||Consumer received fatal exception on startup 
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it. 

ので、AmqpAdmin()。declareQueue()とAnonymousQueue間のデ違いは何ですか?ブローカがキューに名前を付けることを許可していないのでしょうか?


私は問題を理解していると思います。私は自分のユーザが "amq.gen-"という名前のキューしか作成できないと思う。私は他の名前にしようと私が取得:

com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue '4788a39b-fffe-4eae-b252-8d842234a018' in vhost 'sob' refused for user 'USER', class-id=50, method-id=10) 

com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to queue 'amq-_zoOEt5jTcqMduGWNyJ4Zg' in vhost 'sob' refused for user 'USER', class-id=50, method-id=10) 

だから私は唯一のブローカーがキューを生成し、使用することができ、私はreconectionでそれを再宣言する必要がある場合は、私は何ができるのでしょうか?

もう一度ありがとうございます。

EDIT

私は回避策を適用しようとしています。私は努力を続けるが、それを修正する方法上の任意のアイデア

|30-11-2016 10:56:17.312|ERROR|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|redeclareElementsIfNecessary|SimpleMessageListenerContainer.java|1116||Failed to check/redeclare auto-delete queue(s). 
    org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalStateException: Listener expects us to be listening on '[amq.gen-Xg5MG5n42ecpoW4-DA198A]'; our queues: [amq.gen-2qRLgfUmMxskshFCi1dzuA] 
     at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:80) 
     at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113) 
     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:553) 
     at es.omie.amqp.config.listener.XBIDConnectionListener.onCreate(XBIDConnectionListener.java:57) 
     at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33) 
     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:553) 
     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:500) 
     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:474) 
     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:467) 
     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:97) 
     at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1084) 
     at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1394) 
     at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1370) 
     at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1346) 
     at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:335) 
     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:1102) 
     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:95) 
     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1278) 
     at java.lang.Thread.run(Unknown Source) 
    Caused by: java.lang.IllegalStateException: Listener expects us to be listening on '[amq.gen-Xg5MG5n42ecpoW4-DA198A]'; our queues: [amq.gen-2qRLgfUmMxskshFCi1dzuA] 
     at org.springframework.util.Assert.state(Assert.java:392) 
     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doStart(SimpleMessageListenerContainer.java:770) 
     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:550) 
     ... 16 more 

:私はConnectionListenerを宣言している:

@Override 
public void onCreate(Connection arg0) 
{ 
    myResponseMessageListenerContainer.stop(); 
    String[] colaAnterior = myResponseMessageListenerContainer.getQueueNames(); 

    Queue q = myAmqpAdmin.declareQueue(); 
    q.setAdminsThatShouldDeclare(myAmqpAdmin); 
    q.setShouldDeclare(true); 

    myResponseMessageListenerContainer.addQueueNames(q.getName()); 

    myResponseMessageListenerContainer.removeQueueNames(colaAnterior); 
    myResponseMessageListenerContainer.initialize(); 
    myResponseMessageListenerContainer.start(); 
} 

しかし、今、私はこのエラーがありますか?

おかげ

今EDIT 2

I持っているこの:

@Override 
    public void onCreate(Connection arg0) 
    { 
     myResponseMessageListenerContainer.stop(); 

     String[] colaAnterior = myResponseMessageListenerContainer.getQueueNames(); 
     Queue q = myAmqpAdmin.declareQueue(); 

     log.info(" ------ RESPONSE QUEUE -> OLD NAME: " + Arrays.asList(colaAnterior) + " NEW NAME: " + q.getName()); 

     myResponseMessageListenerContainer.addQueueNames(q.getName()); 
     myRabbitTemplate.setReplyAddress(q.getName()); 
     myRabbitTemplate.setQueue(q.getName()); 

     myResponseMessageListenerContainer.removeQueueNames(colaAnterior); 

     myResponseMessageListenerContainer.shutdown(); 
     myResponseMessageListenerContainer.initialize(); 
     myResponseMessageListenerContainer.start(); 
    } 

しかし、コンテナが最後に定義されたキューに滞在していません:

|30-11-2016 16:32:19.996|INFO |es.omie.amqp.config.listener.XBIDConnectionListener|onCreate|XBIDConnectionListener.java|42|| ------ RESPONSE QUEUE -> OLD NAME: [amq.gen-uTp6TCP66x2AlXUmQzqz8g] NEW NAME: amq.gen-DOwEn8WKz_9ymCBQFiMDNg 


com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'amq.gen-uTp6TCP66x2AlXUmQzqz8g' in vhost 'sob', class-id=50, method-id=10) 

私ができます新しいキュー名が表示されません。接続が作成される前に、

私がログにコンテナの再起動を確認することができます。

Restarting Consumer: tags=[{amq.ctag-KYCURTkS4EevXHQtrpYV9Q=amq.gen-uTp6TCP66x2AlXUmQzqz8g}] 

ではなく、startメソッドの後に。

答えて

1

スプリングAMQPは、この理由で独自のAnonymousQueueを提供しています。そのランダムな名前は保持されるので、接続が再確立されると、宣言が再作成されます。あなたのコードでは、ブローカにキューの名前を付けることができます。

ただし、あなた自身ではなく、RabbitAdminが宣言を自動的に処理するようにしてください。 (また@Beanでなければなりません)管理者は、初期接続(または再接続)を検出

@Bean 
public Queue myAnonymousResponseQueue() { 
    return new AnonymousQueue(); 
} 

、それはそのようなすべてのキューを宣言します。

Configuring the Brokerを参照してください。

EDIT

管理者は、あなたが(あなたが接頭辞としてamq.gen-を使用することはできません)あなたは、ブローカーが生成したキューを再宣言する必要としてコンテナを更新するキューに名前を付けることが許可されていない場合新しいキュー

ConnectionListenerを接続ファクトリに追加します。 onCreate()が呼び出されたとき(新しい接続が確立されたため)、コンテナを停止し、キューを再宣言し、コンテナのキューを新しい名前に更新します。コンテナを起動します。

一時的なキュー内のメッセージは、接続が切断されると失われます。

コンテナスレッドでリスナーが呼び出される可能性があるため、この作業を別のスレッドに渡す必要があります。それ以外の場合は遅延が発生します。

EDIT2

@SpringBootApplication 
@EnableRabbit 
public class So40802855Application { 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So40802855Application.class, args); 
     Thread.sleep(2000); // wait for reply queue setup 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     System.out.println(template.convertSendAndReceive("test.x", "foo")); 
     context.getBean(CachingConnectionFactory.class).resetConnection(); 
     Thread.sleep(2000); // wait for reply queue setup 
     System.out.println(template.convertSendAndReceive("test.x", "bar")); 
     context.getBean(RabbitAdmin.class).deleteQueue("test.x"); 
     context.close(); 
     System.exit(0); 
    } 

    @Bean 
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 
     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
     rabbitTemplate.setReplyTimeout(30000); 
     return rabbitTemplate; 
    } 

    @Bean 
    public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) { 
     connectionFactory.addConnectionListener(listener()); 
     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); 
     container.setMessageListener(rabbitTemplate(connectionFactory)); 
     container.setAutoStartup(false); 
     container.setDeclarationRetries(0); 
     container.setFailedDeclarationRetryInterval(500); 
     return container; 
    } 

    @Bean 
    public Queue testX() { 
     return new Queue("test.x"); 
    } 

    @Bean 
    public ConnectionListener listener() { 
     return new MyConnectionListener(); 
    } 

    @RabbitListener(queues = "test.x") 
    public String listen(String in) { 
     return in.toUpperCase(); 
    } 

    public static class MyConnectionListener implements ConnectionListener { 

     private static final Log logger = LogFactory.getLog(MyConnectionListener.class); 

     @Autowired 
     private RabbitTemplate template; 

     @Autowired 
     private AmqpAdmin admin; 

     @Autowired 
     private ApplicationContext applicationContext; 

     @Override 
     public void onCreate(Connection connection) { 
      SimpleMessageListenerContainer replyContainer = applicationContext.getBean("replyContainer", 
        SimpleMessageListenerContainer.class); 
      // need to stop/start asynchronously to avoid deadlock 
      Executors.newSingleThreadExecutor().execute(() -> { 
       if (replyContainer.isRunning()) { 
        logger.info("Waiting for the container to stop itself because of missing queue"); 
        while (replyContainer.isRunning()) { 
         try { 
          Thread.sleep(100); 
         } 
         catch (InterruptedException e) { 
          Thread.currentThread().interrupt(); 
         } 
        } 
        logger.info("Container stopped itself because of missing queue"); 
       } 
       Queue queue = this.admin.declareQueue(); 
       logger.info("Changing queue from " + Arrays.asList(replyContainer.getQueueNames()) + " to " 
         + queue.getName()); 
       this.template.setReplyAddress(queue.getName()); 
       replyContainer.setQueues(queue); 
       logger.info("Starting container"); 
       replyContainer.start(); 
      }); 
     } 

     @Override 
     public void onClose(Connection connection) { 
     } 

    } 

} 
+0

あなたのゲイリーありがとうございました。 AnonymousQueueは私のためには機能しません。理由はわかりません。私はブローカーやログのコントロールを持っていないので、私は理由を知らない。私はより詳細な質問を編集しました。 – jandres

+0

'amq.gen-'接頭辞を使用することはできません。これは、ブローカ自身が指定したキューに予約されています。宣言は無視されます。 –

+0

ありがとう、ゲイリー、今私は問題を理解するが、私はそれを解決することはできません。私は再び私の質問を編集しました。 – jandres

関連する問題