2017-02-10 2 views
0

BroadcastHubの例では、同じプロデューサを聴くワーカーを動的に生成することができます。しかし、この産卵はコードで明示的に行わなければなりません。私はそれがストリームのイベントの反応としてコード化できるのだろうかと思います。別のストリームに基づいてAkka-Streamsでフローを生成することは可能ですか?

以下の例では、spawnsストリームに「スポーン」メッセージを受信した後、2人のワーカーを追加したいと考えています。出来ますか?

package com.example 

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source} 

import scala.concurrent.duration._ 

object TestApp extends App { 
    implicit val system = ActorSystem("system") 
    implicit val materializer = ActorMaterializer() 

    val ticks = Source.tick(0.second, 1.second, "Tick").take(10) 

    val broadcaster = ticks.toMat(BroadcastHub.sink(bufferSize = 16))(Keep.right).run() 

    def prefixFlow(tag:String) = Flow[String].map(_ + s" from $tag").to(Sink.foreach(println)) 

    // Print out messages from the producer in two independent consumers 
    broadcaster.runWith(prefixFlow("1")) 
    broadcaster.runWith(prefixFlow("2")) 

    // Is it possible to spawn more flows based on another stream? 
    val spawns = Source.tick(2.second, 3.second, "Spawn").take(2) 
    // spawns.foreach(broadcaster.runWith(prefixFlow("XXX")) 
} 

答えて

2

具体的な例では、簡単なmapをお探しですか?

val spawns = Source 
    .tick(2.second, 3.second, "Spawn") 
    .take(2) 
    .map(_ ⇒ broadcaster.runWith(prefixFlow("XXX"))) 
    .runWith(Sink.ignore) 
+1

ええ、時には簡単な解決策が重要です:) – tonicebrian

関連する問題