ActiveMQキューに接続し、@ JmsListenerを使用してそのキューからメッセージを処理し、そのメッセージを同じActiveMQブローカ上のトピックにプッシュするには、Spring 4.xのDefaultJmsListenerContainerFactoryを使用しています。DefaultJmsListenerContainerFactoryと同時接続がシャットダウンしない
コンシューマ/リスナとプロデューサの両方に対して1つのキャッシング接続ファクトリを使用しています。キャッシュコンシューマをfalseに設定して、プロデューサをキャッシュすることはできますが、コンシューマはキャッシュできません。私はまた同時実行性を1-3に設定しました。アプリケーションの起動時に待ち行列に最低1人の消費者が入り、メッセージが増加すると消費者の数は3に達するでしょう。しかし、メッセージが消えてしまったので、私は消費者の数が1に戻ると予想していました。しかし、スレッド(defaultmessagelistenercontainer-2/3)を見ると、それらは待機状態にあり、シャットダウンしません。負荷が低下したときに消費者の数もシャットダウンすることが予想される行動ではないのでしょうか?下記の私の設定を見て、この動作がそのままではないかどうか、私が上に示したようにこれを動作させるために何かを追加する必要があるかどうかを教えてください。
ApplicationContext.java
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency(environment.getProperty("jms.connections.concurrent"));
factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class));
return factory;
}
@Bean
public CachingConnectionFactory connectionFactory(){
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(environment.getProperty("jms.redelivery.initial-delay", Long.class));
redeliveryPolicy.setRedeliveryDelay(environment.getProperty("jms.redelivery.delay", Long.class));
redeliveryPolicy.setMaximumRedeliveries(environment.getProperty("jms.redelivery.maximum", Integer.class));
redeliveryPolicy.setUseExponentialBackOff(environment.getProperty("jms.redelivery.use-exponential-back-off", Boolean.class));
redeliveryPolicy.setBackOffMultiplier(environment.getProperty("jms.redelivery.back-off-multiplier", Double.class));
ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("jms.queue.username"), environment.getProperty("jms.queue.password"), environment.getProperty("jms.broker.endpoint"));
activeMQ.setRedeliveryPolicy(redeliveryPolicy);
activeMQ.setPrefetchPolicy(prefetchPolicy());
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
cachingConnectionFactory.setCacheConsumers(environment.getProperty("jms.connections.cache.consumers", Boolean.class));
cachingConnectionFactory.setSessionCacheSize(environment.getProperty("jms.cache.size", Integer.class));
return cachingConnectionFactory;
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("jms.queue.out"));
JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
jmsMessagingTemplate.setDefaultDestination(activeMQ);
return jmsMessagingTemplate;
}
application.properties
jms.connections.concurrent=1-3
jms.connections.prefetch=1000
jms.connections.transacted=true
jms.connections.cache.consumers=false
jms.redelivery.initial-delay=1000
jms.redelivery.delay=1000
jms.redelivery.maximum=5
jms.redelivery.use-exponential-back-off=true
jms.redelivery.back-off-multiplier=2
jms.cache.size=3
jms.queue.in=in.queue
jms.queue.out=out.queue
jms.broker.endpoint=failover:(tcp://localhost:61616)
ありがとうございます!私はこれを試して、それは働いた!しかし、私がプロファイリングをしていたときには、受信試行とタスクごとの最大メッセージの値に基づいて、dmlcコンテナスレッドが常に再作成されていたことに気付きました。これは単なる観測であり、誰もそれについてコメントできますか? – jcb
はい私はあなたに1タスクあたり最大1メッセージの例を示しましたが、私が提供したドキュメンテーションリンクで述べたように1が低すぎます。 max attemptspertaskを10に増やして10回の試行を許可し、IdleTaskExecutionLimitを増やして最大試行回数に達したときにスレッドを再利用できるようにする必要があります。 http://docs.spring.io/spring-framework/docs/4.3.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setIdleTaskExecutionLimit-int-docに詳しく説明されています。 –