私は大量のHTTP投稿要求をあるサーバーに非同期に送信しようとしています。私の目標は、それぞれの応答を本来の要求と比較することです。Nettyで複数のHTTPリクエストを非同期で送信するにはどうすればよいですか?
これを行うには、Netty Snoop exampleに従っています。
ただし、この例(およびその他のhttpの例)では、複数の要求を非同期的に送信する方法や、対応する要求に続いてそれらをリンクする方法については説明していません。
すべて似質問(例えばthis one、this one、またはthis oneとして、ネッティー3からのものであり、もう4.0に存在しませんSimpleChannelUpstreamHandlerクラスを実装(documentation netty 4.0)
は誰でもでこれを解決する方法のアイデアを持っていますネッティー4.0
編集:?
私の問題は、私は、チャネルへのメッセージの多くを書くが、私は唯一のWHE、非常にゆっくりと回答(1つの応答/秒を受けています数千/秒を受け取ることを期待しています)。これを明確にするために、私がこれまでに得たものを投稿させてください。要求を送信するサーバーも多くのトラフィックを処理できると確信しています。私がこれまでに得たもの
:
import java.net.URI
import java.nio.charset.StandardCharsets
import java.io.File
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.{Unpooled, ByteBuf}
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler, ChannelInitializer}
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http._
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.{ReferenceCountUtil, CharsetUtil}
import io.netty.channel.nio.NioEventLoopGroup
import scala.io.Source
object ClientTest {
val URL = System.getProperty("url", MY_URL)
val configuration = new Configuration
def main(args: Array[String]) {
println("Starting client")
start()
}
def start(): Unit = {
val group = new NioEventLoopGroup()
try {
val uri: URI = new URI(URL)
val host: String= {val h = uri.getHost(); if (h != null) h else "127.0.0.1"}
val port: Int = {val p = uri.getPort; if (p != -1) p else 80}
val b = new Bootstrap()
b.group(group)
.channel(classOf[NioSocketChannel])
.handler(new HttpClientInitializer())
val ch = b.connect(host, port).sync().channel()
val logFolder: File = new File(configuration.LOG_FOLDER)
val fileToProcess: Array[File] = logFolder.listFiles()
for (file <- fileToProcess){
val name: String = file.getName()
val source = Source.fromFile(configuration.LOG_FOLDER + "/" + name)
val lineIterator: Iterator[String] = source.getLines()
while (lineIterator.hasNext) {
val line = lineIterator.next()
val jsonString = parseLine(line)
val request = createRequest(jsonString, uri, host)
ch.writeAndFlush(request)
}
println("closing")
ch.closeFuture().sync()
}
} finally {
group.shutdownGracefully()
}
}
private def parseLine(line: String) = {
//do some parsing to get the json string I want
}
def createRequest(jsonString: String, uri: URI, host: String): FullHttpRequest = {
val bytebuf: ByteBuf = Unpooled.copiedBuffer(jsonString, StandardCharsets.UTF_8)
val request: FullHttpRequest = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath())
request.headers().set(HttpHeaders.Names.HOST, host)
request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
request.headers().add(HttpHeaders.Names.CONTENT_TYPE, "application/json")
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytebuf.readableBytes())
request.content().clear().writeBytes(bytebuf)
request
}
}
class HttpClientInitializer() extends ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel) = {
val pipeline = ch.pipeline()
pipeline.addLast(new HttpClientCodec())
//aggregates all http messages into one if content is chunked
pipeline.addLast(new HttpObjectAggregator(1048576))
pipeline.addLast(new IdleStateHandler(0, 0, 600))
pipeline.addLast(new HttpClientHandler())
}
}
class HttpClientHandler extends SimpleChannelInboundHandler[HttpObject] {
override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) {
try {
msg match {
case res: FullHttpResponse =>
println("response is: " + res.content().toString(CharsetUtil.US_ASCII))
ReferenceCountUtil.retain(msg)
}
} finally {
ReferenceCountUtil.release(msg)
}
}
override def exceptionCaught(ctx: ChannelHandlerContext, e: Throwable) = {
println("HttpHandler caught exception", e)
ctx.close()
}
}
はチャンネル非同期に書き込みませんか?あなたが書いた結果、Futureを手に入れようとしています。これはどのように対処するのですか? – user1582639
私もNetty 4.0を学びます。ここに私のデザインの理解があります。私が心に留めておく最初のことは、登録されたすべてのハンドラがシングルスレッドで実行されるという自信を与えているので、共有ハンドラを使用しない限り、同期の必要はありません。したがって、提出されたすべてのリクエストはチャンネルを通じて順次送信され、応答は同じ順序で受信されます。したがって、すべての要求に対してデュプレックスハンドラのキューのようなデータ構造を管理すると、常に最新の受信応答を要求することができます。 – user1582639
返信いただきありがとうございます!私の問題は、チャンネルにたくさんのメッセージを書いているにもかかわらず、応答が非常に遅い(1応答/秒、数千/秒を受信するという希望)。これを明確にするために、私がこれまでに得たものを投稿させてください。 – Mart