2016-09-15 5 views
3

私のシナリオはかなりシンプルですが、私はどこでも見つけることができません。RxJavaで複数の非同期呼び出しが完了するのを待つ

私は、それぞれが非同期関数を呼び出し、それらのすべてが終了するのを待っている、一連の要素を持っています(関数のロジックに実装されている非同期のやり方で)。私はRxJavaには比較的新しいので、NodeJSでコールバックを関数に渡して最後に待つことで、これを簡単に実行できました。 ここで私は必要なものの擬似コードだ(要素の反復子は、同期も注文する必要はありません):

for(line in lines){ 
callAsyncFunction(line); 
} 
WAIT FOR ALL OF THEM TO FINISH 

あなたの助けを本当に感謝です!

+0

Plsは実際のコードを掲載します。 RxJavaには、さまざまな振る舞いを持つ非常に多くの演算子があります。どのオペレーターを使用するか知らなければ、手助けができません。 –

+0

countdownlatchクラスを使用し、観察可能なコールcountDownメソッドの完了メソッドで使用します。 – gaston

答えて

1

使用のRx:この時点で

Observable 
.from(lines) 
.flatMap(line -> callAsyncFunctionThatReturnsObservable(line).subscribeOn(Schedulers.io()) 
.ignoreElements(); 

、あなたは別の、観察を購読する.switchIfEmpty(...)を使用することができます何をしたいかによって異なります。

0

"計算するアイテム"のイテラブルを、Observablesのdefer、次にzipを使用してObservablesのIterableに変換します。

「難しい部分は、すべて完了するのを待っています」。あなたが推測したように、リアクティブエクステンションは「物事が起こるのを待つ」ことではなく、物事に「反応する」ことです。 observableにはsubscribeを指定することができます。これは単一のアイテムを出力し、各observableに1つのアイテムしかない場合に完了します。その加入者は、待ってから通常どんな行動をとることができます。これにより、コードを戻すことができ、コードをブロックすることなくコードを処理することができます。

0

考えてみると、すべての要素からObservableを作成し、次にzip them togetherを作成してストリームの実行を継続する必要があります。

List<Observable<?>> observables = new ArrayList<>(); 
for(line in lines){ 
    observables.add(Observable.fromCallable(callAsyncFunction(line)); 
} 
Observable.zip(observables, new Function<...>() { ... }); // kinda like Promise.all() 

しかし、それはこのようにループのためのあなたの必要性を排除し、Observable.from()は、オブジェクトのストリームとして反復可能な範囲内のすべての要素を公開することができていない驚きとして来るかもしれません:擬似コードであなたにこのような何かを与えるだろう

。したがって、非同期操作が完了したときにonCompleted()を呼び出す新しいObservableを作成して、Observable.fromCallable()を使用することができます。その後、これらのObservableをリストにまとめてお待ちください。

Observable.from(lines) 
    .flatMap(new Func1<String, Observable<?>>() { 
     @Override 
     public Observable<?> call(String line) { 
      return Observable.fromCallable(callAsyncFunction(line)); // returns Callable 
     } 
    }).toList() 
     .map(new Func1<List<Object>, Object>() { 
     @Override 
     public Object call(List<Object> ignored) { 
      // do something; 
     } 
    }); 

私はthis answerに大きく私の答えのこの後半を基づかよ。

関連する問題