2012-01-28 19 views
0

私は非同期メッセージを送信するNettyサーバーを作成しました。サーバーは期待どおりに動作しています。Nettyクライアントの非同期呼び出しが処理されない

私はいくつかのtelnetセッションでサーバにtelnet接続することができ、非同期メッセージが書き出されます。

私はネッティークライアントを書かれているが、クライアントはなく非同期を駆動イベントのようです。クライアントが接続するときのサーバー上。サーバーはクライアント "Welcome"に書き戻し、メッセージはmessageReceivedイベントによってクライアントで処理されますが、非同期イベントはSimpleChannelHandler内のイベントを起動しません。

質問:Nettyクライアントに非同期メッセージ/イベントを受け取らせるにはどうすればよいですか?現時点では、イベント駆動型です。

だけ追加して、クライアントがネッティーTelnetクライアントです。[http://netty.io/docs/stable/xref/org/jboss/netty/example/telnet/package-summary.html]

サーバコード

//---------Server code--------------- 
import java.net.InetSocketAddress; 
import java.util.concurrent.Executors; 
import org.jboss.netty.bootstrap.ServerBootstrap; 
import org.jboss.netty.channel.Channel; 
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 

public class TestServer { 
    private final ServerBootstrap clientServerBootstrap; 
    private EchoServerFactory echoServerFactory; 
    private Channel appChannel; 

    public TestServer() { 
     this.clientServerBootstrap = new ServerBootstrap(
      new NioServerSocketChannelFactory(
       Executors.newCachedThreadPool(), 
       Executors.newCachedThreadPool())); 
     this.clientServerBootstrap.setOption("child.tcpNoDelay", true); 
    } 

    public static void main(String[] args) { 
     try { 
      TestServer test = new TestServer(); 
      test.start(); 

      for(int i = 0; i < 100; i++) { 
       long time = System.currentTimeMillis()+1000; 
       String data = "setPhase();d(1,1,2.2342,"+time+");"; 
       System.out.println(data); 
       test.write(data); 
       Thread.sleep(1000); 
      } 
     } catch(Exception ex) { 
      ex.printStackTrace(); 
     } 
    } 

    public void start() { 
     echoServerFactory = new EchoServerFactory(); 
     clientServerBootstrap.setPipelineFactory(echoServerFactory); 
     InetSocketAddress isaApp = new InetSocketAddress("127.0.0.1", 9090); 
     appChannel = clientServerBootstrap.bind(isaApp); 
     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 
      public void run() { 
       stop(); 
      } 
     })); 
    } 

    public void write(String message) throws Exception { 
     echoServerFactory.write(message); 
    } 

    public void stop() { 
     clientServerBootstrap.releaseExternalResources(); 
    } 
} 

//---------------Factory---------------------------- 
import org.jboss.netty.channel.ChannelPipeline; 
import org.jboss.netty.channel.ChannelPipelineFactory; 
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; 
import org.jboss.netty.handler.codec.frame.Delimiters; 
import org.jboss.netty.handler.codec.string.StringDecoder; 
import org.jboss.netty.handler.codec.string.StringEncoder; 

public class EchoServerFactory implements ChannelPipelineFactory { 
    EchoServerHandler handler = new EchoServerHandler(); 

    public EchoServerHandler getHandler() { 
     return handler; 
    } 

    public void write(String message) throws Exception { 
     handler.write(message); 
    } 

    public ChannelPipeline getPipeline() throws Exception { 
     // Create a default pipeline implementation. 
     ChannelPipeline pipeline = org.jboss.netty.channel.Channels.pipeline(); 

     // Add the text line codec combination first, 
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); 
    pipeline.addLast("decoder", new StringDecoder()); 
    pipeline.addLast("encoder", new StringEncoder()); 
    // and then business logic. 
    pipeline.addLast("handler", handler); 
    return pipeline; 
    } 
} 

//---------------Handler---------------------------- 

import java.net.InetAddress; 
import java.net.InetSocketAddress; 
import java.util.ArrayList; 
import java.util.Date; 
import java.util.List; 
import java.util.concurrent.atomic.AtomicLong; 
import java.util.logging.Level; 
import java.util.logging.Logger; 

