2016-09-26 11 views
2

私たちのAkka HTTPアプリケーションで、クライアント側の接続プールを使用しようとしています。しかし、接続数が上限に達すると、リクエストがハングするように見えます。問題を次のような最小限の例にまとめました(Akka HTTP接続プール):

import java.lang.Thread.UncaughtExceptionHandler 
import java.net.ServerSocket 
import akka.actor.ActorSystem 
import akka.http.scaladsl.settings.ConnectionPoolSettings 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Flow, Keep, Sink, Source} 
import akka.http.scaladsl.client.RequestBuilding._ 

import scala.annotation.tailrec 
import scala.util.{Success, Try} 

/**
  * Minimal reproduction of a client-side Akka HTTP host connection pool that
  * never completes.
  *
  * The program
  *   1. opens a plain `ServerSocket` on an OS-assigned port and starts four
  *      daemon threads that each accept one connection and write a canned
  *      HTTP response to it in an endless loop,
  *   2. pushes 1000 GET requests through a host connection pool pointed at
  *      that socket and collects all results with `Sink.seq`,
  *   3. prints how many results were successes/failures, then shuts the pool
  *      and the actor system down.
  *
  * NOTE(review): the `HttpResponse` entities are never consumed anywhere in
  * this program. Akka HTTP response entities are streams, so leaving them
  * undrained presumably keeps every pooled connection busy, which would
  * explain why the stream stalls once all connections are in use -- confirm
  * against the Akka HTTP client documentation.
  */
object AkkaProblem extends App { 
    // Port 0 lets the OS pick a free port; the chosen port is read back below.
    val server = new ServerSocket(0) 
    val serverPort = server.getLocalPort 

    /**
      * Fake HTTP server: each thread running this accepts a single connection
      * and then writes `httpResponse` to it forever. It never reads the
      * incoming requests. It also serves as the (silent) uncaught-exception
      * handler of the threads that run it.
      */
    object responder extends Runnable with UncaughtExceptionHandler { 
     // Carriage return spliced in front of every line break of the response text.
     val cr = '\r' 
     // Canned chunked, keep-alive 404 response: one JSON chunk followed by the
     // terminating zero-length chunk.
     val httpResponse = 
    s"""HTTP/1.1 404 Not Found$cr 
     |Content-Type: application/json;charset=UTF-8$cr 
     |Date: Mon, 26 Sep 2016 06:30:13 GMT$cr 
     |Connection: keep-alive$cr 
     |Transfer-Encoding: chunked$cr 
     |$cr 
     |12$cr 
     |{"Hello": "World"}$cr 
     |0$cr 
     |$cr 
     |""".stripMargin 

     /** Accepts one connection, then writes the canned response to it in an endless loop. */
     override final def run(): Unit = { 
      val socket = server.accept() 
      // Tail-recursive infinite loop; it only ends when the write throws.
      @tailrec def sendResponse(): Unit = { 
       socket.getOutputStream.write(httpResponse.getBytes) 
       sendResponse() 
      } 

      sendResponse() 
     } 

     // Deliberately swallows any exception that escapes a responder thread.
     override def uncaughtException(t: Thread, e: Throwable): Unit =() 
    } 

    // Start four responder threads as daemons so they do not keep the JVM alive.
    for (nr <- 1 to 4) { 
     val thread = new Thread(responder, s"response-thread-$nr") 
     thread.setUncaughtExceptionHandler(responder) 
     thread.setDaemon(true) 
     thread.start() 
    } 


    implicit val system = ActorSystem("main") 
    import system.dispatcher 
    implicit val mat = ActorMaterializer() 

    // Every request is the same GET against the local fake server.
    val serverUri = Uri(s"http://localhost:$serverPort") 
    val request = Get(serverUri) 

    // Host-level pool flow; the Unit element is the per-request correlation value.
    // NOTE(review): confirm whether this override string needs the full
    // `akka.http.host-connection-pool.` prefix to take effect.
    val poolFlow: Flow[(HttpRequest, Unit), (Try[HttpResponse], Unit), Http.HostConnectionPool] = 
     Http().newHostConnectionPool(serverUri.authority.host.address, serverUri.authority.port, ConnectionPoolSettings("max-connections: 4")) 

    // 1000 copies of the request, each paired with a Unit correlation value.
    val source = Source.repeat(request).take(1000).map((_,())) 

    // Keep the pool handle (to shut it down later) and the future of all collected results.
    val runRequest = source.viaMat(poolFlow)(Keep.right).toMat(Sink.seq)(Keep.both) 
    val (connectionPool, response) = runRequest.run() 

