2012-01-24 30 views
3

テキストプロトコルを使用して接続およびボンバードメッセージ(〜100バイト)を受け付けるサーバーを作成しました。実装では、3rtパーティクライアントでループバック400K /秒のメッセージを送信できます。私はこの仕事のためにNettyを選んだ、SUSE 11 RealTime、JRockit RTS。 しかしNettyに基づいて私自身のクライアントを開発し始めたとき、私は劇的なスループット低下に遭遇しました(400Kから1.3K msg/secまで)。クライアントのコードはかなり簡単です。より効果的なクライアントを書く方法をアドバイスしたり、例を示したりしてください。私は実際にレイテンシを気にしていましたが、スループットテストを始めましたが、ループバックで1.5Kmsg /秒が正常ではないと思います。 P.P.クライアントの目的はサーバーからのメッセージのみを受信して​​おり、ごくまれにハートビットを送信します。Java Netty負荷テストの問題

Client.java 

public class Client { 

private static ClientBootstrap bootstrap; 
private static Channel connector; 
public static boolean start() 
{ 
    ChannelFactory factory = 
     new NioClientSocketChannelFactory(
       Executors.newCachedThreadPool(), 
       Executors.newCachedThreadPool()); 
    ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); 

    bootstrap = new ClientBootstrap(factory); 

    bootstrap.setPipelineFactory(new ClientPipelineFactory()); 

    bootstrap.setOption("tcpNoDelay", true); 
    bootstrap.setOption("keepAlive", true); 
    bootstrap.setOption("receiveBufferSize", 1048576); 
    ChannelFuture future = bootstrap 
      .connect(new InetSocketAddress("localhost", 9013)); 
    if (!future.awaitUninterruptibly().isSuccess()) { 
     System.out.println("--- CLIENT - Failed to connect to server at " + 
          "localhost:9013."); 
     bootstrap.releaseExternalResources(); 
     return false; 
    } 

    connector = future.getChannel(); 

    return connector.isConnected(); 
} 
public static void main(String[] args) 
{ 
    boolean started = start(); 
    if (started) 
     System.out.println("Client connected to the server"); 
} 

} 

ClientPipelineFactory.java 

public class ClientPipelineFactory implements ChannelPipelineFactory{ 

private final ExecutionHandler executionHandler; 
public ClientPipelineFactory(ExecutionHandler executionHandle) 
{ 
    this.executionHandler = executionHandle; 
} 
@Override 
public ChannelPipeline getPipeline() throws Exception { 
    ChannelPipeline pipeline = pipeline(); 
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
       1024, Delimiters.lineDelimiter())); 
    pipeline.addLast("executor", executionHandler); 
    pipeline.addLast("handler", new MessageHandler()); 

    return pipeline; 
} 

} 

MessageHandler.java 
public class MessageHandler extends SimpleChannelHandler{ 

long max_msg = 10000; 
long cur_msg = 0; 
long startTime = System.nanoTime(); 
@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { 
    cur_msg++; 

    if (cur_msg == max_msg) 
    { 
     System.out.println("Throughput (msg/sec) : " + max_msg* NANOS_IN_SEC/( System.nanoTime() - startTime) ); 
     cur_msg = 0; 
     startTime = System.nanoTime(); 
    } 
} 

@Override 
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { 
    e.getCause().printStackTrace(); 
    e.getChannel().close(); 
} 

} 

更新。サーバー側には、受け入れられたクライアントチャネルに書き込む定期的なスレッドがあります。そして、すぐにチャンネルは書き換えられなくなります。 N2を更新します。パイプラインにOrderedMemoryAwareExecutorを追加しましたが、スループットが非常に低い(約4k msg /秒)。

修正済み。私はエグゼクティブをパイプラインスタック全体の前に置き、それがうまくいった!

+0

私はメッセージにタイムスタンプを送り、各メッセージの待ち時間を取得します。これは、遅延が何であるかをより詳細に知ることができます。同じホスト上でしか通信しておらず、待ち時間が非常に重要な場合は、代わりに共有メモリを使用することを検討してください。 –

+0

実際には、サーバーはリモートになります。私はエミュレータを実装して、クライアントが処理できる秒あたりのメッセージ量をテストしました。純粋なネットクライアントの実装は遅いことが判明しました。 –

+0

コードにはItchClientPipelineFactoryが表示されますが、貼り付けられたコードはClientPipelineFactory用です。これはちょうど命名エラーですか、それとも、ItchClientPipelineFactoryが最適なコードを持っていない(つまり、正しいメッセージハンドラを使用していない)場合、あなたはそれをまだ忘れていましたか? – user467257

答えて

3

サーバは、固定サイズ(〜100バイト)でメッセージを送信している場合は、クライアントのブートストラップにReceiveBufferSizePredictorを設定することができ、これはあなたが投稿したコードセグメントによると読み

bootstrap.setOption("receiveBufferSizePredictorFactory", 
      new AdaptiveReceiveBufferSizePredictorFactory(MIN_PACKET_SIZE, INITIAL_PACKET_SIZE, MAX_PACKET_SIZE)); 

を最適化します:クライアントのnioワーカースレッドはパイプライン内のすべてを実行しているため、メッセージハンドラのデコードと実行に忙しくなります。実行ハンドラを追加する必要があります。

あなたはチャンネルがサーバー側から書き込み不可能になっていると言いました。そのため、サーバーのブートストラップでウォーターマークのサイズを調整する必要があるかもしれません。書き込みバッファサイズ(書き込みキューサイズ)を定期的に監視し、メッセージがネットワークに書き込まれないためにチャネルが書き込み不能になっていることを確認できます。これは以下のようなutilクラスを持つことによって行うことができます。

package org.jboss.netty.channel.socket.nio; 

import org.jboss.netty.channel.Channel; 

public final class NioChannelUtil { 
    public static long getWriteTaskQueueCount(Channel channel) { 
    NioSocketChannel nioChannel = (NioSocketChannel) channel; 
    return nioChannel.writeBufferSize.get(); 
    } 
} 
+0

ジェスタン、答えてくれてありがとう。実際のビジネスロジックハン​​ドラ(スループットを測定する)の前にパイプラインにExecutionHandlerを追加しましたが、うまくいきませんでした。依然としてスループットは非常に低く、サーバーチャネルは書き込み可能ではありません。私はクラスNioSocketChannelを見つけようとしましたが、失敗しました。 –

+0

@EgorLakomkin、まだ問題は解決していませんか?NioSocketChannelが見つからないか、パッケージorg.jboss.netty.channel.socket.nio'がありませんか? Nettyのバージョンは何ですか? –

+0

Netty 4.xはwriteBufferSizeオブジェクトへのアクセスを許可していないようで、パッケージも変更されているようです。 – Scot