2017-06-15 1 views
1

RabbitMQ-javaクライアントAPIを使用してRabbitMQサーバーと対話しようとしています。 私はjava client api guideから読み取る:経験則としてRabbitMQのシングルスレッドエグゼキュータとの通信にシングルチャネルを使用するのは良いですか?

を、スレッド間のチャネルインスタンスを共有することは避けるべきものです。アプリケーションは、複数のスレッドにわたって同じチャネルを共有するのではなく、スレッドごとにチャネルを使用するほうがよいでしょう。

corePoolSize 1のThreadPoolExecutorを使用し、RabbitMQキューのメッセージを保存するためのRunnableタスクを追加しようとしています。ここで私が使用しているコードは次のとおりです。

package common; 

import java.io.IOException; 
import java.io.InputStream; 
import java.security.KeyManagementException; 
import java.security.NoSuchAlgorithmException; 
import java.util.Properties; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import com.google.gson.JsonObject; 
import com.rabbitmq.client.BlockedListener; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.MessageProperties; 
import com.rabbitmq.client.ShutdownListener; 
import com.rabbitmq.client.ShutdownSignalException; 

public class RabbitMQUtil { 
    private static Logger log= LoggerFactory.getLogger("logger"); 
    private static RabbitMQUtil gmInstance; 
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000)); 
    private final String PROPERTIES_FILE_NAME = "config/rabbitmq.properties"; 
    private final Properties properties = new Properties(); 
    private String host = null; 
    private int port = 0; 
    private String username = null; 
    private String password = null; 
    private String useSSL = "false"; 
    private ConnectionFactory factory; 
    private Connection connection; 
    private Channel channel; 

    private RabbitMQUtil() throws IOException, TimeoutException, Exception { 
     try { 
      InputStream stream = RabbitMQUtil.class.getClassLoader().getResourceAsStream(PROPERTIES_FILE_NAME); 
      if(stream != null) { 
       properties.load(stream); 
      } 
     } catch (Exception ex) { 
      log.error("Exception while loading the rabbitmq properties file:", ex); 
     } 

     host = properties.getProperty("rabbitmq.host", "localhost"); 
     port = Integer.parseInt(properties.getProperty("rabbitmq.port", "5672")); 
     username = properties.getProperty("rabbitmq.username", "guest"); 
     password = properties.getProperty("rabbitmq.password", "guest"); 
     useSSL = properties.getProperty("rabbitmq.usessl", "false"); 

     factory = new ConnectionFactory(); 
     factory.setHost(host); 
     factory.setPort(port); 
     factory.setUsername(username); 
     factory.setPassword(password); 
     if("true".equalsIgnoreCase(useSSL)) { 
      try { 
       factory.useSslProtocol(); 
      } catch (KeyManagementException | NoSuchAlgorithmException e) { 
       log.error("Exception while applying the tls for rabbitmq:", e); 
      } 
     } 
     connection = factory.newConnection(); 
     connection.addBlockedListener(new RabbitMQBlockedListener()); 
     connection.addShutdownListener(new RabbitMQShutDownListener()); 

     channel = connection.createChannel(); 
    } 

    public static RabbitMQUtil getInstance() { 
     if(gmInstance == null) { 
      synchronized (RabbitMQUtil.class) { 
       if(gmInstance == null) { 
        try { 
         gmInstance = new RabbitMQUtil(); 
        } catch (IOException | TimeoutException e) { 
         log.error("Exception in getInstance:", e); 
        } catch (Exception e) { 
         log.error("Exception in getInstance:", e); 
        } 
       } 
      } 
     } 
     return gmInstance; 
    } 

    public static void saveErrorMessagesInLogs(JsonObject obj, String queueName) { 
     log.info("data to be saved for :"+queueName+" is:"+obj.toString()); 
    } 

    public void saveMsgInQueue(JsonObject gson, String queueName) { 
     this.executor.execute(new RabbitMQData(gson, queueName)); 
    } 

    private class RabbitMQBlockedListener implements BlockedListener { 
     @Override 
     public void handleBlocked(String arg0) throws IOException { 
      log.warn("blocked listener called:", arg0); 
     } 

     @Override 
     public void handleUnblocked() throws IOException { 
      log.warn("unblocked listener called:"); 
     } 
    } 

    private class RabbitMQShutDownListener implements ShutdownListener { 
     @Override 
     public void shutdownCompleted(ShutdownSignalException cause) { 
      log.error("Shutdown event listener called:", cause); 
      log.error("shutdown event listener:"+cause.isHardError()); 
     } 
    } 

    private class RabbitMQData implements Runnable{ 
     JsonObject obj; 
     String queueName; 
     public RabbitMQData(JsonObject obj, String queueName) { 
      Thread.currentThread().setName("RabbitMQ Thread:"+obj.get("userid")+" -->"+queueName); 
      this.obj = obj; 
      this.queueName = queueName; 
     } 

     @Override 
     public void run() { 
      try { 
       channel.queueDeclare(this.queueName, true, false, false, null); 
       channel.basicPublish("", this.queueName, MessageProperties.PERSISTENT_BASIC, this.obj.toString().getBytes()); 
      } catch (Exception e) { 
       log.info("Error while running the scheduled rabbitmq task:", e); 
       log.info("data to be saved for :"+this.queueName+" is:"+this.obj.toString()); 
      } 
     } 
    } 

    public static void saveRabbitMQData(JsonObject obj, String queueName) { 
     RabbitMQUtil util = RabbitMQUtil.getInstance(); 
     if(util != null) 
      util.saveMsgInQueue(obj, queueName); 
     else 
      RabbitMQUtil.saveErrorMessagesInLogs(obj, queueName); 
    } 
} 

