WebサーバーへのPOSTにAkka HTTPを使用しようとしています。 POSTが失敗した場合は、停止するようにして、それ以上のPOSTを送信しないようにします。Akka例外がスローされたときにHTTPのフローが停止しない
以下のコードはPOSTを作成し、テスト用Webサーバーに送信します。最初の応答で例外がスローされます。コードは、あなたがそれを印刷し表示されます。その場合には実行可能でなければなりません:次の要求が一緒に(i = 1
)置かれた後
i = 0
got response
i = 1
stopping
Exception in thread "main" java.lang.Exception
i = 2
i = 3
i = 4
i = 5
だから「停止」だと、コードだけで続けています。
一度エラーが発生してもそれ以上のPOSTを送信しないと誰もがフローを停止する方法を知っていますか?
(スカラ座2.11.8、アッカ2.4.4)
object FlowTest {
def main(args: Array[String]) {
val stop: Supervision.Decider = {
case _ =>
println("stopping")
Supervision.Stop
}
implicit val system = ActorSystem()
import system.dispatcher
implicit val mat = ActorMaterializer()
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
Http().outgoingConnection(host = "posttestserver.com", port = 80)
val future: Future[Done] = Source(0 to 10).map {
i =>
val uri = s"/post.php?dir=so_akka&i=$i"
println(s"i = $i")
HttpRequest(method = HttpMethods.POST, uri = uri, entity = s"data $i")
}.via(connectionFlow).mapAsync(1) {
resp =>
Unmarshal(resp.entity).to[String]
.map { str =>
println(str)
throw new Exception("") // Always fail
str
}
}.withAttributes(ActorAttributes.supervisionStrategy(stop)).runForeach(println)
Await.result(future, Duration.Inf)
}
}
ストリームは非同期で処理されるため、条件(例外など)に基づいてストリームをキャンセルすることはできません。後続の要素の完了した先物の結果がすでに下流に放出されている可能性があります。例外がスローされた後に本当にキャンセルする必要がある場合は、おそらく先物をブロックすることによって要素が順番に処理されるようにする必要があります。 – devkat