2017-10-05 6 views
2

Spring起動時にActiveMQの再配信ポリシーを変更するにはどうすればよいですか?私はDefaultJmsListenerContainerFactoryでFixedBackOffを指定しようとしましたが、それは役に立たなかった。以下は、jmsファクトリBeanを初期化するために使用しているコードです。私は、キューに入ってくるメッセージを処理するメッセージコンシューマを持っています。使用できないリソースのため処理中に、私はチェック例外をスローします。私は一定の間隔の後に処理のためにメッセージを再配信することを望んでいます。Sprint起動時のActiveMQのActive MQ RedeliveryPolicyを変更する

春ブーツ:1.5.7.Release

のJava:1.7

@Bean 
public JmsListenerContainerFactory<?> publishFactory(ConnectionFactory connectionFactory, 
               DefaultJmsListenerContainerFactoryConfigurer configurer) { 
    DefaultJmsListenerContainerFactory factory = 
     new DefaultJmsListenerContainerFactory(); 

    factory.setBackOff(new FixedBackOff(5000, 5)); 

    // This provides all boot's default to this factory, including the message converter 
    configurer.configure(factory, connectionFactory); 

    // You could still override some of Boot's default if necessary. 
    factory.setErrorHandler(new ErrorHandler() { 

     @Override 
     public void handleError(Throwable t) { 
      LOG.error("Error occured in JMS transaction.", t); 
     } 

    }); 
    return factory; 
} 

コンシューマコード:

@JmsListener(destination = "PublishQueue", containerFactory = "publishFactory") 
@Transactional 
public void receiveMessage(PublishData publishData) { 
    LOG.info("Processing incoming message on publish queue with transaction id: " + publishData.getTransactionId()); 

    PublishUser user = new PublishUser(); 
    user.setPriority(1); 
    user.setUserId(publishData.getUserId()); 

    LOG.trace("Trying to enroll in the publish lock queue for user: " + user); 
    PublishLockQueue lockQueue = publishLockQueueService.createLock(user); 
    if (lockQueue == null) 
     throw new RuntimeException("Unable to create lock for publish"); 
    LOG.trace("Publish lock queue obtained with lock queue id: " + lockQueue.getId()); 

    try { 
     publishLockQueueService.acquireLock(lockQueue.getId()); 
     LOG.trace("Acquired publish lock."); 
    } 
    catch (PublishLockQueueServiceException pex) { 
     throw new RuntimeException(pex); 
    } 

    try { 
     publishService.publish(publishData, lockQueue.getId()); 
     LOG.trace("Completed publish of changes."); 

     sendPublishSuccessNotification(publishData); 
    } 
    finally { 
     LOG.trace("Trying to release lock to publish."); 
     publishLockQueueService.releaseLock(lockQueue.getId()); 
    } 

    LOG.info("Publish has been completed for transaction id: " + publishData.getTransactionId()); 
} 
+0

そのコンシューマはトランザクションアクノリッジモードを使用して例外時にコンシューマロールバックを実行し、複数のコンシューマを実行している場合は、ActiveMQが同じコンシューマまたは別のコンシューマにメッセージを再配信できるようにします。ただし、バックオフなどのActiveMQで再配信オプションを設定することはできます。上記のエラーハンドラは、ロギング以外はほとんどできないノップリスナーです。 –

+1

コードを共有できますか? – Makoton

+1

@Makoton私は消費者コードも含めました。 – user320587

答えて

0

@clausの返事:私はそれを動作させるテスト:

そのコンシューマは、トランザクションアクノリッジモードを使用する必要があります例外が発生した場合にコンシューマをロールバックさせ、複数のコンシューマを実行している場合は、ActiveMQで同じコンシューマまたは別のコンシューマにメッセージを再配信できるようにします。ただし、バックオフなどのActiveMQで再配信オプションを設定することはできます。上記のエラーハンドラは、ロギング以外のことをすることができないノップリスナーです。

+0

コメントで述べたように、私の問題は再配信ではなく再配信の間隔です。私は、ActiveMQ接続ファクトリオブジェクトを直接使用してバックオフを指定できることを理解しています。これは、ActiveMQに固有のコードを含める必要があることを意味します。私はそれをしないで、Spring DefaultJmsListenerContainerFactory#setBackOffメソッドを使用してバックオフポリシーを指定することを望んでいます – user320587

関連する問題