2016-03-12 12 views
5

Futuresのリストの結果からObservableをリアルタイムで生成したいと思います。複数スレッドからの先物 - onNextから観測可能

私はFuture.sequenceで実行している先物のリストを持っていて、完了するたびに通知するObservableで進捗を監視しています。私は基本的に次のようにしています:

def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = { 
    Observable[String](observer => { 
     val loudFutures: List[Future[Int]] = futs.map(f => { 
      f onComplete { 
       case Success(a) => observer.onNext(s"just did $a more") 
       case Failure(e) => observer.onError(e) 
      } 
      f 
     }) 
     Future.sequence(loudFutures) onComplete { 
      case Success(_) => observer.onCompleted() 
      case Failure(e) => observer.onError(e) 
     } 
    }) 
    } 

これは私のテスト環境でうまくいきます。しかし、私はちょうど重複呼び出しがないことに注意しないで、onNextが異なるスレッドから呼び出されるべきではないことを読んだだけです。これを修正するための推奨される方法は何ですか? Observablesの実際の使用方法の多くは、このように非同期コードからonNextを呼び出す必要がありますが、ドキュメントでも同様の例は見つかりません。

+0

私はより良い答えがありますかどうかわからないんだけど、あなたは例えば単一スレッドの実行コンテキスト( 'ExecutionContext.fromExecutor(エグゼキューを使用する場合は、' onNext'呼び出しが同じスレッドで実行されていることを確認することができます。 newSingleThreadExecutor()) ')を呼び出して、それらの' onComplete'コールバックを実行します。 – Kolmar

+0

'onNext'に関してあなたが参照している記事を参照できますか?このユースケースは、私の立場から見れば完璧です。 – mavarazy

+0

@mavarazy:これで見つかったドキュメントの多くはかなり不明ですが、[this](http://reactivex.io/documentation/operators/serialize.html)では、 'serialize()'を使って2つの重複を避ける方法について話しています複数のスレッドから 'onNext()'を呼び出さないように警告します。少なくとも、あなたが使用している場合は、あなたは 'onNext()'を呼び出す必要があります。件名。私が見つけることができるRxの公式の例はすべてシングルスレッドです。 – thund

答えて

1

The Observable Contract

観測は(ない 並列で)シリアルオブザーバーに通知を発行する必要があります。彼らは 異なるスレッドからこれらの通知を発行するかもしれませんが、 通知の間に正式な先起こりの関係がなければなりません。

観察可能で、おそらく別のスレッドから、非同期にその観察者のメソッド を起動することが可能であるSerialize

を見てみましょう。これにより、 ObservableがObservableコントラクトに違反する可能性があります。 通知がOnNext 通知の前にOnCompletedまたはOnError通知を送信しようとするか、2つの 異なるスレッドから同時にOnNext通知を行う可能性があります。 シリアライズ演算子を適用することにより、このようなObservableを に正しく動作させ、同期させることができます。

+0

ありがとうございます。私がすぐにSerializeページを見てみると、Rx-scalaセクションにはTBDと書かれていて実際の構文は分かりません。私はこれまで実装されたかどうかは不明でした。しかし、次の構文はコンパイルされます: 'Observable [String](observer => {...})。serialize'、そしてうまく動いているようです。正直言って、 '.serialize'が何かをするかどうかはまだ分かりません。私は厳しいストレステストを構築しようとしています。私はそれが 'Observable'のデフォルト実装ではないことにも驚いています。 – thund

関連する問題