次の設定があります。 APPID(接続されたクライアントあたり)と呼ばれる一意の識別子に基づいて、メッセージキュー(私の場合はLinkedBlockingQueues)の設定された数を越えインバウンド・クライアントメッセージを拡散メッセージ分配は、あります:私も答える必要とnettyチャネルをキューに渡し、後で別のスレッドの書き込みに使用することは有効ですか?
public class MessageDistributor {
private final List<BlockingQueue<MessageWrapper>> messageQueueBuckets;
public MessageDistributor(List<BlockingQueue<MessageWrapper>> messageQueueBuckets) {
this.messageQueueBuckets = messageQueueBuckets;
}
public void handle(String appId, MessageWrapper message) {
int index = (messageQueueBuckets.size() - 1) % hash(appId);
try {
messageQueueBuckets.get(index).offer(message);
} catch (Exception e) {
// handle exception
}
}
}
後でメッセージ、私はMessageWrapper内のメッセージオブジェクトとネッティーチャンネルをラップ:
public class MessageWrapper {
private final Channel channel;
private final Message message;
public MessageWrapper(Channel channel, Message message) {
this.channel = channel;
this.message = message;
}
public Channel getChannel() {
return channel;
}
public Message getMessage() {
return message;
}
}
さらに、Runnableを実装し、割り当てられたブロックキューから新しいメッセージを受け取り、メッセージコンシューマは、そこにあります。この男は、私はあまりにも多くの他の接続しているクライアントのための操作をブロックしてはならないメインネッティーイベントループの外に持っていると思いますし、いくつかの高価な/ブロック操作、複数のキューの故に使用を実行します。
public class MessageConsumer implements Runnable {
private final BlockingQueue<MessageWrapper> messageQueue;
public MessageConsumer(BlockingQueue<MessageWrapper> messageQueue) {
this.messageQueue = messageQueue;
}
@Override
public void run() {
while (true) {
try {
MessageWrapper msgWrap = messageQueue.take();
Channel channel = msgWrap.getChannel();
Message msg = msgWrap.getMessage();
doSthExepnsiveOrBlocking(channel, msg);
} catch (Exception e) {
// handle exception
}
}
}
public void doSthExepnsiveOrBlocking(Channel channel, Message msg) {
// some expsive/blocking operations
channe.writeAndFlush(someResultObj);
}
}
のセットアップ(messageExecutorが8の大きさとDefaultEventeExecutorGroup
である)以下のように、すべてのクラスになります。このアプローチの
int nrOfWorkers = config.getNumberOfClientMessageQueues();
List<BlockingQueue<MessageWrapper>> messageQueueBuckets = new ArrayList<>(nrOfWorkers);
for (int i = 0; i < nrOfWorkers; i++) {
messageQueueBuckets.add(new LinkedBlockingQueue<>());
}
MessageDistributor distributor = new MessageDistributor(messageQueueBuckets);
List<MessageConsumer> consumers = new ArrayList<>(nrOfWorkers);
for (BlockingQueue<MessageWrapper> messageQueueBucket : messageQueueBuckets) {
MessageConsumer consumer = new MessageConsumer(messageQueueBucket);
consumers.add(consumer);
messageExecutor.submit(consumer);
}
私の目標は(ない完全に、しかし少なくともビット)互いに接続されたクライアントを隔離することであり、異なるスレッドで高価な操作を実行することもできます。
私の質問は次のとおりです。このMessageWrapper
の内部にnettyチャネルオブジェクトをラップして、後で使用し、他のスレッドで書き込みメソッドにアクセスすることはできますか?
UPDATE
代わりに網状の上に追加のメッセージ配信の仕組みを構築する、私は単に私のチャネル遮断ハンドラの別々のEventExecutorGroupで行くと、それがどのように動作するかを見ることにしました。