RabbitMQ-javaクライアントAPIを使用してRabbitMQサーバーと対話しようとしています。 私はjava client api guideから読み取る:経験則としてRabbitMQのシングルスレッドエグゼキュータとの通信にシングルチャネルを使用するのは良いですか?
を、スレッド間のチャネルインスタンスを共有することは避けるべきものです。アプリケーションは、複数のスレッドにわたって同じチャネルを共有するのではなく、スレッドごとにチャネルを使用するほうがよいでしょう。
corePoolSize 1のThreadPoolExecutorを使用し、RabbitMQキューのメッセージを保存するためのRunnableタスクを追加しようとしています。ここで私が使用しているコードは次のとおりです。
package common;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.JsonObject;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
public class RabbitMQUtil {
private static Logger log= LoggerFactory.getLogger("logger");
private static RabbitMQUtil gmInstance;
private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000));
private final String PROPERTIES_FILE_NAME = "config/rabbitmq.properties";
private final Properties properties = new Properties();
private String host = null;
private int port = 0;
private String username = null;
private String password = null;
private String useSSL = "false";
private ConnectionFactory factory;
private Connection connection;
private Channel channel;
private RabbitMQUtil() throws IOException, TimeoutException, Exception {
try {
InputStream stream = RabbitMQUtil.class.getClassLoader().getResourceAsStream(PROPERTIES_FILE_NAME);
if(stream != null) {
properties.load(stream);
}
} catch (Exception ex) {
log.error("Exception while loading the rabbitmq properties file:", ex);
}
host = properties.getProperty("rabbitmq.host", "localhost");
port = Integer.parseInt(properties.getProperty("rabbitmq.port", "5672"));
username = properties.getProperty("rabbitmq.username", "guest");
password = properties.getProperty("rabbitmq.password", "guest");
useSSL = properties.getProperty("rabbitmq.usessl", "false");
factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
if("true".equalsIgnoreCase(useSSL)) {
try {
factory.useSslProtocol();
} catch (KeyManagementException | NoSuchAlgorithmException e) {
log.error("Exception while applying the tls for rabbitmq:", e);
}
}
connection = factory.newConnection();
connection.addBlockedListener(new RabbitMQBlockedListener());
connection.addShutdownListener(new RabbitMQShutDownListener());
channel = connection.createChannel();
}
public static RabbitMQUtil getInstance() {
if(gmInstance == null) {
synchronized (RabbitMQUtil.class) {
if(gmInstance == null) {
try {
gmInstance = new RabbitMQUtil();
} catch (IOException | TimeoutException e) {
log.error("Exception in getInstance:", e);
} catch (Exception e) {
log.error("Exception in getInstance:", e);
}
}
}
}
return gmInstance;
}
public static void saveErrorMessagesInLogs(JsonObject obj, String queueName) {
log.info("data to be saved for :"+queueName+" is:"+obj.toString());
}
public void saveMsgInQueue(JsonObject gson, String queueName) {
this.executor.execute(new RabbitMQData(gson, queueName));
}
private class RabbitMQBlockedListener implements BlockedListener {
@Override
public void handleBlocked(String arg0) throws IOException {
log.warn("blocked listener called:", arg0);
}
@Override
public void handleUnblocked() throws IOException {
log.warn("unblocked listener called:");
}
}
private class RabbitMQShutDownListener implements ShutdownListener {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
log.error("Shutdown event listener called:", cause);
log.error("shutdown event listener:"+cause.isHardError());
}
}
private class RabbitMQData implements Runnable{
JsonObject obj;
String queueName;
public RabbitMQData(JsonObject obj, String queueName) {
Thread.currentThread().setName("RabbitMQ Thread:"+obj.get("userid")+" -->"+queueName);
this.obj = obj;
this.queueName = queueName;
}
@Override
public void run() {
try {
channel.queueDeclare(this.queueName, true, false, false, null);
channel.basicPublish("", this.queueName, MessageProperties.PERSISTENT_BASIC, this.obj.toString().getBytes());
} catch (Exception e) {
log.info("Error while running the scheduled rabbitmq task:", e);
log.info("data to be saved for :"+this.queueName+" is:"+this.obj.toString());
}
}
}
public static void saveRabbitMQData(JsonObject obj, String queueName) {
RabbitMQUtil util = RabbitMQUtil.getInstance();
if(util != null)
util.saveMsgInQueue(obj, queueName);
else
RabbitMQUtil.saveErrorMessagesInLogs(obj, queueName);
}
}
私は、次のことを知っていただきたいと思います:
- はわずか1つのスレッドのスレッドプールを使用する場合、単一チャネルを使用して結構ですか?
- ブロックされた/ブロックされていないシャットダウンイベントが発生したときに、接続オブジェクトとチャネルオブジェクトをどのように処理する必要がありますか? RabbitMQサーバーが再び起動するとAPIは自動的に接続を確立しますが、
他のフィードバックは高く評価されます。
は
ありがとうございました。 私は、shutdownlistenerでconnection.abort()とchannel.abort()を使用しなければなりませんでした。なぜなら、クローズによって例外が発生し、同じリスナー内でconnection = nullおよびchannel = nullが発生したためです。私は新しい接続を作成しており、チャンネルはrabbitmqサーバーにメッセージを送信する前にnullです。 ブロックされたリスナーで同じ戦略を使用することをお勧めしますか? – Prateek