2017-07-06 3 views
0

問題の説明:ネッティー(ネッティー-すべて-4.1.12.Final.jar)にjava.io.IOException:既存の接続はリモートホストに強制的に切断された

私のプログラムはネッティーサーバとクライアントを作成しますそのサーバーに2^17の接続を行い、クライアントがこの例外の受信を開始することがあります。

java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta

英語と同等です:

java.io.IOException: An existing connection was forcibly closed by the remote host

もちろんサーバが強制的に既存の接続を閉じることが望まれていません。

再現手順:私はそれを再生し、この「単一の実行可能なjavaファイル」プログラムを作成しました。この問題を再現するために喜んで誰の便宜上、それだけでnetty-all-4.1.12.Final.jar依存性を必要とする

。これは、いくつかの空きポートでnettyサーバーを起動し、クライアントを作成し、要求を実行し、要求を処理するチャンスを与えるために少し待ってから、接続の数に関する統計情報を出力します。 サーバが遭遇した回数、および何種類の例外が発生したかクライアントが発生しました。

package netty.exception.tst; 

import java.io.PrintWriter; 
import java.io.StringWriter; 
import java.net.InetSocketAddress; 
import java.util.Collections; 
import java.util.Map; 
import java.util.Map.Entry; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.stream.Collectors; 

import io.netty.bootstrap.Bootstrap; 
import io.netty.bootstrap.ServerBootstrap; 
import io.netty.buffer.ByteBuf; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.channel.socket.nio.NioSocketChannel; 

public class NettyException { 

    public static void main(String[] args) throws InterruptedException { 
     System.out.println("starting server"); 
     NettyServer server = new NettyServer(0); 
     int port = server.getPort(); 
     System.out.println("server started at port: " + port); 

     System.out.println("staring client"); 
     NettyClient client = new NettyClient(); 
     System.out.println("client started"); 

     int noOfConnectionsToPerform = 1 << 17; 
     System.out.println("performing " + noOfConnectionsToPerform + " connections"); 
     for (int n = 0; n < noOfConnectionsToPerform; n++) { 
      // send a request 
      ChannelFuture f = client.getBootstrap().connect("localhost", port); 
     } 
     System.out.println("client performed " + noOfConnectionsToPerform + " connections"); 

     System.out.println("wait a bit to give a chance for server to finish processing incoming requests"); 
     Thread.currentThread().sleep(80000); 

     System.out.println("shutting down server and client"); 
     server.stop(); 
     client.stop(); 

     System.out.println("stopped, server received: " + server.connectionsCount() + " connections"); 
     int numberOfLostConnections = noOfConnectionsToPerform - server.connectionsCount(); 
     if (numberOfLostConnections > 0) { 
      System.out.println("Where do we lost " + numberOfLostConnections + " connections?"); 
     } 

     System.out.println("srerver exceptions: "); 
     printExceptions(server.getExceptions()); 
     System.out.println("client exceptions: "); 
     printExceptions(client.getExceptions()); 
    } 

    private static void printExceptions(Map<String, Integer> exceptions) { 
     if (exceptions.isEmpty()) { 
      System.out.println("There was no exceptions"); 
     } 
     for (Entry<String, Integer> exception : exceptions.entrySet()) { 
      System.out.println("There was " + exception.getValue() + " times this exception:"); 
      System.out.println(exception.getKey()); 
     } 
    } 

    public static class NettyServer { 
     private ChannelFuture channelFuture; 
     private EventLoopGroup bossGroup; 
     private EventLoopGroup workerGroup; 
     private AtomicInteger connections = new AtomicInteger(0); 
     private ExceptionCounter exceptionCounter = new ExceptionCounter(); 

