とRabbitMQのアイドリング:チャンネルは、この例のオフ私のアーキテクチャを基づか作業キューのシナリオ
セットアップ:
- 労働者は、各労働者がダウンロードし
- 一度に一つのメッセージが表示されます文書に数秒かかります
- 作業者が文書を正常にダウンロードすると、メッセージを確認します
- 労働者がドキュメントをダウンロードするために失敗した場合、それは私がスローダウンを引き起こしている私の実装のボトルネックに探しています再キューイング(3再試行の最大)
のためのメッセージをnoAcks。私はnoAckを使って失敗した労働者を再キューに入れているからです。これを有効にするには均等に私のワーカースレッド間、私はこの質問で1.Lookingにプリフェッチ設定しました:RabbitMQ work queue is blocking consumers - 彼らは、私は下のスクリーンショットで見るものを見ていますようにするには
労働者には一度にメッセージが割り当てられるだけですが、私はプリフェッチを1に設定する必要がありますが、これは労働者が労働者を並行してではなく順番に働かせると言う人がいます。
実際にランニングとはチャネルレベルで何を意味していますか?私はキューと接続が正しく動作しているが、個々のチャネル(スレッドごとに1つ)がアイドリングしていることがわかります。
EDIT#1:RabbitMQ接続に接続プールを渡すことについてのこのメモは有望です。 https://www.rabbitmq.com/api-guide.html#consumer-thread-pool私は春のAMQPを使用していますが、私は同様のアプローチは、ここで使用することができると思います。
/**
* Configure a large thread pool for concurrent channels on the physical Connection
*/
@Bean
public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() {
logger.info("Configuring connection factory");
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.rabbitMQProperties.getAddresses());
cf.setUsername(this.rabbitMQProperties.getUsername());
cf.setPassword(this.rabbitMQProperties.getPassword());
cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());
//configure a large thread pool for the connection thread
int threads = 30;
logger.info(String.format("Configuring thread pool with %d threads", threads));
ExecutorService connectionPool = Executors.newFixedThreadPool(threads);
cf.setExecutor(connectionPool);
logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
return cf;
}
@Bean
AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){
AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}