2016-03-22 13 views
0

私はIterableの8要素(Flux.fromIterable(..))から構築されたfluxを持っています。 各フラックスの排出量ごとに、メソッドを非同期に呼び出す必要があります。 dispatchOnpublishOnでうまくいきませんでしたが、最終的にmap(CompletableFuture.supplyAsync(..), executor)で解決してfluxflux<CompletableFuture<Boolean>>に変換しました。flux.take(x)またはflux.all(..)がすべての排出を待つことはありません

最後の項目が完了したときにのみフローを続行します。 all(..)と、take(size of the Iterable)を試しましたが、どちらの場合も、すべての要素が完了する前にフローが継続します。 これは、私のエグゼキュータがスレッドを4つしか持たず、CompletableFuture -sをフラックスに追加するまでに時間がかかるからです。

all(..)またはtake(8)なぜフラックスが完了するのを待っていますか? どうすれば待つことができますか?

コード:

Mono 
    .fromFuture(dbUtil.getEntity(id)) 
    .doOnError(t -> { 
     ... 
     return;}) 
    .doOnSuccess(s -> log.info("Got it: " + s)) 
    .flatMap(s -> 
     Flux.fromIterable(s.getItemsMap().entrySet()) 
      .map(e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR)) 
      .take(s.getItemsMap().entrySet().size()) 
    ) 
    .all(...) 
    .consume(b -> done(b)); 

答えて

1

あなたは、各項目のための計算と並行に行きたい場合は、flatMap +ちょうど+ subscribeOn +マップを使用することができます。私はRxJavaであなたに例をあげるので、私は、原子炉のスケジューラタイプとあまり慣れていないよ:

ExecutorService exec = ... 
Scheduler scheduler = Schedulers.from(exec); 

Observable.from(...) 
.flatMap(e -> Observable.just(e).subscribeOn(scheduler).map(v -> process(v))) 
.subscribe(...); 
+0

ありがとう@akarnokd。私はあなたの提案を試みたが、それで台無しになったし、時間の圧力のために、私はそれを今のところ脇に置かなければならない。とにかく、私が正しく理解すれば、結果は原子炉で試したものに似ているはずです。おそらく 'dispatchOn' /' publishOn'の代わりに 'subscribeOn'が多少の違いを生むかもしれません。 – user5396668

0

私は私はあなたが作成することができないと仮定することができますので、あなたに観察

.finallyDo("here arrive when onComplete or onError") 

に追加しようとするだろうイテレータごとに観測可能なものを作成し、それらをマージまたはジップしてサブスクライブするだけです。

Observable.merge(obsevableIter1(), observableIter2()) 
       .subscribe(result -> showResult(result.toString())); 
+0

ありがとう@Paul。私は 'finnalyDo'を見ましたが、Flux(/ observable)が完了するまで排出物を通過させたくないので、ここには合わないようです。 – user5396668

関連する問題