import org.jboss.netty.buffer.ChannelBuffer; 
import org.jboss.netty.channel.Channel; 
import org.jboss.netty.channel.ChannelEvent; 
import org.jboss.netty.channel.ChannelFuture; 
import org.jboss.netty.channel.ChannelFutureListener; 
import org.jboss.netty.channel.ChannelHandlerContext; 
import org.jboss.netty.channel.ChannelStateEvent; 
import org.jboss.netty.channel.ChildChannelStateEvent; 
import org.jboss.netty.channel.ExceptionEvent; 
import org.jboss.netty.channel.MessageEvent; 
import org.jboss.netty.channel.SimpleChannelHandler; 
import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 
import org.jboss.netty.channel.WriteCompletionEvent; 
import org.jboss.netty.channel.group.ChannelGroup; 
import org.jboss.netty.channel.group.DefaultChannelGroup; 

public class EchoServerHandler extends SimpleChannelHandler { 
    private static final Logger logger =  Logger.getLogger(EchoServerHandler.class.getName()); 

    static final ChannelGroup channels = new DefaultChannelGroup(); 

    public void write(String message) throws Exception { 
     channels.write(message); 
    } 

    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     Channel channel = e.getChannel(); 
     channels.add(channel); 
     channel.write("Welcome\n\n"); 
    } 

    @Override 
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     // Unregister the channel from the global channel list 
     // so the channel does not receive messages anymore. 
     //channels.remove(e.getChannel()); 
    } 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { 
     // Send back the received message to the remote peer. 
     System.out.println("------------------------->"+e.getMessage()); 
     Channel ch = e.getChannel(); 
     ChannelFuture f = ch.write(e.getMessage()); 

     /* f.addListener(new ChannelFutureListener() { 
      public void operationComplete(ChannelFuture future) { 
       Channel ch = future.getChannel(); 
       System.out.println("Completed : "+ch.isOpen()); 
      } 
     });*/ 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { 
     // Close the connection when an exception is raised. 
     logger.log(
       Level.WARNING, 
       "Unexpected exception from downstream.", 
       e.getCause()); 
     e.getChannel().close(); 
    } 
} 

クライアントコード

//---------------- Client Code ------------------- 
import java.io.BufferedReader; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.net.InetSocketAddress; 
import java.util.concurrent.Executors; 

import org.jboss.netty.bootstrap.ClientBootstrap; 
import org.jboss.netty.channel.Channel; 
import org.jboss.netty.channel.ChannelFuture; 
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 
import org.jfree.ui.RefineryUtilities; 

/** 
* Simplistic telnet client. 
*/ 
public class TelnetClient { 

    private final String host; 
    private final int port; 

    public TelnetClient(String host, int port) { 
     this.host = host; 
     this.port = port; 
    } 

    public void run() throws IOException { 
     // Configure the client. 
     ClientBootstrap bootstrap = new ClientBootstrap(
       new NioClientSocketChannelFactory(
         Executors.newCachedThreadPool(), 
         Executors.newCachedThreadPool())); 

     // Configure the pipeline factory. 
     bootstrap.setPipelineFactory(new TelnetClientPipelineFactory()); 
     bootstrap.setOption("tcpNoDelay", true); 
     // Start the connection attempt. 
     ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); 

     // Wait until the connection attempt succeeds or fails. 
     Channel channel = future.awaitUninterruptibly().getChannel(); 
     if (!future.isSuccess()) { 
      future.getCause().printStackTrace(); 
      bootstrap.releaseExternalResources(); 
      return; 
     } 

     // Read commands from the stdin. 
     ChannelFuture lastWriteFuture = null; 
     BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); 
     for (;;) { 
      String line = in.readLine(); 
      if (line == null) { 
       break; 
      } 

      // Sends the received line to the server. 
      lastWriteFuture = channel.write(line + "\r\n"); 

      // If user typed the 'bye' command, wait until the server closes 
      // the connection. 
      if (line.toLowerCase().equals("bye")) { 
       channel.getCloseFuture().awaitUninterruptibly(); 
       break; 
      } 
     } 

     // Wait until all messages are flushed before closing the channel. 
     if (lastWriteFuture != null) { 
      lastWriteFuture.awaitUninterruptibly(); 
     } 

     // Close the connection. Make sure the close operation ends because 
     // all I/O operations are asynchronous in Netty. 
     channel.close().awaitUninterruptibly(); 

     // Shut down all thread pools to exit. 
     bootstrap.releaseExternalResources(); 
    } 

    public static void main(String[] args) throws Exception { 
     try { 
     // Parse options. 
     String host = "127.0.0.1"; 
     int port = 9090; 

     new TelnetClient(host, port).run(); 
     } catch(Exception ex) { 
      ex.printStackTrace(); 
     } 
    } 
} 

