2017-12-11 38 views
0

次の設定があります。 APPID(接続されたクライアントあたり)と呼ばれる一意の識別子に基づいて、メッセージキュー(私の場合はLinkedBlockingQueues)の設定された数を越えインバウンド・クライアントメッセージを拡散メッセージ分配は、あります:私も答える必要とnettyチャネルをキューに渡し、後で別のスレッドの書き込みに使用することは有効ですか?

public class MessageDistributor { 

    private final List<BlockingQueue<MessageWrapper>> messageQueueBuckets; 

    public MessageDistributor(List<BlockingQueue<MessageWrapper>> messageQueueBuckets) { 
     this.messageQueueBuckets = messageQueueBuckets; 
    } 

    public void handle(String appId, MessageWrapper message) { 
     int index = (messageQueueBuckets.size() - 1) % hash(appId); 
     try { 
      messageQueueBuckets.get(index).offer(message); 
     } catch (Exception e) { 
      // handle exception 
     } 
    } 
} 

後でメッセージ、私はMessageWrapper内のメッセージオブジェクトとネッティーチャンネルをラップ:

public class MessageWrapper { 

    private final Channel channel; 
    private final Message message; 

    public MessageWrapper(Channel channel, Message message) { 
     this.channel = channel; 
     this.message = message; 
    } 

    public Channel getChannel() { 
     return channel; 
    } 

    public Message getMessage() { 
     return message; 
    } 
} 

さらに、Runnableを実装し、割り当てられたブロックキューから新しいメッセージを受け取り、メッセージコンシューマは、そこにあります。この男は、私はあまりにも多くの他の接続しているクライアントのための操作をブロックしてはならないメインネッティーイベントループの外に持っていると思いますし、いくつかの高価な/ブロック操作、複数のキューの故に使用を実行します。

public class MessageConsumer implements Runnable { 

    private final BlockingQueue<MessageWrapper> messageQueue; 

    public MessageConsumer(BlockingQueue<MessageWrapper> messageQueue) { 
     this.messageQueue = messageQueue; 
    } 

    @Override 
    public void run() { 
     while (true) { 
      try { 
       MessageWrapper msgWrap = messageQueue.take(); 
       Channel channel = msgWrap.getChannel(); 
       Message msg = msgWrap.getMessage(); 

       doSthExepnsiveOrBlocking(channel, msg);  

      } catch (Exception e) { 
       // handle exception 
      } 
     } 
    } 
    public void doSthExepnsiveOrBlocking(Channel channel, Message msg) { 
     // some expsive/blocking operations 
     channe.writeAndFlush(someResultObj); 
    } 
} 

のセットアップ(messageExecutorが8の大きさとDefaultEventeExecutorGroupである)以下のように、すべてのクラスになります。このアプローチの

int nrOfWorkers = config.getNumberOfClientMessageQueues(); 
List<BlockingQueue<MessageWrapper>> messageQueueBuckets = new ArrayList<>(nrOfWorkers); 

for (int i = 0; i < nrOfWorkers; i++) { 
    messageQueueBuckets.add(new LinkedBlockingQueue<>()); 
} 
MessageDistributor distributor = new MessageDistributor(messageQueueBuckets); 
List<MessageConsumer> consumers = new ArrayList<>(nrOfWorkers); 

for (BlockingQueue<MessageWrapper> messageQueueBucket : messageQueueBuckets) { 
    MessageConsumer consumer = new MessageConsumer(messageQueueBucket); 
    consumers.add(consumer); 
    messageExecutor.submit(consumer); 
} 

私の目標は(ない完全に、しかし少なくともビット)互いに接続されたクライアントを隔離することであり、異なるスレッドで高価な操作を実行することもできます。

私の質問は次のとおりです。このMessageWrapperの内部にnettyチャネルオブジェクトをラップして、後で使用し、他のスレッドで書き込みメソッドにアクセスすることはできますか?

UPDATE

代わりに網状の上に追加のメッセージ配信の仕組みを構築する、私は単に私のチャネル遮断ハンドラの別々のEventExecutorGroupで行くと、それがどのように動作するかを見ることにしました。

答えて

1

はい他のスレッドからChannel.*メソッドを呼び出すことは有効です。それは、Channelに属するEventLoopスレッド自体からこれらのメソッドが呼び出されたときにメソッドが最もよく機能すると言いました。

関連する問題