2013-08-04 10 views
12

Stream[X]function X => Future Yがある場合があります。これを組み合わせてFuture[Stream[Y]]にしたいと思いますそれをするために。例えば、私は未来を返す関数を使ってストリームをマッピングする

val x = (1 until 10).toStream 
def toFutureString(value : Integer) = Future(value toString) 

val result : Future[Stream[String]] = ??? 

が、私は正しい結果が得られますが、未来を、返す前にストリーム全体を消費しているようだ

val result = Future.Traverse(x, toFutureString) 

を試してみました、多かれ少なかれ敗北purpse

試しました

val result = x.flatMap(toFutureString) 

しかし、それはとコンパイルされません

val result = x.map(toFutureString) 

はやや奇妙と無用を返しStream[Future[String]]

物事が固定取得する私はここに何をすべきか?

編集:私はStreamに引っかかっていないよ、私は限りそれは頭

を処理するために開始する前に、すべての項目を評価する上でブロックされませんよう、 Iterator上の同じ操作と同じように幸せになりたいです

Edit2:Future.TraverseコンストラクトがFuture [Stream]を返す前にストリーム全体を走査する必要があるとは100%確信していませんが、私はそうすると思います。そうでなければ、それはそれ自体素晴らしい答えです。

Edit3:私はまた、結果を順序どおりにする必要はありません。私はストリームやイテレータでどんな順序でも返されます。

+0

下記の私の回答をフォローアップするために[問題](https://issues.scala-lang.org/browse/SI-7718)を提出しました。 –

+0

ああ、素晴らしい@TravisBrown。私は自分自身でそれをしたいと思っていましたが、私はJiraにログインする方法を見つけることができませんでした – Martijn

+0

少し不明です - 前のコレクションのすべての要素に "toFutureString"を適用しないようにしたいですか?未来を創造するだけのオーバーヘッドはありません。 「リスト」の残りの項目がサンクであれば、評価はどのようにトリガーされますか?リストの中の未来の完成? Scalaで見つけることができるすべてのシーケンス/トラバース操作は、個々のリスト要素に対して厳密に見えました。 – pdxleif

答えて

9

正しいトラックにはtraverseがありますが、残念ながらこの場合は標準ライブラリの定義が少し壊れているように見えます。リターンする前にストリームを消費する必要はありません。

Future.traverseは「一筆書き」のタイプ(例えば、より多くの情報のためthesepapersまたは私の答えhereを参照)に包まれた任意の応用的数子に取り組んでいますはるかに一般的な関数の特定のバージョンです。

Scalazライブラリは、このより一般的なバージョンを提供し、それはこの場合(私はscalaz-contribからFutureためのApplicativeファンクタのインスタンスを取得していますので注意して期待通りに動作します。それはまだあるScalazの安定したバージョン、ではまだありませんこのFutureを持っていないスカラ座2.9.2、)に対してクロス構築された:

import scala.concurrent._ 
import scalaz._, Scalaz._, scalaz.contrib.std._ 

import ExecutionContext.Implicits.global 

def toFutureString(value: Int) = Future(value.toString) 

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString 

これは、無限のストリーム上ですぐに戻りますので、我々はそれが最初にかかるされていないことを確実に知ります。脚注として


:あなたは Future.traverseため the sourceを見ればあなたはそれが流れた場合に便利な、必ずしも必要ではないか、適切である foldLeft、という点で実装だということがわかります。

+0

FYI、Scala 2.9.3にはscala.concurrentが含まれています –

+0

@ViktorKlang:まさにこれらのインスタンスはScalaz core(https://github.com/scalaz/scalaz/wiki/Roadmap#sip-14)しかし私が知る限り、まだ具体的なスケジュールはありません。 –

+0

いい考えのように聞こえます。 –

2

ストリーム忘れ:(私の8コアボックス上)

import scala.concurrent.Future 
import ExecutionContext.Implicits.global 

val x = 1 to 10 toList 
def toFutureString(value : Int) = Future { 
    println("starting " + value) 
    Thread.sleep(1000) 
    println("completed " + value) 
    value.toString 
} 

利回り:

scala> Future.traverse(x)(toFutureString) 
starting 1 
starting 2 
starting 3 
starting 4 
starting 5 
starting 6 
starting 7 
starting 8 
res12: scala.concurrent.Future[List[String]] = [email protected] 

scala> completed 1 
completed 2 
starting 9 
starting 10 
completed 3 
completed 4 
completed 5 
completed 6 
completed 7 
completed 8 
completed 9 
completed 10 
ので

それらの8は、それは経由して設定可能だが、すぐに(各コアに1つのキックオフを取得しますスレッドプールのエグゼキュータ)、そしてそれらの完全なものとして、より多くのものが打ち出されます。未来[List [String]]はすぐに戻り、一時停止後に "completed x"メッセージの印刷を開始します。

List [Url]があり、タイプがUrl => Future [HttpResponseBody]の関数を使用した場合の使用例があります。その関数でそのリストのFuture.traverseを呼び出して、それらのhttp要求を並行して開始し、結果のリストである単一の未来を取り戻すことができます。

あなたが行っていたようなものでしたか?

+0

"ストリーム要素を熱心に評価したくない"と思うのは、それらの要素が先物を開始するタスクへの入力であるとき、並列性とは異なると思われます。あなたはそのストリームを消費/評価したいのですか? – pdxleif

関連する問題