2016-05-09 9 views
0

WebサーバーへのPOSTにAkka HTTPを使用しようとしています。 POSTが失敗した場合は、停止するようにして、それ以上のPOSTを送信しないようにします。Akka例外がスローされたときにHTTPのフローが停止しない

以下のコードはPOSTを作成し、テスト用Webサーバーに送信します。最初の応答で例外がスローされます。コードは、あなたがそれを印刷し表示されます。その場合には実行可能でなければなりません:次の要求が一緒に(i = 1)置かれた後

i = 0 
got response 
i = 1 
stopping 
Exception in thread "main" java.lang.Exception 
i = 2 
i = 3 
i = 4 
i = 5 

だから「停止」だと、コードだけで続けています。

一度エラーが発生してもそれ以上のPOSTを送信しないと誰もがフローを停止する方法を知っていますか?

(スカラ座2.11.8、アッカ2.4.4)

object FlowTest { 
    def main(args: Array[String]) { 
    val stop: Supervision.Decider = { 
     case _ => 
     println("stopping") 
     Supervision.Stop 
    } 

    implicit val system = ActorSystem() 
    import system.dispatcher 
    implicit val mat = ActorMaterializer() 
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = 
     Http().outgoingConnection(host = "posttestserver.com", port = 80) 

    val future: Future[Done] = Source(0 to 10).map { 
     i => 
     val uri = s"/post.php?dir=so_akka&i=$i" 
     println(s"i = $i") 
     HttpRequest(method = HttpMethods.POST, uri = uri, entity = s"data $i") 
    }.via(connectionFlow).mapAsync(1) { 
     resp => 
     Unmarshal(resp.entity).to[String] 
      .map { str => 
      println(str) 
      throw new Exception("") // Always fail 
      str 
      } 
    }.withAttributes(ActorAttributes.supervisionStrategy(stop)).runForeach(println) 

    Await.result(future, Duration.Inf) 
    } 
} 
+0

ストリームは非同期で処理されるため、条件(例外など)に基づいてストリームをキャンセルすることはできません。後続の要素の完了した先物の結果がすでに下流に放出されている可能性があります。例外がスローされた後に本当にキャンセルする必要がある場合は、おそらく先物をブロックすることによって要素が順番に処理されるようにする必要があります。 – devkat

答えて

0

だから、私は上記のコードを持っていた二つの問題があったと思います。

  1. HTTP POSTをパイプライン化しないでください。私は、Akka HTTPがPOSTを処理してエラーがなくなるまで待機してから、次のメッセージを送信することを期待していました。これは起こりません。

  2. 例外はフローを伝播していませんでした。だから、処理コードを投げても、Sourceがより多くのPOSTを作成して送信されるのを止めることはできませんでした。

したがって、2つの修正があります。

  1. 私は1つにActorMaterializerwithSyncProcessingLimitを設定しています。新しいメッセージが処理される前に送信元からの送信が停止されます。また、.mapAsyncの部分を変更して、必要に応じてステータスコードとエラーをチェックする.mapと、レスポンスボディを調べるを変更する必要がありました。 .mapの部分では返信の本文を見ることはできません。

  2. 私はKillSwitchを追加してフローを停止しました。例外をスローすると同じ効果がありますが、そうではありません。だから、これは恐ろしいハックですが、動作します。

これを実行するには、より良い方法が必要だと思います。 HTTP POSTでAkka HTTPフローを使用することは苦痛ではありません。

ここに新しいコードがあります。

object FlowTest { 
    def main(args: Array[String]) { 
    implicit val system = ActorSystem() 
    import system.dispatcher 
    implicit val mat = ActorMaterializer.create(
     ActorMaterializerSettings.create(system).withSyncProcessingLimit(1), system 
    ) 
    val connectionFlow = Http().outgoingConnection(host = "posttestserver.com", port = 80) 
    val source = Source(0 to 10) 
    val killSwitch = KillSwitches.shared("HttpPostKillSwitch") 

    try { 
     val future: Future[Done] = source.via(killSwitch.flow).map { 
     i => 
      val uri = s"/post.php?dir=test&i=$i" 
      println(s"i? = $i") 
      HttpRequest(method = HttpMethods.POST, uri = uri, entity = s"data $i") 
     } 
     .via(connectionFlow) 
     .map { 
      resp => 
      println("got response") 
//   if(resp.status != OK) { // always fail for testing 
       val e = new Exception("") 
       killSwitch.abort(e) 
       throw e 
//   } 
      resp 
     } 
     .mapAsync(1) { 
      resp => 
      Unmarshal(resp.entity).to[String] 
       .map { str => 
       println("got " + str) 
       str 
       } 
     } 
     .runForeach(println) 

     Await.result(future, Duration.Inf) 
    } catch { 
     case NonFatal(e) => 
     system.terminate() 
    } 
    } 
} 
関連する問題