2011-07-22 13 views
1

簡単な例のように、Nettyで2つのワーカースレッドのみを使用して3つの同時TCPクライアント接続を処理したいとします。Jboss Netty - 3つのワーカースレッドを使用して2つの接続を提供する方法

質問 A) 以下のコードでは、私の3番目の接続はサーバーからデータを取得しません。接続はそこにあります。通知 - 私のワーカーエグゼキュータとワーカー数が2である方法。 2つのワーカースレッドと3つの接続がある場合、3つの接続すべてが2つのスレッドによって処理されるべきではありませんか?

B) もう1つの質問です。 - Nettyはjava.util.concurrentのCompletionServiceを使用していますか?それはそれを使用していないようです。また、executor.submitやfuture.getを実行するソースコードは表示されませんでした。 これはすべて、ワーカースレッドよりも多くの接続にデータを処理して提供する方法の混乱を招いています。

C) Nettyが10000+の同時TCP接続を処理する方法が失われています...それは10000スレッドを作成しますか?接続ごとのスレッドはスケーラブルな解決策ではないので、私のテストコードがどうやって期待どおりに動くかは分かりません。

import java.net.InetSocketAddress; 
    import java.nio.channels.ClosedChannelException; 
    import java.util.Date; 
    import java.util.concurrent.Executors; 
    import java.util.logging.Level; 
    import java.util.logging.Logger; 

    import org.jboss.netty.bootstrap.ServerBootstrap; 
    import org.jboss.netty.channel.Channel; 
    import org.jboss.netty.channel.ChannelFuture; 
    import org.jboss.netty.channel.ChannelFutureListener; 
    import org.jboss.netty.channel.ChannelHandlerContext; 
    import org.jboss.netty.channel.ChannelPipeline; 
    import org.jboss.netty.channel.ChannelPipelineFactory; 
    import org.jboss.netty.channel.ChannelStateEvent; 
    import org.jboss.netty.channel.Channels; 
    import org.jboss.netty.channel.ExceptionEvent; 
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 
    import org.jboss.netty.handler.codec.string.StringEncoder; 

    public class SRNGServer { 

     public static void main(String[] args) throws Exception { 
      // Configure the server. 
      ServerBootstrap bootstrap = new ServerBootstrap(
        new NioServerSocketChannelFactory(
          Executors.newCachedThreadPool(), 
          //Executors.newCachedThreadPool() 
          Executors.newFixedThreadPool(2),2 
         )); 

      // Configure the pipeline factory. 
      bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP()); 

      // Bind and start to accept incoming connections. 
      bootstrap.bind(new InetSocketAddress(8080)); 
     } 



     private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler { 

     private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName()); 


     @Override 
     public void channelConnected(
       ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 

      // Send greeting for a new connection. 
      Channel ch=e.getChannel(); 

      System.out.printf("channelConnected with channel=[%s]%n", ch); 

      ChannelFuture writeFuture=e.getChannel().write("It is " + new Date() + " now.\r\n"); 

      SRNGChannelFutureListener srngcfl=new SRNGChannelFutureListener(); 

      System.out.printf("Registered listener=[%s] for future=[%s]%n", srngcfl, writeFuture); 

      writeFuture.addListener(srngcfl);  

     } 

     @Override 
     public void exceptionCaught(
       ChannelHandlerContext ctx, ExceptionEvent e) { 

      logger.log(
        Level.WARNING, 
        "Unexpected exception from downstream.", 
        e.getCause()); 
      if(e.getCause() instanceof ClosedChannelException){ 
       logger.log(Level.INFO, "****** Connection closed by client - Closing Channel"); 
      } 
      e.getChannel().close(); 
     } 
     } 



     private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory { 

     public ChannelPipeline getPipeline() throws Exception { 

      // Create a default pipeline implementation. 
      ChannelPipeline pipeline = Channels.pipeline(); 

      pipeline.addLast("encoder", new StringEncoder()); 
      pipeline.addLast("handler", new SRNGServerHandlerP()); 

      return pipeline; 
     } 
     } 


     private static class SRNGChannelFutureListener implements ChannelFutureListener{ 

     public void operationComplete(ChannelFuture future) throws InterruptedException{ 
      Thread.sleep(1000*5); 
      Channel ch=future.getChannel(); 
      if(ch!=null && ch.isConnected()){ 
       ChannelFuture writeFuture=ch.write("It is " + new Date() + " now.\r\n"); 
       //-- Add this instance as listener itself. 
       writeFuture.addListener(this); 
      } 

     } 

     } 
    } 

答えて

4

ソースコードを詳細に分析していないため、正しく動作しない理由はわかりません。しかしSRNGChannelFutureListenerにこの行は疑わしい:

Thread.sleep(1000*5); 

これは、それが5秒間ロックされて実行するスレッドを行います。その時間中にスレッドは他の処理を行うことができません。

質問C:いいえ、10,000スレッドを作成しません。 Nettyの全体のポイントは、それが実際にはうまくスケールされないので、それをしないということです。代わりに、スレッドプールからの限られた数のスレッドを使用し、何かが発生したときにイベントを生成し、プール内のスレッドでイベントハンドラを実行します。したがって、スレッドと接続は互いに分離されます(接続ごとにスレッドはありません)。

このメカニズムを適切に機能させるには、イベントハンドラができるだけ早く戻って、実行するスレッドをできるだけ早く次のイベントハンドラを実行できるようにする必要があります。スレッドを5秒間スリープさせると、スレッドを割り当てたままにしておき、他のイベントの処理には使用できなくなります。

質問B:本当にあなたがNettyにソースコードを入手して調べることができるかどうか知りたいのであれば。それはセレクタと他のjava.nioクラスを使用してasynchronous I/Oを実行します。

+0

それは私が.NioServerSocketChannelFactoryに別個NioServerSocketChannelFactory constructors.Readで試験をdidntのW/O sleep.Earlier働い2 constrs NioServerSocketChannelFactory(エグゼキュータbossExecutor、執行workerExecutor)AND NioServerSocketChannelFactory(エグゼキュータbossExecutor、執行workerExecutor、INT workerCount).MY M /を有します私は最初のconstr、workerExecutor = 2を使用すると、私のworkerExecutorは 2もデータを取得しません。しかし、2番目のconstrを使用すると、 workerExecutor = 2とworkCount = 2で、すべてのconnがデータを取得するようになりました。 – FatherFigure

+0

@AmitAlkaがReactorパターンを読んでいる:http://ja.wikipedia.org/wiki/Reactor_pattern – Migol

関連する問題