2011-07-15 14 views
3

ActiveMQConnectionFactory.createConnection()を使用してActiveMQ接続を取得するクラスがあります。 次に、セッション、宛先、およびその宛先で待機しているコンシューマ(キュー)を作成および所有します。JMSを中断した後に "ActiveMQ Transport"スレッドをkillする方法MessageConsumer.receive()

私はメッセージをキューから取り出すために、コンシューマに対してreceive()またはreceive(millis)を呼び出します。特定のシナリオでは、受信メソッドが呼び出されているスレッドを強制終了(割り込み)する必要があります。私はそのセッションとその直後に接続を閉じようとします。残念ながら私はクローズを呼び出す際に例外が発生し、関連付けられた "ActiveMQ Transport"スレッド(およびブローカへの関連接続)は生き残っています。私が手に例外は、私は、次の接続URLに フェイルオーバーを試してみた

org.myorg.awsiface.communication.MessagingException: Failed to close JMS connection 
at 
org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:253) 
at 
org.myorg.aws.communication.protocol.ContextFinishedProtocol.cleanUp(ContextFinishedProtocol.java:94) 
at org.myorg.myservice.job.Job.run(Job.java:206) 
Caused by: javax.jms.JMSException: java.io.InterruptedIOExceptio) 
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62) 
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1227) 
at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219) 
at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1799) 
at org.apache.activemq.ActiveMQMessageConsumer.doClose(ActiveMQMessageConsumer.java:636) 
at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:627) 
at org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:232) 
... 2 more 
Caused by: java.io.InterruptedIOException 
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:102) 
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40) 
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) 
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225) 
... 7 more 

です:// TCP://broker.ip.address:61616 TCP://broker.ip.address:61616 フェイルオーバー:// tcp://broker.ip.address:61616?trace = true & closeAsync = false

また、MessageConsumer :: receiveNoWait()呼び出しを使用して自分のThread.sleepを実行しようとしましたが、私は接続を閉じるために次のように呼び出すときはいつも、上記の例外で終わります。

try { 
    // I added the following two lines to see if the taskRunner was handling the threads - still no luck 
    TaskRunnerFactory taskRunner = ((ActiveMQConnection)connection).getSessionTaskRunner(); 
    taskRunner.shutdown(); 

    if (producer != null) { 
     producer.close(); 
    } 
    if (consumer != null) { 
     consumer.close(); 
    } 
    session.close(); 
    if (connection instanceof ActiveMQConnection) { 
     ActiveMQConnection amqConnection = (ActiveMQConnection) connection; 
     amqConnection.stop(); 
    }   
    connection.stop(); 
    connection.close(); 
} 
catch (ConnectionClosedException e) { 
    // NOOP - this is ok 
} 
catch (Exception e) { 
    throw new MessagingException("Failed to close JMS connection", e, log); 
} 

答えて

0

クローズがあなたにとってうまくいかない理由はわかりません。しかし、非同期メッセージ・リスナーを使用して、必要なものを達成することができます。

Connection connection = connectionFactory.createConnection(); 
connection.start(); 

Session session = 
    connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

Destination queue = session.createQueue("queueName"); 

Consumer consumer = session.createConsumer(queue); 

consumer.setMessageListener(new MessageListener() 
{ 
    public void onMessage(Message message) 
    { 
     // whatever was after the receive() method call 
    } 
}); 

次に、中断する必要はありません。作業が完了し、近くに物事アウト:

consumer.close(); 
session.close(); 
queue.close(); 
connection.close(); 
関連する問題