私はIterable
の8要素(Flux.fromIterable(..)
)から構築されたflux
を持っています。 各フラックスの排出量ごとに、メソッドを非同期に呼び出す必要があります。 dispatchOn
とpublishOn
でうまくいきませんでしたが、最終的にmap(CompletableFuture.supplyAsync(..), executor)
で解決してflux
をflux<CompletableFuture<Boolean>>
に変換しました。flux.take(x)またはflux.all(..)がすべての排出を待つことはありません
最後の項目が完了したときにのみフローを続行します。 all(..)
と、take(size of the Iterable)
を試しましたが、どちらの場合も、すべての要素が完了する前にフローが継続します。 これは、私のエグゼキュータがスレッドを4つしか持たず、CompletableFuture
-sをフラックスに追加するまでに時間がかかるからです。
all(..)
またはtake(8)
なぜフラックスが完了するのを待っていますか? どうすれば待つことができますか?
コード:
Mono
.fromFuture(dbUtil.getEntity(id))
.doOnError(t -> {
...
return;})
.doOnSuccess(s -> log.info("Got it: " + s))
.flatMap(s ->
Flux.fromIterable(s.getItemsMap().entrySet())
.map(e -> CompletableFuture.supplyAsync(()->process(e, s), EXECUTOR))
.take(s.getItemsMap().entrySet().size())
)
.all(...)
.consume(b -> done(b));
ありがとう@akarnokd。私はあなたの提案を試みたが、それで台無しになったし、時間の圧力のために、私はそれを今のところ脇に置かなければならない。とにかく、私が正しく理解すれば、結果は原子炉で試したものに似ているはずです。おそらく 'dispatchOn' /' publishOn'の代わりに 'subscribeOn'が多少の違いを生むかもしれません。 – user5396668