//---------------- Client Factory ------------------- 
import static org.jboss.netty.channel.Channels.*; 

import org.jboss.netty.channel.ChannelPipeline; 
import org.jboss.netty.channel.ChannelPipelineFactory; 
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; 
import org.jboss.netty.handler.codec.frame.Delimiters; 
import org.jboss.netty.handler.codec.string.StringDecoder; 
import org.jboss.netty.handler.codec.string.StringEncoder; 

/** 
* Creates a newly configured {@link ChannelPipeline} for a new channel. 
*/ 
public class TelnetClientPipelineFactory implements ChannelPipelineFactory { 



    public ChannelPipeline getPipeline() throws Exception { 
     // Create a default pipeline implementation. 
     ChannelPipeline pipeline = pipeline(); 

     // Add the text line codec combination first, 
     pipeline.addLast("framer", new DelimiterBasedFrameDecoder(1118192, Delimiters.lineDelimiter())); 
     pipeline.addLast("decoder", new StringDecoder()); 
     pipeline.addLast("encoder", new StringEncoder()); 

     // and then business logic. 
     pipeline.addLast("handler", new TelnetClientHandler2()); 

     return pipeline; 
    } 
} 

//----------------- Client handler ------------------- 
import java.util.logging.Level; 
import java.util.logging.Logger; 

import org.jboss.netty.channel.ChannelEvent; 
import org.jboss.netty.channel.ChannelHandlerContext; 
import org.jboss.netty.channel.ChannelStateEvent; 
import org.jboss.netty.channel.ChildChannelStateEvent; 
import org.jboss.netty.channel.ExceptionEvent; 
import org.jboss.netty.channel.MessageEvent; 
import org.jboss.netty.channel.SimpleChannelHandler; 
import org.jboss.netty.channel.WriteCompletionEvent; 

/** 
* Handles a client-side channel. 
*/ 
public class TelnetClientHandler extends SimpleChannelHandler { 

    private static final Logger logger = Logger.getLogger(TelnetClientHandler.class.getName()); 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { 
     System.out.println("messageReceived"); 
     String message = (String) e.getMessage(); 
     parseMessage(message); 
    } 

    private void parseMessage(String message) { 
     try { 
      System.out.println("Messatge --> "+message); 
     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { 
     System.out.println(e.getCause()); 
     logger.log(Level.WARNING,"Unexpected exception from downstream.", e.getCause()); 
     e.getChannel().close(); 
    } 
} 
+0

Nettyでは、要求と応答が別々に処理されます(呼び出しスレッドは要求を送信した直後に処理されます:非同期です)、メッセージを受信したときに他のイベントを発生させたい応答を処理するためにコールバックを使用して非同期スタイルで記述したいとしますか? –

+0

@Jestan、私は自分のサーバーとクライアントコードを追加しました。私はこれによって物事が少しはっきりしてくれることを願っています私の問題は、Nettyクライアントが "Welcome"メッセージだけを取得し、他のメッセージ "setPhase(); d(1,1,2.2342,1327816096855); ..."を取得しないことです。メッセージ。私の期待は、クライアントがmessageReceivedイベントを介してすべての着信メッセージを受信することです。 – barnardh

答えて

0

あなたが区切り文字ベースのFRAを使用しているので私のデコーダNettyクライアントアプリケーションでは、各メッセージの最後に区切り文字が必要ですが、サーバーが区切り文字付きのメッセージを送信していないように見えます。

文字データ= "setPhase(); d(1,1,2.2342、" + time + ");"; System.out.println(data); test.write(data);

上記のメッセージが送信された後、フレームデコーダは多くのメッセージを受信した後も待機し続けます。これは、telnetセッションが一度に1文字を期待するため、telnetで動作します。あなたは最初のメッセージのためだけにそれを正しく行っています。

channel.write( "ようこそ\ n \ n");

+0

大変ありがとうございました! – barnardh

関連する問題