2016-05-09 16 views
0

私は大量のHTTP投稿要求をあるサーバーに非同期に送信しようとしています。私の目標は、それぞれの応答を本来の要求と比較することです。Nettyで複数のHTTPリクエストを非同期で送信するにはどうすればよいですか?

これを行うには、Netty Snoop exampleに従っています。

ただし、この例(およびその他のhttpの例)では、複数の要求を非同期的に送信する方法や、対応する要求に続いてそれらをリンクする方法については説明していません。

すべて似質問(例えばthis onethis one、またはthis oneとして、ネッティー3からのものであり、もう4.0に存在しませんSimpleChannelUpstreamHandlerクラスを実装(documentation netty 4.0

は誰でもでこれを解決する方法のアイデアを持っていますネッティー4.0

編集:?

私の問題は、私は、チャネルへのメッセージの多くを書くが、私は唯一のWHE、非常にゆっくりと回答(1つの応答/秒を受けています数千/秒を受け取ることを期待しています)。これを明確にするために、私がこれまでに得たものを投稿させてください。要求を送信するサーバーも多くのトラフィックを処理できると確信しています。私がこれまでに得たもの

import java.net.URI 
import java.nio.charset.StandardCharsets 
import java.io.File 

import io.netty.bootstrap.Bootstrap 
import io.netty.buffer.{Unpooled, ByteBuf} 
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler, ChannelInitializer} 
import io.netty.channel.socket.SocketChannel 
import io.netty.channel.socket.nio.NioSocketChannel 
import io.netty.handler.codec.http._ 
import io.netty.handler.timeout.IdleStateHandler 
import io.netty.util.{ReferenceCountUtil, CharsetUtil} 
import io.netty.channel.nio.NioEventLoopGroup 

import scala.io.Source 

object ClientTest { 

    val URL = System.getProperty("url", MY_URL)  
    val configuration = new Configuration 

    def main(args: Array[String]) { 
    println("Starting client") 
    start() 
    } 

    def start(): Unit = { 

    val group = new NioEventLoopGroup() 

    try { 

     val uri: URI = new URI(URL) 
     val host: String= {val h = uri.getHost(); if (h != null) h else "127.0.0.1"} 
     val port: Int = {val p = uri.getPort; if (p != -1) p else 80} 

     val b = new Bootstrap() 

     b.group(group) 
     .channel(classOf[NioSocketChannel]) 
     .handler(new HttpClientInitializer()) 

     val ch = b.connect(host, port).sync().channel() 

     val logFolder: File = new File(configuration.LOG_FOLDER) 
     val fileToProcess: Array[File] = logFolder.listFiles() 

     for (file <- fileToProcess){ 
     val name: String = file.getName() 
     val source = Source.fromFile(configuration.LOG_FOLDER + "/" + name) 

     val lineIterator: Iterator[String] = source.getLines() 

     while (lineIterator.hasNext) { 
      val line = lineIterator.next() 
      val jsonString = parseLine(line) 
      val request = createRequest(jsonString, uri, host) 
      ch.writeAndFlush(request) 
     } 
     println("closing") 
     ch.closeFuture().sync() 
     } 
    } finally { 
     group.shutdownGracefully() 
    } 
    } 

    private def parseLine(line: String) = { 
    //do some parsing to get the json string I want 
    } 

    def createRequest(jsonString: String, uri: URI, host: String): FullHttpRequest = { 
    val bytebuf: ByteBuf = Unpooled.copiedBuffer(jsonString, StandardCharsets.UTF_8) 

    val request: FullHttpRequest = new DefaultFullHttpRequest(
     HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath()) 
    request.headers().set(HttpHeaders.Names.HOST, host) 
    request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE) 
    request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP) 
    request.headers().add(HttpHeaders.Names.CONTENT_TYPE, "application/json") 

    request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytebuf.readableBytes()) 
    request.content().clear().writeBytes(bytebuf) 

    request 
    } 
} 

class HttpClientInitializer() extends ChannelInitializer[SocketChannel] { 

