2016-04-04 11 views
13

SourceQueueを使用して、エレメントをAkkaストリームソースに動的にプッシュしたいとします。 再生コントローラには、chunckedメソッドを使用して結果をストリーミングできるソースが必要です。
Playは独自のAkkaストリームシンクをボンネットで使用しているため、シンクを使用してソースキューを自分で実現することはできません。chunkedメソッドで使用する前にソースが消費されるためです(次のハッキングを使用する場合を除く)。PlayFrameworkでAkka Streams SourceQueueを使用する方法

私があれば反応性ストリームの出版社を使用して、ソース・キューを事前にマテリアライズそれを動作させることができるんだけど、それは汚いハック」のようなものです:

def sourceQueueAction = Action{ 

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run() 

    //stupid example to push elements dynamically 
    val tick = Source.tick(0 second, 1 second, "tick") 
    tick.runForeach(t => queue.offer(t)) 

    Ok.chunked(Source.fromPublisher(pub)) 
    } 

はへの簡単な方法はありますPlayFrameworkでAkka Streams SourceQueueを使用しますか?

おかげ

答えて

19

ソリューションは、そのキューのマテリアの将来取得するには、ソース上のmapMaterializedValueを使用することです:

def sourceQueueAction = Action { 
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail)) 

    futureQueue.map { queue => 
     Source.tick(0.second, 1.second, "tick") 
     .runForeach (t => queue.offer(t)) 
    } 
    Ok.chunked(queueSource) 

    } 

    //T is the source type, here String 
    //M is the materialization type, here a SourceQueue[String] 
    def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = { 
    val p = Promise[M] 
    val s = src.mapMaterializedValue { m => 
     p.trySuccess(m) 
     m 
    } 
    (s, p.future) 
    } 
+0

をなぜ私は、たとえば '' queueSource.map {_.toUpperCase}を行う場合にはIソース[String、NotUsed]を取得しないでください。代わりに、これはタイプ 'queueSource.Repr [String]の式が期待されるタイプのソース[String、NotUsed]に適合しないというエラーを返します。 ソースの要素はどこで変換されますか? [あなたの例](http://loicdescotte.github.io/posts/play-akka-streams-queue/)のダニのように、 – gabrielgiussi

関連する問題