2017-11-29 15 views
-1

私はrabbitmqキューから読み込んでデータストアに書き込む必要があります。現在、キューからメッセージを読み取るのにAmqpItemReaderを使用しています。rabbit mqからの読み込みが遅い

私が読んだデータはJson形式であり、すべて私のItemProcessorは、jsonをJavaオブジェクトにシリアル化することです。

私のシングルスレッドソリューションのパフォーマンスは非常に低いです。私は1秒あたり12msgの割合で消費することができます。私は約1000万レコードを処理するだろう。だから、私はマルチスレッドのステップに変更しようとしましたが、スループットの大幅な向上は見られませんでした(これは約50秒/秒でした)。

私はどのように仕事をスピードアップしますか?私が取っているルートが正しいかどうか疑問に思っています。この上の任意の光をいただければ幸いです。前もって感謝します。

編集:私が達成しようとしていることをさらに明確にするためのコード/構成が含まれています。

ラビットサーバーの設定: AWS上の3ノードクラスタで、それぞれ0.5ギガバイトのメモリがあります。

メッセージの詳細: 各ペイロードは約1キロバイトのJSONです。

開発マシン(Macintosh)で春バッチジョブを実行しています。

システム構成:

Processor Name: Intel Core i7 
    Processor Speed: 2.5 GHz 
    Number of Processors: 1 
    Total Number of Cores: 4 
    L2 Cache (per Core): 256 KB 
    L3 Cache: 6 MB 
    Memory: 16 GB 

マイItemReader:

import java.io.IOException; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.amqp.core.Message; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.batch.item.amqp.AmqpItemReader; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.stereotype.Component; 

@Component 
public class RabbitMQItemReader extends AmqpItemReader<Message> { 

    private final Logger logger = LoggerFactory.getLogger(RabbitMQItemReader.class); 

    @Autowired 
    private final RabbitTemplate template; 

    public RabbitMQItemReader(RabbitTemplate rabbitTemplate) throws IOException { 
    super(rabbitTemplate); 
    template = rabbitTemplate; 
    } 

    @Override 
    public Message read() { 
    return template.receive(); 
    } 
} 

マイステップ:

private Step step() throws Exception { 
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
    executor.setCorePoolSize(100); 
    executor.setMaxPoolSize(100); 
    executor.setThreadNamePrefix("SThread"); 
    executor.setWaitForTasksToCompleteOnShutdown(true); 

    executor.initialize(); 
    return stepBuilderFactory.get("queueToCassandraStep") 
     .<Message, Vendor>chunk(100) 
     .reader(itemReader) 
     .listener(new QueueReaderListener<>()) 
     .processor(asyncItemProcessor()) 
     .writer(asyncItemWriter()) 
     .taskExecutor(executor) 
     .build(); 
    } 

ウサギ設定:

import lombok.Setter; 
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.boot.context.properties.ConfigurationProperties; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
@ConfigurationProperties("art.com.service.product.config.rabbitmq") 
public class RabbitConfig { 

    @Setter 
    private String host; 
    @Setter 
    private Integer port; 
    @Setter 
    private String username; 
    @Setter 
    private String password; 
    @Setter 
    private String exchangeName; 
    @Setter 
    private String queueName; 

    @Bean 
    ConnectionFactory rabbitConnectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host); 
    connectionFactory.setPort(port); 
    connectionFactory.setUsername(username); 
    connectionFactory.setPassword(password); 
    return connectionFactory; 
    } 

    @Bean 
    public MessageConverter jsonMessageConverter() { 
    return new Jackson2JsonMessageConverter(); 
    } 

    @Bean 
    RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory, 
     MessageConverter jsonMessageConverter) { 

    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); 
    rabbitTemplate.setQueue(queueName); 
    rabbitTemplate.setExchange(exchangeName); 
    rabbitTemplate.setMessageConverter(jsonMessageConverter); 

    return rabbitTemplate; 
    } 


} 

他の設定やコードが役に立ったら教えてください。私もそれらを共有することができます。

+0

あなたは素晴らしい苦情を書きましたが、誰かがあなたを助けるためには、少なくともいくつかの技術的な詳細を提供した方がいいかもしれません。これには、マシン構成、Rabbit構成、メッセージサイズ、およびその他の関連する詳細が含まれます。 – theMayer

+0

@theMayer configsと少しのコードが追加されました。 – Wizard

答えて

0

まあ、amqpTemplate.receive()は間違いなく非常に遅いです。これは、Channel.basicGet()に基づいており、これは長寿命のようには機能しませんBasicConsumer。私は、を好きにして、prefetchと一緒に、春のAMQからMessageListenerContainerを放棄することを提案します。

+0

私はここで何かを明確にする必要があります - Basic.Getは、基本的にプロトコルレベルでBasic.Deliverと同じデータを送信します。したがって、クライアントの実装のアーティファクトである必要があります。実際には、ファンキーなルータとして単にRMQを使用しているのでなければ、メッセージをプルするよりもはるかに賢明です。 – theMayer

+1

'AmqpItemReader'は' RabbitTemplate.receive() 'に基づいています。それは 'Channel'を開き、' DefaultConsumer'を作成し、 'basicConsume()'を実行し、 'Delivery'を待って、すべてをバックオーダーで閉じます。ですから、技術的には同じことをしますが、 'ListenerContainer'の場合は常にそれを行います。 'RabbitTemplate.receive()'を使って、呼び出しごとにリソースを開き、閉じます。それがパフォーマンスのボトルネックです。 –

+0

OK、これは 'Basic.Get'ではありませんが、接続が開いたままであれば良い実装です。 'channel'はプロトコルに追加された単なる整数なので、この実装では、ワイヤを介して送信される1つの余分なパケットと' Basic.Get'との間に、サーバをポーリングする必要がないというトレードオフがあります。なぜこれがひどく高価になるのかわかりませんが、スループットがどのように制限されるのかが分かりました。消費者がおそらくより良い選択だが、[非常に重要な警告](https://stackoverflow.com/questions/45139668#45148326)がある。 – theMayer

関連する問題