    override def initChannel(ch: SocketChannel) = { 
    val pipeline = ch.pipeline() 

    pipeline.addLast(new HttpClientCodec()) 

    //aggregates all http messages into one if content is chunked 
    pipeline.addLast(new HttpObjectAggregator(1048576)) 

    pipeline.addLast(new IdleStateHandler(0, 0, 600)) 

    pipeline.addLast(new HttpClientHandler()) 
    } 
} 

class HttpClientHandler extends SimpleChannelInboundHandler[HttpObject] { 

    override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) { 
    try { 
     msg match { 
     case res: FullHttpResponse => 
      println("response is: " + res.content().toString(CharsetUtil.US_ASCII)) 
      ReferenceCountUtil.retain(msg) 
     } 
    } finally { 
     ReferenceCountUtil.release(msg) 
    } 
    } 

    override def exceptionCaught(ctx: ChannelHandlerContext, e: Throwable) = { 
    println("HttpHandler caught exception", e) 
    ctx.close() 
    } 
} 
+0

はチャンネル非同期に書き込みませんか?あなたが書いた結果、Futureを手に入れようとしています。これはどのように対処するのですか? – user1582639

+0

私もNetty 4.0を学びます。ここに私のデザインの理解があります。私が心に留めておく最初のことは、登録されたすべてのハンドラがシングルスレッドで実行されるという自信を与えているので、共有ハンドラを使用しない限り、同期の必要はありません。したがって、提出されたすべてのリクエストはチャンネルを通じて順次送信され、応答は同じ順序で受信されます。したがって、すべての要求に対してデュプレックスハンドラのキューのようなデータ構造を管理すると、常に最新の受信応答を要求することができます。 – user1582639

+0

返信いただきありがとうございます!私の問題は、チャンネルにたくさんのメッセージを書いているにもかかわらず、応答が非常に遅い(1応答/秒、数千/秒を受信するという希望)。これを明確にするために、私がこれまでに得たものを投稿させてください。 – Mart

答えて

0

ChannelFuture CF = channel.writeAndFlush(createRequest());

どのように対応するリクエストに後でそれらをリンクするか。

Can netty assign multiple IO threads to the same Channel?

一度チャンネルに割り当てられたワーカースレッドは、チャネルの存続期間中は変更されません。だから私たちはスレッドから利益を得ません。これは、接続を維持しているためにチャネルが存続しているためです。

この問題を解決するには、チャネルのプール(たとえば30)を考えます。次に、チャネルプールを使用してリクエストを配置します。

 int concurrent = 30; 

    // Start the client. 
    ChannelFuture[] channels = new ChannelFuture[concurrent]; 
    for (int i = 0; i < channels.length; i++) { 
    channels[i] = b.connect(host, port).sync(); 
    } 

    for (int i = 0; i < 1000; i++) { 
     ChannelFuture requestHandle = process(channels[(i+1)%concurrent]); 
     // do something with the request handle  
    } 

    for (int i = 0; i < channels.length; i++) { 
    channels[i].channel().closeFuture().sync(); 
    } 

HTH

+0

私は、1つのチャンネルでも、サーバー側から1秒あたり1メッセージを増やすべきだと思います。私の前提は、著者が応答を処理する機会を与えない要求をチャネルに積み重ねることです。 – user1582639

+0

並行性 - 時間(ミリ秒)(エンドポイントに基づいて変化するであろう) 1~1000要求を処理する - 55219 10から48749 30から13364 100 - スレッドの数が時間の増加まで29106 性能を改善するが、後でコンテキストスイッチはパフォーマンスに影響を与えます。 –

+0

user1582639あなたが言及したことは確かに問題の1つでした。要求の量を制限することで、パフォーマンスが大幅に向上しました。 @AmodPandey:チャンネルのプールを使用することもできますし、パフォーマンスをさらに向上させます。しかし、私は依然として、空間的なリクエスタを対応するレスポンスにリンクする方法を理解していません。 writeAndFlushは実際にChannelFurtureを返しますが、リクエストが送信された後にリスナーを追加すると、リスナーを追加すると、何とか応答に接続できなくなります。 – Mart

関連する問題