私はrabbitmq
キューから読み込んでデータストアに書き込む必要があります。現在、キューからメッセージを読み取るのにAmqpItemReader
を使用しています。rabbit mqからの読み込みが遅い
私が読んだデータはJson形式であり、すべて私のItemProcessor
は、json
をJavaオブジェクトにシリアル化することです。
私のシングルスレッドソリューションのパフォーマンスは非常に低いです。私は1秒あたり12msgの割合で消費することができます。私は約1000万レコードを処理するだろう。だから、私はマルチスレッドのステップに変更しようとしましたが、スループットの大幅な向上は見られませんでした(これは約50秒/秒でした)。
私はどのように仕事をスピードアップしますか?私が取っているルートが正しいかどうか疑問に思っています。この上の任意の光をいただければ幸いです。前もって感謝します。
編集:私が達成しようとしていることをさらに明確にするためのコード/構成が含まれています。
ラビットサーバーの設定: AWS上の3ノードクラスタで、それぞれ0.5ギガバイトのメモリがあります。
メッセージの詳細: 各ペイロードは約1キロバイトのJSONです。
開発マシン(Macintosh)で春バッチジョブを実行しています。
システム構成:
Processor Name: Intel Core i7
Processor Speed: 2.5 GHz
Number of Processors: 1
Total Number of Cores: 4
L2 Cache (per Core): 256 KB
L3 Cache: 6 MB
Memory: 16 GB
マイItemReader:
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.batch.item.amqp.AmqpItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQItemReader extends AmqpItemReader<Message> {
private final Logger logger = LoggerFactory.getLogger(RabbitMQItemReader.class);
@Autowired
private final RabbitTemplate template;
public RabbitMQItemReader(RabbitTemplate rabbitTemplate) throws IOException {
super(rabbitTemplate);
template = rabbitTemplate;
}
@Override
public Message read() {
return template.receive();
}
}
マイステップ:
private Step step() throws Exception {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setMaxPoolSize(100);
executor.setThreadNamePrefix("SThread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return stepBuilderFactory.get("queueToCassandraStep")
.<Message, Vendor>chunk(100)
.reader(itemReader)
.listener(new QueueReaderListener<>())
.processor(asyncItemProcessor())
.writer(asyncItemWriter())
.taskExecutor(executor)
.build();
}
ウサギ設定:
import lombok.Setter;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties("art.com.service.product.config.rabbitmq")
public class RabbitConfig {
@Setter
private String host;
@Setter
private Integer port;
@Setter
private String username;
@Setter
private String password;
@Setter
private String exchangeName;
@Setter
private String queueName;
@Bean
ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory,
MessageConverter jsonMessageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.setQueue(queueName);
rabbitTemplate.setExchange(exchangeName);
rabbitTemplate.setMessageConverter(jsonMessageConverter);
return rabbitTemplate;
}
}
他の設定やコードが役に立ったら教えてください。私もそれらを共有することができます。
あなたは素晴らしい苦情を書きましたが、誰かがあなたを助けるためには、少なくともいくつかの技術的な詳細を提供した方がいいかもしれません。これには、マシン構成、Rabbit構成、メッセージサイズ、およびその他の関連する詳細が含まれます。 – theMayer
@theMayer configsと少しのコードが追加されました。 – Wizard