2017-07-10 3 views
0

私たちはスプリングブート2.0.0.BUILD_SNAPSHOTとスプリングブートwebflux 5.0.0で作業しています。現在、要求に応じてクライアントにフラックスを転送することはできません。Spring Web-Flux:要求に応じてFluxをWebクライアントに戻す方法は?

現在、私はイテレータからの磁束を作成しています:

public Flux<ItemIgnite> getAllFlux() { 
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator(); 

    return Flux.create(flux -> { 
     while(iterator.hasNext()) { 
      flux.next(iterator.next().getValue()); 
     } 
    }); 
} 

リクエストに応じて、私は単純にやっている:

@RequestMapping(value="/all", method=RequestMethod.GET, produces="application/json") 
public Flux<ItemIgnite> getAllFlux() { 
    return this.provider.getAllFlux(); 
} 

を私は今、ローカルに10秒後にlocalhost:8080/allを呼び出すと、私は503ステータスを取得コード。また、クライアントのように私はWebClient使っ/allを要求したとき:

public Flux<ItemIgnite> getAllPoducts(){ 
    WebClient webClient = WebClient.create("http://localhost:8080"); 

    Flux<ItemIgnite> f = webClient.get().uri("/all").accept(MediaType.ALL).exchange().flatMapMany(cr -> cr.bodyToFlux(ItemIgnite.class)); 
    f.subscribe(System.out::println); 
    return f; 

} 

何も起こりません。データは転送されません。

私が代わりに次のようにします。

public Flux<List<ItemIgnite>> getAllFluxMono() { 
    return Flux.just(this.getAllList()); 
} 

@RequestMapping(value="/allMono", method=RequestMethod.GET, produces="application/json") 
public Flux<List<ItemIgnite>> getAllFluxMono() { 
    return this.provider.getAllFluxMono(); 
} 

それが働いています。私は、すべてのデータが既に読み込まれて終了し、通常はフラックスを使用せずにデータを転送するので、クライアントに転送したと思います。

これらのデータを要求するウェブクライアントにデータをストリーミングするためには、どのような変更が必要ですか?

EDIT

私はignite cache内のデータを持っています。だから私のgetAllIteratorは発火キャッシュからデータをロードします

public Iterator<Cache.Entry<String, ItemIgnite>> getAllIterator() { 
    return this.igniteCache.iterator(); 
} 

EDIT @Simonバーゼルのようなflux.complete()が提案追加

public Flux<ItemIgnite> getAllFlux() { 
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator(); 

    return Flux.create(flux -> { 
     while(iterator.hasNext()) { 
      flux.next(iterator.next().getValue()); 
     } 

     flux.complete(); // see here 
    }); 
} 

がブラウザで503問題を解決します。しかし、WebClientの問題は解決しません。まだデータは転送されていません。

EDIT Schedulers.parallel()publishOnを使用して3

public Flux<ItemIgnite> getAllFlux() { 
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator(); 

    return Flux.<ItemIgnite>create(flux -> { 
     while(iterator.hasNext()) { 
      flux.next(iterator.next().getValue()); 
     } 

     flux.complete(); 
    }).publishOn(Schedulers.parallel()); 
} 

が結果を変更しません。ここで

私は、Webクライアントが受け取る何を投稿:

value :[Item ID: null, Product Name: null, Product Group: null] 
complete 

彼は(35.000以上のうちの)1つの項目を取得していると値がnullであり、彼は後に仕上げているようなので、それはそう。

+0

'getAllIterator'が何をしているのか説明できますか?それはブロックされていますか?データベースからデータを読み込む?メモリから? –

答えて

4

あなたのcreateにはflux.complete()を決して呼ぶことはありません。

しかし、そこIterableFluxに変換するために調整された工場のオペレータは、実際にですので、あなただけのFlux.fromIterable(this)

編集を行うことができます場合は、あなたのIterator隠れている複雑さをDB要求(または任意のブロックIのように/ O)、publishOnを使用して専用の実行コンテキストで隔離されていない場合、リアクティブチェーンでブロックされるものは、チェーン全体だけでなく他のリアクティブプロセスもブロックする可能性があります複数の反応プロセスによって使用されます)。

createでもfromIterableも、特にブロックソースから保護するために何もしません。私はあなたがWebClientで取得したハングから判断すると、あなたはその種の問題に直面していると思います。

+0

'flux.complete()'を追加すると、ブラウザでテストしたときに期待される結果を得ることができます。私はもはや '503 'を手に入れません。 'WebClient'で試しても問題は解決しませんが、まだ結果はありません。 – Mulgard

+0

他に2つの問題があります。あなたの 'getAllIterator'はブロックされているので、スケジューラでスケジューリングする必要があります(' publishOn'のリアクタドキュメントを参照してください)。 WebClient側では、 'Flux'に対して' subscribe'を呼び出すだけでなく、それを返すようにしています。両方を行うべきではありません。 –

+0

私は自分の投稿を編集して、 'publishOn'をどのように追加したかを見せて、' WebClient'pに到着した結果を提供しました。 s。フラックスの購読は、出力が到着しているかどうかを確認することです。 – Mulgard

0

問題は私が転送したオブジェクトItemIgniteでした。システムFluxはこれを処理できないようです。元のコードを次のように変更した場合:

public Flux<String> getAllFlux() { 
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator(); 

    return Flux.create(flux -> { 
     while(iterator.hasNext()) { 
      flux.next(iterator.next().getValue().toString()); 
     } 
    }); 
} 

すべてうまくいきます。 publishOnなし、flux.complete()なし。たぶん、誰かがこれがなぜ機能しているのか考えているかもしれません。

関連する問題