2016-09-09 4 views
1

ActiveMQキューに接続し、@ JmsListenerを使用してそのキューからメッセージを処理し、そのメッセージを同じActiveMQブローカ上のトピックにプッシュするには、Spring 4.xのDefaultJmsListenerContainerFactoryを使用しています。DefaultJmsListenerContainerFactoryと同時接続がシャットダウンしない

コンシューマ/リスナとプロデューサの両方に対して1つのキャッシング接続ファクトリを使用しています。キャッシュコンシューマをfalseに設定して、プロデューサをキャッシュすることはできますが、コンシューマはキャッシュできません。私はまた同時実行性を1-3に設定しました。アプリケーションの起動時に待ち行列に最低1人の消費者が入り、メッセージが増加すると消費者の数は3に達するでしょう。しかし、メッセージが消えてしまったので、私は消費者の数が1に戻ると予想していました。しかし、スレッド(defaultmessagelistenercontainer-2/3)を見ると、それらは待機状態にあり、シャットダウンしません。負荷が低下したときに消費者の数もシャットダウンすることが予想される行動ではないのでしょうか?下記の私の設定を見て、この動作がそのままではないかどうか、私が上に示したようにこれを動作させるために何かを追加する必要があるかどうかを教えてください。

ApplicationContext.java

@Bean 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 

    factory.setConnectionFactory(connectionFactory()); 
    factory.setConcurrency(environment.getProperty("jms.connections.concurrent")); 
    factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class)); 
    return factory; 
} 

@Bean 
public CachingConnectionFactory connectionFactory(){ 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(environment.getProperty("jms.redelivery.initial-delay", Long.class)); 
    redeliveryPolicy.setRedeliveryDelay(environment.getProperty("jms.redelivery.delay", Long.class)); 
    redeliveryPolicy.setMaximumRedeliveries(environment.getProperty("jms.redelivery.maximum", Integer.class)); 
    redeliveryPolicy.setUseExponentialBackOff(environment.getProperty("jms.redelivery.use-exponential-back-off", Boolean.class)); 
    redeliveryPolicy.setBackOffMultiplier(environment.getProperty("jms.redelivery.back-off-multiplier", Double.class)); 

    ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("jms.queue.username"), environment.getProperty("jms.queue.password"), environment.getProperty("jms.broker.endpoint")); 
    activeMQ.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQ.setPrefetchPolicy(prefetchPolicy()); 

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ); 
    cachingConnectionFactory.setCacheConsumers(environment.getProperty("jms.connections.cache.consumers", Boolean.class)); 
    cachingConnectionFactory.setSessionCacheSize(environment.getProperty("jms.cache.size", Integer.class)); 
    return cachingConnectionFactory; 
} 

@Bean 
public JmsMessagingTemplate jmsMessagingTemplate(){ 
    ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("jms.queue.out")); 

    JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory()); 
    jmsMessagingTemplate.setDefaultDestination(activeMQ); 

    return jmsMessagingTemplate; 
} 

application.properties

jms.connections.concurrent=1-3 
jms.connections.prefetch=1000 
jms.connections.transacted=true 
jms.connections.cache.consumers=false 
jms.redelivery.initial-delay=1000 
jms.redelivery.delay=1000 
jms.redelivery.maximum=5 
jms.redelivery.use-exponential-back-off=true 
jms.redelivery.back-off-multiplier=2 
jms.cache.size=3 
jms.queue.in=in.queue 
jms.queue.out=out.queue 
jms.broker.endpoint=failover:(tcp://localhost:61616) 

答えて

1

あなたはドキュメントを参照することができmaxMessagesPerTask > 0

@Bean 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 

    factory.setConnectionFactory(connectionFactory()); 
    factory.setMaxMessagesPerTask(1); 
    factory.setConcurrency(environment.getProperty("jms.connections.concurrent")); 
    factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class)); 
    return factory; 
} 

を設定してみてくださいhttp://docs.spring.io/spring-framework/docs/4.3.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setMaxMessagesPerTask-int-

は、Qで1000件のメッセージが待機している場合、これらの1000件のメッセージを処理するスレッドが1つだけ開始されることを意味します。

たとえば、jms.connections.prefetch=1は、メッセージが使用可能なすべてのスレッドに均等にディスパッチされることを意味しますが、これにより、長寿命のタスクが頻繁なスレッドコンテキストスイッチを回避するため、maxMessagesPerTask < 0を設定する方が良いです。 http://activemq.apache.org/what-is-the-prefetch-limit-for.html

+0

ありがとうございます!私はこれを試して、それは働いた!しかし、私がプロファイリングをしていたときには、受信試行とタスクごとの最大メッセージの値に基づいて、dmlcコンテナスレッドが常に再作成されていたことに気付きました。これは単なる観測であり、誰もそれについてコメントできますか? – jcb

+0

はい私はあなたに1タスクあたり最大1メッセージの例を示しましたが、私が提供したドキュメンテーションリンクで述べたように1が低すぎます。 max attemptspertaskを10に増やして10回の試行を許可し、IdleTaskExecutionLimitを増やして最大試行回数に達したときにスレッドを再利用できるようにする必要があります。 http://docs.spring.io/spring-framework/docs/4.3.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setIdleTaskExecutionLimit-int-docに詳しく説明されています。 –

関連する問題