0

私は、私のspringとrabbitmqとの統合状況があります。メッセージはキューに送信されますが、準備完了状態では終了し、未確認状態では1つになりますが、コンシューマはそれらを取得しません。spring integration rabbitmq準備完了状態のメッセージ

のTHX XML設定は、このようなものです:サーバー上

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:task="http://www.springframework.org/schema/task" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation=" 
    http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/amqp 
    http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd 
    http://www.springframework.org/schema/rabbit 
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
    http://www.springframework.org/schema/task 
    http://www.springframework.org/schema/task/spring-task.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd"> 

<context:property-placeholder location="classpath:spring/prj-rabbitmq-context-thirdparty.properties" ignore-unresolvable="true" order="3"/> 

<rabbit:connection-factory 
     id="prjRabbitmqConnectionFactory" 
     addresses="${rabbitmq.addresses}" 
     username="${rabbitmq.username}" 
     password="${rabbitmq.password}" 
     connection-timeout="5000" /> 

<bean id="rabbitTxManager" 
     class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> 
    <property name="connectionFactory" ref="prjRabbitmqConnectionFactory"/> 
</bean> 

<rabbit:template 
     id="prjRabbitmqTemplate" 
     connection-factory="prjRabbitmqConnectionFactory" 
     message-converter="serializerMessageConverter" 
     retry-template="retryTemplate" /> 

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> 
    <property name="backOffPolicy"> 
     <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> 
      <property name="initialInterval" value="1000" /> 
      <property name="multiplier" value="3" /> 
      <property name="maxInterval" value="10000" /> 
     </bean> 
    </property> 
</bean> 

<rabbit:admin 
     id="prjRabbitmqAdmin" 
     auto-startup="true" 
     connection-factory="prjRabbitmqConnectionFactory" /> 

<rabbit:queue 
     id="prjSyncQueue" 
     name="${prj.sync.queue}" 
     durable="true"> 
    <rabbit:queue-arguments> 
     <entry key="x-ha-policy" value="all" /> 
    </rabbit:queue-arguments> 
</rabbit:queue> 


<rabbit:listener-container 
     connection-factory="prjRabbitmqConnectionFactory" 
     acknowledge="auto" 
     channel-transacted="true" 
     transaction-manager="rabbitTxManager" 
     task-executor="prjSyncExecutor" 
     concurrency="1" 
     max-concurrency="2" 
     requeue-rejected="true" 
     message-converter="serializerMessageConverter"> 
    <rabbit:listener 
      ref="prjProcessorService" 
      queue-names="${prj.sync.queue}" method="processMessage" /> 
</rabbit:listener-container> 

<task:executor id="prjSyncExecutor" 
       pool-size="${prj.sync.concurrency.min}-${prj.sync.concurrency.max}" 
       keep-alive="${prj.sync.concurrency.keep-alive}" 
       queue-capacity="${prj.sync.concurrency.queue}" 
       rejection-policy="CALLER_RUNS"/> 
<int:channel 
     id="prjChannel" /> 

<int-amqp:outbound-channel-adapter 
     channel="prjChannel" 
     amqp-template="prjRabbitmqTemplate" 
     exchange-name="prjSyncExchange" 
     routing-key="prj-event" 
     default-delivery-mode="PERSISTENT" /> 


<rabbit:direct-exchange 
     name="prjSyncExchange"> 
    <rabbit:bindings> 
     <rabbit:binding 
       queue="prjSyncQueue" 
       key="prj-event" /> 
    </rabbit:bindings> 
</rabbit:direct-exchange> 

<int:gateway 
     id="prjGateway" 
     service-interface="ro.oss.niinoo.thirdparty.prj.gateway.prjEnrichmentGateway"> 
    <int:method 
      name="send" 
      request-channel="prjChannel"/> 
</int:gateway> 

<bean id="prjProcessorService" class="ro.oss.niinoo.thirdparty.prj.processor.impl.prjEnrichmentProcessorImpl" /> 
<bean id="serializerMessageConverter" class="ro.oss.niinoo.thirdparty.prj.serializer.prjSerializer"/> 

は、最初の1がピックアップされ、再起動しますが、キュー内の次の呼び出しでメッセージパイルアップ。なぜこのようなことが起こるのかご存じですか?

おかげ ダニエル

EDIT:

コンシューマコード:

public class JsonEnrichmentService implements EnrichmentService { 

@Resource 
private UserQueryService userQueryService; 

@Resource 
private SecurityContextService securityContextService; 

@Override 
public void processMessage(POJO record) { 
    System.out.println(record); 
} 

このことにトランザクション注釈を持つ新しいサービスを呼び出します。

答えて

0

私の経験では、これは一般的に、リスナースレッドがユーザーコードのどこかに「スタック」しているために発生します。スレッドダンプを取って、リスナースレッドが何をしているのかを確認します。

+0

私はこれを想定し、消費者コードにSystem.out.printlnを1つ配置しましたが、何も変更されませんでした。 –

+0

私が言ったように、スレッドダンプを見る必要があります。 –

関連する問題