私はブロードキャストとジップの内側にフローグラフを持っています。何か(それが何であるかにかかわらず)がこの流れの中で失敗するなら、問題のある要素をそれに渡して再開したいと思います。私は印刷にそれを期待するAkkaストリーム - 失敗した後にブロードキャストとジップでグラフを再開する
val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val dangerousFlow = Flow[Int].map {
case 5 => throw new RuntimeException("BOOM!")
case x => x
}
val safeFlow = Flow[Int]
val bcast = builder.add(Broadcast[Int](2))
val zip = builder.add(Zip[Int, Int])
bcast ~> dangerousFlow ~> zip.in0
bcast ~> safeFlow ~> zip.in1
FlowShape(bcast.in, zip.out)
})
Source(1 to 9)
.via(flow)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.foreach(println))
:私は、次の解決策を考え出した
(1,1)
(2,2)
(3,3)
(4,4)
(5,5)
(6,6)
(7,7)
(8,8)
(9,9)
はしかし、それはデッドロック、のみ印刷:
(1,1)
(2,2)
(3,3)
(4,4)
我々はいくつかを行ってきましたデバッグすると、子供に "再開"戦略が適用されたことが判明しました。これにより、失敗後にdangerousFlow
が再開され、bcast
の要素が要求されました。 bcast
はsafeFlow
が別の要素を要求するまで要素を送出しません。実際には(要求がzip
を待っているので)実際には起こりません。
ステージの内部で何が問題になったかにかかわらず、グラフを再開する方法はありますか?