2016-09-28 6 views
5

のは、私はいくつかのイテレータを持っているとしましょう:未来[T]を放つソースを扱うには?

val nextElemIter: Iterator[Future[Int]] = Iterator.continually(...) 


そして、私はそのイテレータからソースを構築したい:

val source: Source[Future[Int], NotUsed] = 
    Source.fromIterator(() => nextElemIter) 


は、だから今、私のソースは Future Sを発します。

val source: Source[Int, NotUsed] = 
    Source.fromIterator(() => nextElemIter).mapAsync(1)(identity /* was n => n */) 


をそして今、私はTの代わりFuture[T]を発する通常のソースを持っている:私はこのような何かを先物は、アッカのドキュメントでステージ間で渡されるか、どこか他の、その代わりに、私は常に行うことができます見たことがありません。しかし、これはハッキリと間違っているように感じます。

このような状況に対処する適切な方法は何ですか?

+3

私は 'mapAsync'と思いますここで完璧です。結局のところ、これは正確にこの目的のために意図されています - 先物をストリームに平らにする。 –

+1

'mapAsync(1)(identity)'は適切な方法です。 – expert

+0

@expertが編集されました。 –

答えて

4

あなたの質問に直接お答えください:あなたが説明した目的のためにmapAsyncを使用することに関する何か「ハッキー」がないと私はVladimirのコメントに同意します。私はFutureの基礎となるIntの値を解く直接的な方法については考えていません。間接的にあなたの質問に答える

...

は先物

ストリームに固執してみ背圧が必要な場合に、同時実行メカニズムとして、非常に便利です。しかし、純粋なFuture操作は、アプリケーション内でも同様に機能します。

Iterator[Future[Int]]が既知の限定数のFuture値を生成する場合は、Futuresを使用して並行処理を行うことができます。

&をフィルタリングしてマップしたいと想像して、Intの値を減らしてください。

def isGoodInt(i : Int) : Boolean = ???   //filter 
def transformInt(i : Int) : Int = ???   //map 
def combineInts(i : Int, j : Int) : Int = ??? //reduce 

先物は、これらの機能を使用しての直接的な方法を提供します。あなたが示唆したようにストリームを使用しての、やや間接的な方法と比較し

val finalVal : Future[Int] = 
    Future sequence { 
    for { 
     f <- nextElemIter.toSeq //launch all of the Futures 
     i <- f if isGoodInt(i) 
    } yield transformInt(i) 
    } map (_ reduce combineInts) 

val finalVal : Future[Int] = 
    Source.fromIterator(() => nextElemIter) 
     .via(Flow[Future[Int]].mapAsync(1)(identity)) 
     .via(Flow[Int].filter(isGoodInt)) 
     .via(Flow[Int].map(transformInt)) 
     .to(Sink.reduce(combineInts)) 
     .run() 
関連する問題