2016-04-26 11 views
2

私は既にRx Scalaを使用しているプロジェクトの1つとしてAkka Streamsを試しています。 Akka Streamsが私たちが持っているRx Scalaライブラリを置き換えるのにどのように適しているか見てみたいと思っていました。私がAkka Streamsで可能ではないことの1つは、1つのSourceと多くのSinksを持つ可能性です。セイ、この例ではアッカストリームのドキュメントからまっすぐ取り出し:Akkaストリームを使用した1つのソースから複数のシンクへ

val source = Source(1 to 10) 
val sink = Sink.fold[Int, Int](0)(_ + _) 

// connect the Source to the Sink, obtaining a RunnableGraph 
val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right) // how could I materialize to a Seq of Sinks? 

// materialize the flow and get the value of the FoldSink 
val sum: Future[Int] = runnable.run() 

Rxのライブラリを使用して、私は両方完全に私に1つのソースをマッピングするための柔軟性を与えるデカップリングソース(観測)とシンク(オブザーバー) (Observable)であり、n個のシンク(Observers)を有する。 Akka Streamsでこれをどのように達成できますか?すべてのポインタが役立つだろう!

答えて

2

これは具体的BroadcastGraphsで利用可能である:

ブロードキャスト[T] - (1つの入力、N出力)入力要素が に発する所与各出力

からいくつかのサンプルコードドキュメント:

val in = Source(1 to 10) 
val out = Sink.ignore 

val bcast = builder.add(Broadcast[Int](2)) 
val merge = builder.add(Merge[Int](2)) 

val f1, f2, f3, f4 = Flow[Int].map(_ + 10) 

in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out 
      bcast ~> f4 ~> merge 
ClosedShape 
関連する問題