2016-07-15 10 views
0

データベース(Redis)から定期的にデータを読み取り、適切なクライアントに送信するTCPサーバーを作成しようとしています。NettyとScheduled Executorサービス

しかし、私はかなりNettyに慣れていないので、私はこれをどのようにスケジュールすることができるのか分かりません。私は、私はこのようなスケジュールエグゼキュータ・サービスを使用する必要があることを知っていますか:私は、サーバーコードでそれを置くしようとしたときに

ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
e.scheduleAtFixedRate(() -> { 
    System.out.println("Calling..."); 
    // Do something 
}, 1, 1, TimeUnit.SECONDS); 

しかし、それは一度だけメソッドを呼び出しています。私はそれを別の場所に置こうとしましたが、それでも正しいことはできないようです。私は何をすべきか?

はここでサーバーのコードです:

package com.example.test.app; 

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

public class Server { 

    public static void main(String[] args) throws Exception 
    { 
     EventLoopGroup bossGroup = new NioEventLoopGroup(); 
     EventLoopGroup workerGroup = new NioEventLoopGroup(); 

     final ServerHandler handler = new ServerHandler(); 

     try { 

      ServerBootstrap b = new ServerBootstrap(); 
      b.group(bossGroup, workerGroup); 
      b.channel(NioServerSocketChannel.class); 
      b.childHandler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       protected void initChannel(SocketChannel ch) throws Exception 
       { 
        ch.pipeline().addLast(handler); 
       } 

      }); 
      b.option(ChannelOption.SO_BACKLOG, 128); 
      b.childOption(ChannelOption.SO_KEEPALIVE, true); 

      ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
      e.scheduleAtFixedRate(() -> { 
       System.out.println("Calling..."); 
       handler.saySomething(); 
      }, 1, 1, TimeUnit.SECONDS); 

      ChannelFuture f = b.bind(1337).sync(); 
      f.channel().closeFuture().sync(); 

     } finally { 
      workerGroup.shutdownGracefully(); 
      bossGroup.shutdownGracefully(); 
     } 
    } 

} 

そしてここで、サーバハンドラです:

package com.example.test.app; 

import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 

public class ServerHandler extends ChannelInboundHandlerAdapter { 

    private ChannelHandlerContext ctx; 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) 
    { 
     this.ctx = ctx; 
     System.out.println("Someone's connedted!"); 
    } 

    public void saySomething() 
    { 
     final ChannelFuture f = ctx.writeAndFlush("Sup!"); 
     f.addListener((ChannelFutureListener) (ChannelFuture future) -> { 
      System.out.println("Something has been said!"); 
     }); 
    } 

} 

答えて

1

ctxがnullながらfinal ChannelFuture f = ctx.writeAndFlush("Sup!");を呼び出すためのNullPointerExceptionを生成saySomething()方法が。 EventExecutorGroup.scheduleAtFixedRate javadocの説明には、「タスクの実行によって例外が発生した場合、後続の実行は抑制されます。だから、これは一度だけ呼び出される理由です...

また、このハンドラのクラスに@Sharableとして注釈を付ける場合にのみ、異なるパイプラインインスタンスに対してハンドラインスタンスを再利用できるようです。それ以外の場合は、例外がスローされます。あなたのハンドラがステートレス(あなたのケースではなく、あなたのctxメンバを持っている)ならば、それを@Sharableとしてアノテートし、それをすべての作成されたパイプラインに再利用する必要があります。ステートフルな場合は、新しいパイプラインごとに新しいインスタンスを作成します(新しいクライアント接続)。

最後に、接続された各クライアントのタスクをスケジュールするには、channelActive()実装の接続されたクライアントのチャネルのctx(デフォルトでは、ケースの場合のようにチャネルのEventLoop)で参照できるexecutorを使用できます。このエグゼキュータはScheduledExecutorServiceを実装していますので、scheduleAtFixedRateもあります。 私のバージョンのコードを見て、それがあなたに合っているかどうかを見てください。

サーバー:

package com.example.test.app; 

import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.TimeUnit; 

public class Server { 

    public static void main(String[] args) throws Exception 
    { 
     EventLoopGroup bossGroup = new NioEventLoopGroup(); 
     EventLoopGroup workerGroup = new NioEventLoopGroup(); 

     try { 

      ServerBootstrap b = new ServerBootstrap(); 
      b.group(bossGroup, workerGroup); 
      b.channel(NioServerSocketChannel.class); 
      b.childHandler(new ChannelInitializer<SocketChannel>() { 
       @Override 
       protected void initChannel(SocketChannel ch) throws Exception 
       { 
        ch.pipeline().addLast(new ServerHandler()); 
       } 

      }); 
      b.option(ChannelOption.SO_BACKLOG, 128); 
      b.childOption(ChannelOption.SO_KEEPALIVE, true); 

//   ScheduledExecutorService e = Executors.newSingleThreadScheduledExecutor(); 
//   e.scheduleAtFixedRate(() -> { 
//    System.out.println("Calling..."); 
//    handler.saySomething(); 
//   }, 1, 1, TimeUnit.SECONDS); 

      ChannelFuture f = b.bind(1337).sync(); 
      f.channel().closeFuture().sync(); 

     } finally { 
      workerGroup.shutdownGracefully(); 
      bossGroup.shutdownGracefully(); 
     } 
    } 

} 

ServerHandler:

package com.example.test.app; 

import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.util.concurrent.ScheduledFuture; 

import java.util.concurrent.TimeUnit; 

public class ServerHandler extends ChannelInboundHandlerAdapter { 

    private ScheduledFuture sf; 

    @Override 
    public void channelActive(ChannelHandlerContext ctx) 
    { 
     System.out.println("Someone's connedted! "+ctx.channel()); 
     sf = ctx.executor().scheduleAtFixedRate(() -> { 
      System.out.println("Calling..."); 
      saySomething(ctx); 
     }, 1, 1, TimeUnit.SECONDS); 
    } 

    @Override 
    public void channelInactive(ChannelHandlerContext ctx) { 
     System.out.println("Someone's disconnected! "+ctx.channel()); 
     sf.cancel(false); 
    } 

    private void saySomething(ChannelHandlerContext ctx) 
    { 
      final ChannelFuture f = ctx.writeAndFlush("Sup!"); 
      f.addListener((ChannelFutureListener) (ChannelFuture future) -> { 
       System.out.println("Something has been said!"); 
      }); 
    } 

} 
+0

おかげで、ChannelHandlerContextは独自のエグゼキュータを持っていることを知りませんでした。 – Furunomoe

関連する問題