2017-02-08 9 views
1

Netty v4を使用してTCPサーバーを作成しています。サーバーはクライアントからの複数の接続を処理し、それらにデータをストリームします。Nettyサーバー - 遅いコンシューマーのTCPで

クライアントが低速でデータを消費していることを検出できるようにしたいと考えています。私は基本的にクライアントが遅いためにTCPバッファがいっぱいにならないようにしたい!

これは基本的にZeroMQの機能です(「低速加入者検出(自殺カタツムリパターン)」と呼ばれています)。 Nettyを使ってどうすればいいですか?

私の現在のコードは、(私は、サーバーのセットアップを単に紹介します)です:

 ServerBootstrap b = new ServerBootstrap(); 
     b.group(bossGroup, workerGroup) 
      .channel(NioServerSocketChannel.class) 
      .option(ChannelOption.SO_BACKLOG, 1000) 
      .handler(new LoggingHandler(LogLevel.INFO)) 
      .childHandler(new ChannelInitializer<SocketChannel>() { 
      @Override 
      public void initChannel(SocketChannel ch) throws Exception { 
       ChannelPipeline p = ch.pipeline(); 
       p.addLast(new Handler()); 
      } 
      }); 

     ChannelFuture f = b.bind(8000).sync(); 
     f.channel().closeFuture().sync(); 

は義和SO_BACKLOGオプションがないことですか?それは接続がキューに入れられていることを示していますが、特定の接続に対してキューに入れられるパケットに興味があります。

答えて

0

これを行う方法は、WriteBufferWaterMarkを使用することです。 Javadocから引用:

WriteBufferWaterMarkは、ライトバッファのための低ウォーターマークと高ウォーターマーク を設定するために使用されています。書き込み バッファにキューイングされたバイト数が最高水準点を超えると、Channel.isWritable()は falseに戻ります。書き込みバッファーにキューイングされたバイト数が最高水準点を超えてから、水位が低い水位 に下がった場合、Channel.isWritable()は再び真を返すようになります。

子供のチャンネルconfigsに適切なウォーターマークを設定すると、遅い消費者は、「長い」時間の間書き込みができないチャネルを、うっとりする消費者に対してマークします。したがって、チャネルの書き込み可能状態の変化をリッスンし、チャネルが書き込み不能な時間を追跡すると、低速のコンシューマとバックログの相対的な重大度を識別し、低速がある閾値に達した場合にクライアントを切断できます。

+0

ウォーターマークを変更しなくても 'Channel.isWritable()'を使うことができます。彼らはデフォルト値を持っています。'Selector'はソケット送信バッファがいっぱいになるとOP_WRITEを報告しなくなります。 – EJP

+0

@Nicholas大変ありがとうございます。非常に単純なバージョンは次のようなものです: 'if(!ctx.channel()。isWritable())ctx.channel()。close();'それは安全ですか? – Will

+0

@Nicholasあなたは私の最後の質問に答えなかったが、私はまだあなたの答えが役に立つと思う:Pありがとう!私はそれを受け入れるよ! – Will

0

SO_BACKLOGオプションは何ですか?

SO_BACKLOGオプションは何ですか?

それは

正しいをキューに登録されている接続のためだと言います。

が、私はあなたがSO_BACKLOGに興味を持っていない特定の接続

のためにキューイングされるパケットに興味があります。

+0

私は彼らがあまりにも遅いです(切断)クライアントを遮断するためのオプションに興味がありますTCPバッファからメッセージを消費します。それは理にかなっていますか? – Will

0

はケースで、あなたの唯一の目標は、このアプローチは十分だろうバッファオーバーフロー(背圧)を避けるためです:

if (ctx.channel().isWritable()) { 
    ctx.writeAndFlush(msg); 
} 
関連する問題