13
SourceQueueを使用して、エレメントをAkkaストリームソースに動的にプッシュしたいとします。 再生コントローラには、chuncked
メソッドを使用して結果をストリーミングできるソースが必要です。
Playは独自のAkkaストリームシンクをボンネットで使用しているため、シンクを使用してソースキューを自分で実現することはできません。chunked
メソッドで使用する前にソースが消費されるためです(次のハッキングを使用する場合を除く)。PlayFrameworkでAkka Streams SourceQueueを使用する方法
私があれば反応性ストリームの出版社を使用して、ソース・キューを事前にマテリアライズそれを動作させることができるんだけど、それは汚いハック」のようなものです:
def sourceQueueAction = Action{
val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
//stupid example to push elements dynamically
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(t))
Ok.chunked(Source.fromPublisher(pub))
}
はへの簡単な方法はありますPlayFrameworkでAkka Streams SourceQueueを使用しますか?
おかげ
をなぜ私は、たとえば '' queueSource.map {_.toUpperCase}を行う場合にはIソース[String、NotUsed]を取得しないでください。代わりに、これはタイプ 'queueSource.Repr [String]の式が期待されるタイプのソース[String、NotUsed]に適合しないというエラーを返します。 ソースの要素はどこで変換されますか? [あなたの例](http://loicdescotte.github.io/posts/play-akka-streams-queue/)のダニのように、 – gabrielgiussi