    // Only the Try[HttpResponse] part is kept; the response entities themselves are not read.
    // Only the Success case is handled: a failed stream future prints nothing and shuts nothing down.
    response.map(_.map(_._1)).andThen { 
    case Success(responses) => 
     // Count results by whether the Try is a Success (true) or a Failure (false).
     val byResultType = responses.groupBy(_.isSuccess).mapValues(_.size) 

     println(s"Received response. Got ${byResultType.get(true)} successes, ${byResultType.get(false)} errors") 
     // Shut the pool down first, then terminate the actor system whatever the outcome.
     connectionPool.shutdown() andThen { 
     case done => 
      println("Connection pool shut down") 
      system.terminate() 
     } 
    } 
} 

このプログラムは比較的すぐに1000件の成功を報告し、シャットダウンするものと期待していました。ところが実際には、無期限にハングします。リクエストの数を許可された接続数に合わせて減らすと、この問題は発生しなくなります。

回避策として、接続ごとに専用のプールを使うこともできますが、それではプールを作る意味がなくなってしまいます。

スタックダンプには、デッドロックやその他の明らかな異常(misbehaviour)は見られません:

 
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1000m; support was removed in 8.0 
2016-09-26 13:24:18 
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.60-b23 mixed mode): 

"Attach Listener" #25 daemon prio=9 os_prio=31 tid=0x00007f86bf001000 nid=0x3307 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"main-akka.actor.default-dispatcher-10" #24 prio=5 os_prio=31 tid=0x00007f86bbb4a000 nid=0x6b03 waiting on condition [0x000000011d717000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-9" #23 prio=5 os_prio=31 tid=0x00007f86bc402800 nid=0x6903 waiting on condition [0x000000011d614000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-8" #22 prio=5 os_prio=31 tid=0x00007f86bbb49800 nid=0x6703 waiting on condition [0x000000011d511000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-7" #21 prio=5 os_prio=31 tid=0x00007f86bb292000 nid=0x6503 waiting on condition [0x000000011d40e000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.io.pinned-dispatcher-6" #20 prio=5 os_prio=31 tid=0x00007f86bcbcd000 nid=0x6407 runnable [0x000000011d10b000] 
    java.lang.Thread.State: RUNNABLE 
     at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) 
     at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198) 
     at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103) 
     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) 
     - locked (a sun.nio.ch.Util$2) 
     - locked (a java.util.Collections$UnmodifiableSet) 
     - locked (a sun.nio.ch.KQueueSelectorImpl) 
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) 
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101) 
     at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.tryRun(SelectionHandler.scala:115) 
     at akka.io.SelectionHandler$ChannelRegistryImpl$Task.run(SelectionHandler.scala:219) 
     at akka.io.SelectionHandler$ChannelRegistryImpl$$anon$3.run(SelectionHandler.scala:148) 
     at akka.util.SerializedSuspendableExecutionContext.run$1(SerializedSuspendableExecutionContext.scala:67) 
     at akka.util.SerializedSuspendableExecutionContext.run(SerializedSuspendableExecutionContext.scala:71) 
     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

"DestroyJavaVM" #19 prio=5 os_prio=31 tid=0x00007f86bcba2800 nid=0xd03 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"main-akka.actor.default-dispatcher-5" #18 prio=5 os_prio=31 tid=0x00007f86bc252800 nid=0x5d03 waiting on condition [0x000000011c488000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-4" #17 prio=5 os_prio=31 tid=0x00007f86bc823800 nid=0x5b03 waiting on condition [0x000000011c185000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-3" #16 prio=5 os_prio=31 tid=0x00007f86bba26000 nid=0x5903 waiting on condition [0x000000011c082000] 
    java.lang.Thread.State: TIMED_WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-akka.actor.default-dispatcher-2" #15 prio=5 os_prio=31 tid=0x00007f86bc256000 nid=0x5703 waiting on condition [0x000000011bf7f000] 
    java.lang.Thread.State: WAITING (parking) 
     at sun.misc.Unsafe.park(Native Method) 
     - parking to wait for (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool) 
     at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

"main-scheduler-1" #14 prio=5 os_prio=31 tid=0x00007f86bc248800 nid=0x5503 waiting on condition [0x000000011b8d2000] 
    java.lang.Thread.State: TIMED_WAITING (sleeping) 
     at java.lang.Thread.sleep(Native Method) 
     at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:87) 
     at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:268) 
     at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:238) 
     at java.lang.Thread.run(Thread.java:745) 

"response-thread-4" #13 daemon prio=5 os_prio=31 tid=0x00007f86bc195000 nid=0x5303 runnable [0x000000011b7cf000] 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:141) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45) 
     at java.lang.Thread.run(Thread.java:745) 

