2017-10-31 3 views
4

私はブロードキャストとジップの内側にフローグラフを持っています。何か(それが何であるかにかかわらず)がこの流れの中で失敗するなら、問題のある要素をそれに渡して再開したいと思います。私は印刷にそれを期待するAkkaストリーム - 失敗した後にブロードキャストとジップでグラフを再開する

val flow = Flow.fromGraph(GraphDSL.create() { implicit builder => 
    import GraphDSL.Implicits._ 

    val dangerousFlow = Flow[Int].map { 
    case 5 => throw new RuntimeException("BOOM!") 
    case x => x 
    } 
    val safeFlow = Flow[Int] 
    val bcast = builder.add(Broadcast[Int](2)) 
    val zip = builder.add(Zip[Int, Int]) 

    bcast ~> dangerousFlow ~> zip.in0 
    bcast ~> safeFlow ~> zip.in1 

    FlowShape(bcast.in, zip.out) 
}) 

Source(1 to 9) 
    .via(flow) 
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) 
    .runWith(Sink.foreach(println)) 

:私は、次の解決策を考え出した

(1,1) 
(2,2) 
(3,3) 
(4,4) 
(5,5) 
(6,6) 
(7,7) 
(8,8) 
(9,9) 

はしかし、それはデッドロック、のみ印刷:

(1,1) 
(2,2) 
(3,3) 
(4,4) 

我々はいくつかを行ってきましたデバッグすると、子供に "再開"戦略が適用されたことが判明しました。これにより、失敗後にdangerousFlowが再開され、bcastの要素が要求されました。 bcastsafeFlowが別の要素を要求するまで要素を送出しません。実際には(要求がzipを待っているので)実際には起こりません。

ステージの内部で何が問題になったかにかかわらず、グラフを再開する方法はありますか?

答えて

3

問題をよく理解していると思います。あなたは、あなたの要素5がクラッシュしたときにdangerousFlowという要素が発生したときに、の要素5を停止して、zip段階に到達すると問題が発生することを確認しました。 broadcastzipの間で問題を解決する方法はわかりませんが、扱いやすい場所で問題をさらに押し上げるのはどうですか?

import scala.util._ 
val dangerousFlow = Flow[Int].map { 
    case 5 => Failure(new RuntimeException("BOOM!")) 
    case x => Success(x) 
} 

でも問題が発生した場合には、dangerousFlowはまだデータを放出します:

には、以下のdangerousFlowを使用することを検討してください。現在は現在zipのようにして、collectステージをグラフの最後のステップとして追加するだけです。あなたが書いたように、あなたは本当に、次を使用し、出力に(5, 5)タプルを、それを期待している場合今

Flow[(Try[Int], Int)].collect { 
    case (Success(s), i) => s -> i 
} 

:フローでは、これは次のようになり

Flow[(Try[Int], Int)].collect { 
    case (Success(s), i) => s -> i 
    case (_, i)   => i -> i 
} 
関連する問題