Futures
のリストの結果からObservable
をリアルタイムで生成したいと思います。複数スレッドからの先物 - onNextから観測可能
私はFuture.sequence
で実行している先物のリストを持っていて、完了するたびに通知するObservable
で進捗を監視しています。私は基本的に次のようにしています:
def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
Observable[String](observer => {
val loudFutures: List[Future[Int]] = futs.map(f => {
f onComplete {
case Success(a) => observer.onNext(s"just did $a more")
case Failure(e) => observer.onError(e)
}
f
})
Future.sequence(loudFutures) onComplete {
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
})
}
これは私のテスト環境でうまくいきます。しかし、私はちょうど重複呼び出しがないことに注意しないで、onNext
が異なるスレッドから呼び出されるべきではないことを読んだだけです。これを修正するための推奨される方法は何ですか? Observables
の実際の使用方法の多くは、このように非同期コードからonNext
を呼び出す必要がありますが、ドキュメントでも同様の例は見つかりません。
私はより良い答えがありますかどうかわからないんだけど、あなたは例えば単一スレッドの実行コンテキスト( 'ExecutionContext.fromExecutor(エグゼキューを使用する場合は、' onNext'呼び出しが同じスレッドで実行されていることを確認することができます。 newSingleThreadExecutor()) ')を呼び出して、それらの' onComplete'コールバックを実行します。 – Kolmar
'onNext'に関してあなたが参照している記事を参照できますか?このユースケースは、私の立場から見れば完璧です。 – mavarazy
@mavarazy:これで見つかったドキュメントの多くはかなり不明ですが、[this](http://reactivex.io/documentation/operators/serialize.html)では、 'serialize()'を使って2つの重複を避ける方法について話しています複数のスレッドから 'onNext()'を呼び出さないように警告します。少なくとも、あなたが使用している場合は、あなたは 'onNext()'を呼び出す必要があります。件名。私が見つけることができるRxの公式の例はすべてシングルスレッドです。 – thund