で結果を処理するために、私はフラックスを次ていますプロジェクト炉 - どのようにウィンドウ
Flux.range(0, 100)
.log("before window")
.window(10)
.map(Flux::toList)
.log("after window")
.map((w) -> {
System.out.println(w.subscribe().get()));
return 1;
})
.reduce(0, (a, b) -> a + b)
.doOnSuccess(System.out::println)
.subscribe();
私はこの瞬間のために項目のブロックを得ることができる必要がありdoOnNext
に思いました。しかし.get
は常にTimeout on Mono blocking read
excpeptionで失敗します。
は、私はすべてが.window
前に処理していた、と結果のログを与え、ログから参照してください。
[ main] before window : onNext(0)
[ main] after window : onNext({ operator : "BufferAll" })
は公式ドキュメントで非常に多くの情報があるじゃないので、私は私が何かを誤解推測し、誤っwindow
機能を使用して。しかし、私はここで間違って何をしているのですか?
UPDATE
私が代わりに.doOnSuccess
を使用している場合、この特定の問題を回避することができる考え出しました。同様:
.map((w) -> {
w.doOnSuccess((w2) -> System.out.println(w2))).subscribe();
return 1;
}
しかし本当の問題、それは私の場合で私は、提供されたデータに対していくつかの計算(代わりに1
の)に基づいてから数を返す必要があります。私はここで新しいMono
を作成することができますが、とにかく、後で私は.get
する必要があります。たとえば、最終的には.reduce
です。だから私が.reduce(0, (a, b) -> a + b.get())
を実行すると、そこでは失敗します。
モノからどのようにして安全に値を取得できますか? ToListメソッドをし、自分でそれを行う、窓の後にマッピング相からMono
を返す:
UPDATE 2
今私はフラックスを削除しました。それはおそらくそれがどうあるべきかです。 .reduce
で立ち往生
.window(10)
.log("after window")
.map((w) -> {
//basically i'm reducing Flux to a Mono<List> and return number of a [good] elements in it
return w.reduce(...).map(ids -> 100).subscribe();
})
.reduce(0, (a, b) -> a + b.get())
しかし、それはとにかく動作しません、:
は私が.reduce
ステップを削除した場合、それが動作することに気づきました。この場合、.window
によって提供されるFlux
の処理は、メインフローの後にに実行されます。私はそれを支配しておらず、最終結果を得ることさえできません。それは意味をなさない。私はさらに、使用前ウィンドウを削減する必要があるため