2016-05-26 3 views
2

で結果を処理するために、私はフラックスを次ていますプロジェクト炉 - どのようにウィンドウ

Flux.range(0, 100) 
    .log("before window") 
    .window(10) 
    .map(Flux::toList) 
    .log("after window") 
    .map((w) -> { 
     System.out.println(w.subscribe().get())); 
     return 1; 
    }) 
    .reduce(0, (a, b) -> a + b) 
    .doOnSuccess(System.out::println) 
    .subscribe(); 

私はこの瞬間のために項目のブロックを得ることができる必要がありdoOnNextに思いました。しかし.getは常にTimeout on Mono blocking read excpeptionで失敗します。

は、私はすべてが.window前に処理していた、と結果のログを与え、ログから参照してください。

[   main] before window : onNext(0) 
[   main] after window : onNext({ operator : "BufferAll" }) 

は公式ドキュメントで非常に多くの情報があるじゃないので、私は私が何かを誤解推測し、誤っwindow機能を使用して。しかし、私はここで間違って何をしているのですか?

UPDATE

私が代わりに.doOnSuccessを使用している場合、この特定の問題を回避することができる考え出しました。同様:

.map((w) -> { 
    w.doOnSuccess((w2) -> System.out.println(w2))).subscribe(); 
    return 1; 
} 

しかし本当の問題、それは私の場合で私は、提供されたデータに対していくつかの計算(代わりに1の)に基づいてから数を返す必要があります。私はここで新しいMonoを作成することができますが、とにかく、後で私は.getする必要があります。たとえば、最終的には.reduceです。だから私が.reduce(0, (a, b) -> a + b.get())を実行すると、そこでは失敗します。

モノからどのようにして安全に値を取得できますか? ToListメソッドをし、自分でそれを行う、窓の後にマッピング相からMonoを返す:

UPDATE 2

今私はフラックスを削除しました。それはおそらくそれがどうあるべきかです。 .reduceで立ち往生

.window(10) 
.log("after window") 
.map((w) -> { 
    //basically i'm reducing Flux to a Mono<List> and return number of a [good] elements in it 
    return w.reduce(...).map(ids -> 100).subscribe(); 
}) 
.reduce(0, (a, b) -> a + b.get()) 

しかし、それはとにかく動作しません、:

は私が.reduceステップを削除した場合、それが動作することに気づきました。この場合、.windowによって提供されるFluxの処理は、メインフローの後にに実行されます。私はそれを支配しておらず、最終結果を得ることさえできません。それは意味をなさない。私はさらに、使用前ウィンドウを削減する必要があるため

答えて

0

問題でした。

同様:

window(...).flatMap((window) -> window.reduce(...)) 

私は私のマッパーでモノ、それブロック内の実行が流れていることをやっていたので、それは右の場所ではありません。それは次の使用の前に、窓の後でなければなりません。

正しいバージョンは次のとおりです。

Flux.range(0, 100) 
    .window(10) 
    .flatMap(window -> { 
     return window.reduce(new ArrayList<>(), (a, b) -> { 
      a.add(b); 
      return a; 
     }); 
    }) 
    .map((list) -> list.size()) 
    .reduce(0, (a, b) -> a + b) 
    .doOnSuccess(System.out::println) 
    .subscribe(); 

私はListにウィンドウを変換しています、そして私は作業を以下に、この値を使用することができます。

0

私はこれを最近見てきました。 .window(3)関数は、それをフラックスのフラックスに変換します。これは、ネストされた購読をすることによってきちんと処理されると思います。ウィンドウがonComplete()の場合、.buffer()関数はフラックスをリストに変換します。

コードのこのビットは、私にとっては画期的だった...

Flux<Integer> flux = Flux.range(1, 10).log(); 

flux 
    .doOnNext(s -> logger.info("pre-window:{}", s)) 
    .window(3) 
    .subscribe(s -> s.log().buffer().subscribe(t -> logger.info("post-window:{}", t))); 

はまた、単に.buffer(3)を使用すると、類似していない場合、同一の結果を生成するように見える...

flux 
    .doOnNext(s -> logger.info("pre-buffer:{}", s)) 
    .buffer(3) 
    .subscribe(t -> logger.info("post-buffer:{}", t)); 

希望に役立ちます!

関連する問題