     public NettyServer(int port) throws InterruptedException { 
      bossGroup = new NioEventLoopGroup(); 
      workerGroup = new NioEventLoopGroup(); 
      ServerBootstrap serverBootstrap = new ServerBootstrap(); 
      serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) 
        .childHandler(new ChannelInitializer<SocketChannel>() { 
         @Override 
         public void initChannel(SocketChannel ch) throws Exception { 
          ch.pipeline().addLast(new TimeServerHandler() { 

           @Override 
           public void channelActive(final ChannelHandlerContext ctx) { 
            connections.incrementAndGet(); 
            super.channelActive(ctx); 
           } 

           @Override 
           public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
            exceptionCounter.countException(cause); 
            super.exceptionCaught(ctx, cause); 
           } 

          }); 
         } 
        }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); 
      channelFuture = serverBootstrap.bind(port).sync(); 
     } 

     public int getPort() { 
      return ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); 
     } 

     public int connectionsCount() { 
      return connections.get(); 
     } 

     public Map<String, Integer> getExceptions() { 
      return exceptionCounter.getExceptions(); 
     } 

     public void stop() { 
      bossGroup.shutdownGracefully(); 
      workerGroup.shutdownGracefully(); 
      try { 
       bossGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
       workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    public static class NettyClient { 
     private Bootstrap bootstrap; 
     private EventLoopGroup workerGroup; 
     private ExceptionCounter exceptionCounter = new ExceptionCounter(); 

     public NettyClient() { 
      workerGroup = new NioEventLoopGroup(); 

      bootstrap = new Bootstrap(); 
      bootstrap.group(workerGroup); 
      bootstrap.channel(NioSocketChannel.class); 
      bootstrap.option(ChannelOption.SO_KEEPALIVE, true); 
      bootstrap.handler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       public void initChannel(SocketChannel ch) throws Exception { 
        ch.pipeline().addLast(new TimeClientHandler() { 
         @Override 
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
          exceptionCounter.countException(cause); 
          super.exceptionCaught(ctx, cause); 
         } 
        }); 
       } 
      }); 
     } 

     public Bootstrap getBootstrap() { 
      return bootstrap; 
     } 

     public void stop() { 
      workerGroup.shutdownGracefully(); 
      try { 
       workerGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 

     public Map<String, Integer> getExceptions() { 
      return exceptionCounter.getExceptions(); 
     } 
    } 

    public static class TimeServerHandler extends ChannelInboundHandlerAdapter { 

     @Override 
     public void channelActive(final ChannelHandlerContext ctx) { 
      final ByteBuf time = ctx.alloc().buffer(4); 
      time.writeInt((int) (System.currentTimeMillis()/1000L + 2208988800L)); 

      final ChannelFuture f = ctx.writeAndFlush(time); 
      f.addListener(new ChannelFutureListener() { 
       @Override 
       public void operationComplete(ChannelFuture future) { 
        assert f == future; 
        ctx.close(); 
       } 
      }); 
     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
      ctx.close(); 
     } 
    } 

    public static class TimeClientHandler extends ChannelInboundHandlerAdapter { 
     private ThreadLocal<ByteBuf> buf = new ThreadLocal<ByteBuf>(); 

     @Override 
     public void handlerAdded(ChannelHandlerContext ctx) { 
      buf.set(ctx.alloc().buffer(4)); 
     } 

     @Override 
     public void handlerRemoved(ChannelHandlerContext ctx) { 
      buf.get().release(); 
      buf.remove(); 
     } 

     @Override 
     public void channelRead(ChannelHandlerContext ctx, Object msg) { 
      ByteBuf m = (ByteBuf) msg; 
      buf.get().writeBytes(m); 
      m.release(); 

      if (buf.get().readableBytes() >= 4) { 
       long currentTimeMillis = (buf.get().readUnsignedInt() - 2208988800L) * 1000L; 
       ctx.close(); 
      } 
     } 

     @Override 
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
      ctx.close(); 
     } 
    } 

    public static class ExceptionCounter { 
     private ConcurrentHashMap<String, AtomicInteger> exceptions = new ConcurrentHashMap<String, AtomicInteger>(); 

     private void countException(Throwable cause) { 

      StringWriter writer = new StringWriter(); 
      cause.printStackTrace(new PrintWriter(writer)); 
      String stackTrace = writer.toString(); 

      AtomicInteger exceptionCount = exceptions.get(stackTrace); 
      if (exceptionCount == null) { 
       exceptionCount = new AtomicInteger(0); 
       AtomicInteger prevCount = exceptions.putIfAbsent(stackTrace, exceptionCount); 
       if (prevCount != null) { 
        exceptionCount = prevCount; 
       } 
      } 
      exceptionCount.incrementAndGet(); 
     } 

     public Map<String, Integer> getExceptions() { 
      Map<String, Integer> newMap = exceptions.entrySet().stream() 
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().get())); 
      return Collections.unmodifiableMap(newMap); 
     } 
    } 
} 

出力は次のようになります。

starting server 
server started at port: 56069 
staring client 
client started 
performing 131072 connections 
client performed 131072 connections 
wait a bit to give a chance for server to finish processing incoming requests 
shutting down server and client 
stopped, server received: 34735 connections 
Where do we lost 96337 connections? 
srerver exceptions: 
There was no exceptions 
client exceptions: 
There was 258 times this exception: 
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta 
    at sun.nio.ch.SocketDispatcher.read0(Native Method) 
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) 
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) 
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) 
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) 
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100) 
    at io.netty.buffer.WrappedByteBuf.writeBytes(WrappedByteBuf.java:813) 
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) 
    at java.lang.Thread.run(Thread.java:745) 

There was 30312 times this exception: 
java.io.IOException: Istniejące połączenie zostało gwałtownie zamknięte przez zdalnego hosta 
    at sun.nio.ch.SocketDispatcher.read0(Native Method) 
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) 
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) 
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) 
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288) 
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100) 
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) 
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138) 
    at java.lang.Thread.run(Thread.java:745) 

質問:

  • この例外がスローされるのはなぜ?
  • 接続が失われた場所はどこですか?なぜ彼らにエラーがないのですか?
  • これを避けるには、この種の「ハイスループット」アプリケーションをプログラムする正しい方法は、既存の接続を失う/壊すような問題がないようにすることですか?
  • いいえNettyの専門家が知っていると思います:TimeClientHandlerのフィールド宣言が静的になるように変更すると、TimeClientHandler.handlerRemovedにnullポインタ例外がありますか?これは非常に奇妙です、このクラスは何とか複製されますか?またはNioEventLoopGroupのスレッドが何とか変なのですか?

環境:

  • 網状バージョン:網状-全4.1.12.Final.jar
  • JVMバージョン: jdk1.8.0_111 64ビット
  • OSバージョン: Windows 10 64ビット

答えて

1

IPアドレスごとに64kポートの制限があるため、2^17ポートを開くことはできません。各ソケットはファイルハンドルを使用しているので、プロセスごとの最大オープンファイルの制限を打つかもしれません。 "Max open files" for working processを参照してください。

+0

確かに、それは物語の一部かもしれませんが、なぜそれが既存の接続を終了させて​​いるのですか、そして限界を超えることについて何の誤りもないのですか?私がしようとしているのは、限界を超えて、そのような「トラフィックの多い」アプリケーションをどのようにプログラムするかを学ぶときに、何が起こるのかを過小評価することです。 –

関連する問題