私は、ボイラープレートのJMSコードを使用してActiveMQサーバーとの間でメッセージを送受信するJavaアプリケーションを使用しています。JMS/ActiveMQ MessageConsumer.recieve()が返されない
this.consumerFactory = new ActiveMQConnectionFactory(this.ingestItemBrokerUrl);
this.consumerConnection = this.consumerFactory.createConnection();
this.consumerConnection.start();
this.consumerSession = this.consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.consumerDestination = this.consumerSession.createTopic(getIngestItemDestinationName());
this.consumer = this.consumerSession.createConsumer(this.consumerDestination);
私のアプリケーションは、(接続が閉じられるまで)は、ActiveMQのトピックに到着したメッセージを処理するループでMessageConsumer.receive()
を呼び出すには:
:ここ
message = this.consumer.receive();
を謎です私はlocalhostで動作しているActiveMQサーバーに接続しますが、これは期待通りに機能します。しかし、Azureクラウドマシン(BitnamiのActiveMQスタックでロードされている)で稼動しているActiveMQサーバーに接続すると、コールは無期限にブロックされますが、AMQ管理コンソールからクライアントがメッセージの接続とデキューを行っていることがわかります。
なぜ、ローカルサーバーからリモートサーバーに切り替えるときに違う動作が見られるのですか?どうすればトラブルシューティングをさらに進めることができますか?
マイクラウドactivemq.xml設定ファイルは以下の通りです:
<beans xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean id="configurationEncryptor" class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor">
<property name="algorithm" value="PBEWithMD5AndDES"/>
<property name="password" value="**REDACTED**"/>
</bean>
<bean id="propertyConfigurer" class="org.jasypt.spring31.properties.EncryptablePropertyPlaceholderConfigurer">
<constructor-arg ref="configurationEncryptor"/>
<property name="location" value="file:${activemq.conf}/credentials-enc.properties"/>
</bean>
<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop">
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<plugins>
<!--simpleAuthenticationPlugin>
<users>
<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="admins"/>
</users>
</simpleAuthenticationPlugin-->
<!-- if not already set, set ttl to 1 minutes -->
<timeStampingBrokerPlugin zeroExpirationOverride="60000"/>
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor/>
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
If using ActiveMQ embedded - the following limits could safely be used:
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&trace=true&needClientAuth=true"/>
</transportConnectors>
<!-- SSL Configuration Context -->
<sslContext>
<sslContext keyStore="file:${activemq.conf}/amq-server.ks"
keyStorePassword="**REDACTED**"
trustStore="file:${activemq.conf}/amq-server.ts"
trustStorePassword="**REDACTED**" />
</sslContext>
</broker>
<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans><!-- END SNIPPET: example -->
受信メソッドは、メッセージが正常に受信されるまで無期限にブロックされるように設計されています。消費者が消費していないときにトピックにメッセージを送信しましたか?クラウドの設定は異なりますか?あなたはAMQクラウドの設定を投稿できます –
私は 'receive()'がメッセージが到着するまでブロックするはずだと理解します。 locahostを使用すると、メッセージを送信するまでブロックされます。クラウドAMQを使用すると、メッセージを送信した後でもブロックされます。 –
クラウドAMQ –