私は、次のことを知っていただきたいと思います:

  1. はわずか1つのスレッドのスレッドプールを使用する場合、単一チャネルを使用して結構ですか?
  2. ブロックされた/ブロックされていないシャットダウンイベントが発生したときに、接続オブジェクトとチャネルオブジェクトをどのように処理する必要がありますか? RabbitMQサーバーが再び起動するとAPIは自動的に接続を確立しますが、

他のフィードバックは高く評価されます。

答えて

1

1.-ありがとうですのみ1スレッドのスレッドプールを使用する場合、単一チャネルを使用して結構ですか?

はい、問題ありません。それはあなたがそれをやるべき方法です。 1つのスレッドのみがChannelインスタンスを使用する必要があります。ブロック解除/ブロックされたとき

2.-どのように接続し、チャネルオブジェクトを処理する必要があり、シャットダウンイベントがトリガされます。それ以外の場合は、確認が失われる可能性があります(https://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.1/rabbitmq-java-client-javadoc-3.1.1/com/rabbitmq/client/Channel.htmlここを参照してください)? RabbitMQサーバーが再び起動するとAPIは自動的に接続を確立しますが、

アプリケーションがシャットダウンしているときは、チャネルを閉じてからRabbitMQへの接続を閉じる必要があります。

ブロック解除/ブロックについて
channel.close(); 
    conn.close(); 

、ここ(https://www.rabbitmq.com/api-guide.html)をお読みください:

コールバックのチャンネルをインスタンス化したスレッドとは別のスレッド・プールに派遣されている消費者に。これは、コンシューマーが、Connection#ChannelDeclareやChannel#basicCancelなど、ConnectionまたはChannel上のブロッキングメソッドを安全に呼び出すことができることを意味します。

各チャネルには独自のディスパッチスレッドがあります。チャネル当たり1つのコンシューマーの最も一般的な使用例については、これは、コンシューマーが他のコンシューマーを押さえていないことを意味します。チャネルごとに複数のコンシューマーがいる場合は、長期実行コンシューマーがそのチャネル上の他のコンシューマーへのコールバックのディスパッチを保留する可能性があることに注意してください。

+0

ありがとうございました。 私は、shutdownlistenerでconnection.abort()とchannel.abort()を使用しなければなりませんでした。なぜなら、クローズによって例外が発生し、同じリスナー内でconnection = nullおよびchannel = nullが発生したためです。私は新しい接続を作成しており、チャンネルはrabbitmqサーバーにメッセージを送信する前にnullです。 ブロックされたリスナーで同じ戦略を使用することをお勧めしますか? – Prateek

関連する問題