2016-09-23 5 views
0

でポーリング私は春の統合3.0.1.RELEASEに、次のようJMSインバウンド・チャネル・アダプターを定義した:春の統合JMSインバウンド・チャネル・アダプターません指定された固定金利

<int-jms:inbound-channel-adapter channel="inChannel" phase="1000" 
           destination-name="jmsQueue" extract-payload="true" 
           connection-factory="connectionFactory"> 
    <int:poller max-messages-per-poll="1" fixed-rate="1000"/> 
</int-jms:inbound-channel-adapter> 

しかし、複数のメッセージがから消費されています信頼できない無作為な距離のJMSブローカー。消費される各メッセージの間に数秒から数分かかることがあります。私はfixed-rateの代わりにfixed-delayを試しましたが、同じ動作をします。

ポーリング操作を別の時間に実行できる他の要因はどれですか。どのようにして確実なポーリング時間を達成できますか?

EDIT:

(チャネルアダプタを主導いくつかのメッセージがありますが)私は、単一のJMSインバウンド・チャネル・アダプターと、単一のデフォルトのポーラーにアプリケーションを限定しました、そして、それはまだ同じ挙動を有するます。待機時間をfixed-delay,3000、receive-timeoutを5000に変更しました。

JMSキューのいくつかのメッセージでアプリケーションを開始しましたが、ログにこのようなエントリが表示され、スレッドが切り替わります

2016-09-23 18:48:25,592 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:18:1,started=true} 
2016-09-23 18:48:25,630 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 
2016-09-23 18:48:28,639 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:19:1,started=true} 
2016-09-23 18:48:28,643 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 
2016-09-23 18:48:31,651 | DEBUG | ask-scheduler-3 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:20:1,started=true} 
2016-09-23 18:48:31,657 | DEBUG | ask-scheduler-3 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 
2016-09-23 18:48:34,666 | DEBUG | ask-scheduler-1 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:21:1,started=true} 
2016-09-23 18:48:34,670 | DEBUG | ask-scheduler-1 | ion.endpoint.SourcePollingChannelAdapter | Received no Message during the poll, returning 'false' 

そして、10分後:

2016-09-23 18:58:10,032 | DEBUG | ask-scheduler-8 | ework.integration.jms.DynamicJmsTemplate | Executing callback on JMS Session: ActiveMQSession {id=ID:v10033215-56491-1474666967074-0:212:1,started=true} 
2016-09-23 18:58:10,091 | DEBUG | ask-scheduler-8 | ion.endpoint.SourcePollingChannelAdapter | Poll resulted in Message: 

、メッセージが消費され、以下に示すように、いくつかのコールバック操作の後、タスクスケジューラ。

私は、複数のダンプを取ってきて、状態を実行している上、タスクエグゼキュータ・スレッドのインスタンスを1つだけ見つけることができる:

"task-scheduler-4" prio=6 tid=0x000000001074f800 nid=0x4364 runnable [0x000000001d4fe000] 
    java.lang.Thread.State: RUNNABLE 
    at java.net.SocketOutputStream.socketWrite0(Native Method) 
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) 
    at java.net.SocketOutputStream.write(SocketOutputStream.java:159) 
    at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:123) 
    at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:167) 
    at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237) 
    - locked <0x00000007dc803080> (a java.util.concurrent.atomic.AtomicBoolean) 
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83) 
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104) 
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40) 
    - locked <0x00000007dc8031f8> (a java.lang.Object) 
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) 
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225) 
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219) 
    at org.apache.activemq.ActiveMQSession.doClose(ActiveMQSession.java:590) 
    at org.apache.activemq.ActiveMQSession.close(ActiveMQSession.java:581) 
    at org.springframework.jms.support.JmsUtils.closeSession(JmsUtils.java:108) 
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:497) 
    at org.springframework.jms.core.JmsTemplate.receiveSelected(JmsTemplate.java:761) 
    at org.springframework.integration.jms.JmsDestinationPollingSource.doReceiveJmsMessage(JmsDestinationPollingSource.java:118) 
    at org.springframework.integration.jms.JmsDestinationPollingSource.receive(JmsDestinationPollingSource.java:93) 
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:111) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:184) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:51) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:141) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:273) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) 
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268) 
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

他のすべてのスレッドダンプは、タスク・スケジューラのためのすべてのスレッドがどちらかにあることを示していますWAITINGまたはTIMED_WAITINGを次のようにします(終了後の前のダンプ上のスレッドを含む)。これは、最後の1秒後の30秒後にダンプします。

"task-scheduler-4" prio=6 tid=0x00000000118d3800 nid=0x4abc waiting on condition [0x000000000f8bf000] 
    java.lang.Thread.State: WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <0x00000007838d74d8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) 
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807) 
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

    Locked ownable synchronizers: 
    - None 

"task-scheduler-3" prio=6 tid=0x00000000118d4800 nid=0x4f14 waiting on condition [0x000000001ba0f000] 
    java.lang.Thread.State: TIMED_WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <0x0000000787c10210> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) 
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807) 
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

    Locked ownable synchronizers: 
    - None 

ヒント?

答えて

0

アプリケーションに多数のポーラーがある場合は、スレッドスターベーションに苦しんでいる可能性があります。 default task schedulerには10個のスレッドしかありません。カウントを増やすことができます。

ポーラーポーリングfrom QueueChannelは、デフォルトで1秒間ブロックされます。

一般に、org.springframework.integrationのDEBUGログを有効にすると、このような問題の解決に役立ちます。

スレッドダンプを取って、タスクスケジューラのスレッドアクティビティを調べることも役立ちます。

+0

私はいくつかのログとスレッドダンプを使って投稿を編集しましたが、それでもそれを並べ替えることはできませんでした:/ – gnzlrm

+0

ポーラーがActiveMQと対話してセッションを閉じることは明らかです。私はactivemqの内部を知りませんし、なぜそれが長い時間がかかるかもしれません。巨大なメッセージや貧弱なネットワークを持っていると、散発的なタイミングを想像することができます。次のステップでは、これをデバッグするとWireSharkなどのネットワークトレースを見ることになります。 –

+0

申し訳ありませんが、私は先週の休暇のために外出しました。私は制限されたツールボックスで制御された環境にいるのでWireSharkを使うことはできません:/ 私はそれを見ていますが、私は 'receive-timeout = 5000'を指定していますが、 (一部の世論調査はほとんど瞬時に「偽」と報告されます)、メッセージの受信を報告する操作にはより多くの時間がかかります。コールバック操作の開始から結果が「偽」として報告されるまでに5秒間待たなければなりませんか? – gnzlrm

関連する問題