"response-thread-3" #12 daemon prio=5 os_prio=31 tid=0x00007f86bb0fa800 nid=0x5103 runnable [0x000000011b6cc000] 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:141) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45) 
     at java.lang.Thread.run(Thread.java:745) 

"response-thread-2" #11 daemon prio=5 os_prio=31 tid=0x00007f86bb9ca000 nid=0x4f03 runnable [0x000000011b5c9000] 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:141) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45) 
     at java.lang.Thread.run(Thread.java:745) 

"response-thread-1" #10 daemon prio=5 os_prio=31 tid=0x00007f86bb9c1000 nid=0x4d03 runnable [0x000000011b4c6000] 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketOutputStream.socketWrite0(Native Method) 
     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) 
     at java.net.SocketOutputStream.write(SocketOutputStream.java:141) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.sendResponse$1(AkkaBugReport.scala:41) 
     at de.fashionid.gatekeeper.AkkaBugReport$responder$.run(AkkaBugReport.scala:45) 
     at java.lang.Thread.run(Thread.java:745) 

"Monitor Ctrl-Break" #9 daemon prio=5 os_prio=31 tid=0x00007f86bb078000 nid=0x4b03 runnable [0x000000011b101000] 
    java.lang.Thread.State: RUNNABLE 
     at java.net.PlainSocketImpl.socketAccept(Native Method) 
     at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409) 
     at java.net.ServerSocket.implAccept(ServerSocket.java:545) 
     at java.net.ServerSocket.accept(ServerSocket.java:513) 
     at com.intellij.rt.execution.application.AppMain$1.run(AppMain.java:79) 
     at java.lang.Thread.run(Thread.java:745) 

"Service Thread" #8 daemon prio=9 os_prio=31 tid=0x00007f86bc810800 nid=0x4703 runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C1 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007f86bc805800 nid=0x4503 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007f86bb866000 nid=0x4303 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007f86bb833800 nid=0x4103 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007f86bb844000 nid=0x3e23 runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007f86bc001000 nid=0x2b03 in Object.wait() [0x0000000118c24000] 
    java.lang.Thread.State: WAITING (on object monitor) 
     at java.lang.Object.wait(Native Method) 
     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) 
     - locked (a java.lang.ref.ReferenceQueue$Lock) 
     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164) 
     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209) 

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007f86bb81f000 nid=0x2903 in Object.wait() [0x0000000118b21000] 
    java.lang.Thread.State: WAITING (on object monitor) 
     at java.lang.Object.wait(Native Method) 
     at java.lang.Object.wait(Object.java:502) 
     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157) 
     - locked (a java.lang.ref.Reference$Lock) 

"VM Thread" os_prio=31 tid=0x00007f86bb014800 nid=0x2703 runnable 

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007f86bc005000 nid=0x1f03 runnable 

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007f86bc005800 nid=0x2103 runnable 

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007f86bc006800 nid=0x2303 runnable 

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007f86bc007000 nid=0x2503 runnable 

"VM Periodic Task Thread" os_prio=31 tid=0x00007f86bb80c000 nid=0x4903 waiting on condition 

JNI global references: 250 
+0

私にはデッドロックのように思えます。ハングしている間の 'jstack' の出力に、何か興味深いものはありますか? – Rich

+0

(ソケットの代わりにAkka HTTPを使うこともできます。そちらはノンブロッキングです) – Rich

+0

ご提案ありがとうございます。問題からできるだけ多くの「魔法」を取り除くために、あえてソケットベースの例を選びました。実際のソリューションでは、サーバー側にもAkka HTTPを使用しています。これは、それを本質的な部分だけに絞り込んだものです。 :) –

答えて

3

リクエストに対するHttpResponseのエンティティ(ボディ)を明示的に消費する必要があります。レスポンスのエンティティは実際にはストリームなので、消費しないと接続が開いたまま(使用中のまま)になります。リクエスト/レスポンスのサイクルについては、ドキュメント(documentation)に詳しく説明されています。サーバー側でヘッダーに Connection: close を送信するか、ストリームを消費するためにSink(たとえば Sink.ignore)を接続する必要があります。

実際には、HttpResponseを処理する方法はいくつかあります。1つは、HttpResponseの toStrict(timeout) メソッドを呼び出す方法で、エンティティ全体を取得して接続を解放します。timeout は、相手からの応答を待つ時間の上限を指定します。エンティティに興味がない場合は、HttpResponseの discardEntityBytes() メソッドを呼び出すこともできます。最後に、Unmarshal(resp.entity).to[SomeClass] のように、有効なシンクでストリームを消費する方法もあります。