2016-12-14 4 views
2

とRabbitMQのアイドリング:チャンネルは、この例のオフ私のアーキテクチャを基づか作業キューのシナリオ

RabbitMQ - Work Queues

セットアップ:

  • 労働者は、各労働者がダウンロードし
  • 一度に一つのメッセージが表示されます文書に数秒かかります
  • 作業者が文書を正常にダウンロードすると、メッセージを確認します
  • 労働者がドキュメントをダウンロードするために失敗した場合、それは私がスローダウンを引き起こしている私の実装のボトルネックに探しています再キューイング(3再試行の最大)

のためのメッセージをnoAcks。私はnoAckを使って失敗した労働者を再キューに入れているからです。これを有効にするには均等に私のワーカースレッド間、私はこの質問で1.Lookingにプリフェッチ設定しました:RabbitMQ work queue is blocking consumers - 彼らは、私は下のスクリーンショットで見るものを見ていますようにするには

acks/second

channels

労働者には一度にメッセージが割り当てられるだけですが、私はプリフェッチを1に設定する必要がありますが、これは労働者が労働者を並行してではなく順番に働かせると言う人がいます。

実際にランニングとはチャネルレベルで何を意味していますか?私はキューと接続が正しく動作しているが、個々のチャネル(スレッドごとに1つ)がアイドリングしていることがわかります。

EDIT#1:RabbitMQ接続に接続プールを渡すことについてのこのメモは有望です。 https://www.rabbitmq.com/api-guide.html#consumer-thread-pool私は春のAMQPを使用していますが、私は同様のアプローチは、ここで使用することができると思います。

 /** 
    * Configure a large thread pool for concurrent channels on the physical Connection 
    */ 
    @Bean 
    public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() { 
     logger.info("Configuring connection factory"); 
     CachingConnectionFactory cf = new CachingConnectionFactory(); 
     cf.setAddresses(this.rabbitMQProperties.getAddresses()); 
     cf.setUsername(this.rabbitMQProperties.getUsername()); 
     cf.setPassword(this.rabbitMQProperties.getPassword()); 
     cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost()); 
     //configure a large thread pool for the connection thread 
     int threads = 30; 
     logger.info(String.format("Configuring thread pool with %d threads", threads)); 
     ExecutorService connectionPool = Executors.newFixedThreadPool(threads); 
     cf.setExecutor(connectionPool); 
     logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString())); 
     logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize())); 
     logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize())); 
     return cf; 
    } 

    @Bean 
    AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){ 
     AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
     return rabbitTemplate; 
    } 

答えて

1

Apparantly春AMQPでデフォルトのスレッド・プールを単一の物理TCP/IP接続が5つのスレッドにデフォルト設定されるため:

Spring AMQP

また、書き込み時には、RabbitMQのクライアントライブラリは、デフォルトでは、各接続(5つのスレッド)のための固定されたスレッドプールを作成します。多数の接続を使用する場合は、CachingConnectionFactoryでカスタム・エグゼキュータを設定することを検討する必要があります。次に、すべての接続で同じエグゼキュータが使用され、そのスレッドを共有できます。エグゼキュータのスレッドプールは無制限にするか、または予想される使用率(通常、接続ごとに少なくとも1つのスレッド)に対して適切に設定する必要があります。各接続で複数のチャネルが作成された場合、プールサイズは同時実行性に影響するため、可変(または単純キャッシュ)スレッドプールエグゼキュータが最適です。私はつまり、仮想チャネルの数をミラーリング接続ごとのスレッド数を持っている上記のコードでは

/** 
    * Expand the number of concurrent threads for a single RabbitMQ connection 
    * http://docs.spring.io/spring-amqp/reference/htmlsingle/ 
    * Also, at the time of writing, the rabbitmq-client library creates a fixed thread pool for each connection (5 threads) by default. 
    * When using a large number of connections, you should consider setting a custom executor on the CachingConnectionFactory. 
    */ 
    @Bean(name="channelPool") 
    @Scope("singleton") 
    MigrationPool rabbitConnectionPool(){ 
     int channels = 50; 
     logger.info(String.format("Configuring connection pool with %d threads", channels)); 
     return new MigrationPool(channels, channels, 0L, TimeUnit.MILLISECONDS, 
       new LinkedBlockingQueue<Runnable>()); 
    } 

    /** 
    * Configure a large thread pool for concurrent channels on the physical Connection 
    */ 
    @Bean 
    public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory(@Qualifier("channelPool") MigrationPool connectionPool) { 
     logger.info("Configuring connection factory"); 
     CachingConnectionFactory cf = new CachingConnectionFactory(); 
     cf.setAddresses(this.rabbitMQProperties.getAddresses()); 
     cf.setUsername(this.rabbitMQProperties.getUsername()); 
     cf.setPassword(this.rabbitMQProperties.getPassword()); 
     cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost()); 
     cf.setExecutor(connectionPool); 
     logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString())); 
     logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize())); 
     logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize())); 
     logger.info(String.format("MQ thread pool: %d threads", connectionPool.getMaximumPoolSize())); 
     return cf; 
    } 

私はRabbitMQのの接続プールに割り当てられたスレッドの数を変更することで、これを複製することができました仮想RabbitMQチャネルごとに1つの実際の物理スレッド。各チャネルは1つのメッセージを処理するワーカースレッドを参照します。

channels no longer blocking

のRabbitMQの接続に使用可能なスレッドの数は、チャンネルのブロックが表示されます操作:これは、デフォルトで5つの接続を遮断し、代わりにスレッドの拡張された番号をフル活用して、もはやチャンネルになりません。 10スレッドに設定し、例えば50チャンネルを開く。

関連する問題