2016-09-01 2 views
2

私はReactorプロジェクトを理解しようとしており、サブスクリプションを取り消す方法を探しています。 私は、Fluxのサブスクリプションを作成した後、onCancel Signalを送信するために使用できるCancellationオブジェクトへの参照を取得できることを知っていますが、これはサブスクリプションを作成した後であり、何らかのコレクションでその参照を保持する必要があります。反応炉をキャンセルするサブスクリプション

Cancellationオブジェクトを取得するより良い方法はありますか?または、購読をキャンセルするだけです。たぶん、すべてのアクティブなサブスクリプションへの参照を含む何らかの種類の - あなたが素晴らしいことになるええ...

答えて

2

subscribe()を呼び出す前に、リアクターでは、Subscriptionを取り消したいという意味はありませんメソッドを使用してSubscriptionを作成し、チェーンの上にその信号を伝播してデータの放出を開始します)。

すべてのサブスクリプションを持つ集中化された場所はありません。キャンセルする特定のサブスクリプションを見つける方法が必要なのであまり意味がありません(チェーン内の各オペレータが中間サブスクリプションも...)。

一部の事業者は、あなたの代わりにサブスクリプションをキャンセルすることがあります。

Flux.just(1, 2, 3, 4).log().take(2).subscribe(System.out::println); 

意志出力:

14:17:48.729 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) 
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | request(unbounded) 
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(1) 
1 
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | onNext(2) 
2 
14:17:48.732 [main] INFO reactor.Flux.Array.1 - | cancel() 
+0

をああ、私はその演算子については知りませんでした:それは十分なアイテムが放出された後、上流キャンセルさせていただきます例えばtake(int)用ケース、ある Oしかし、私のために最も重要なことは、CancellationオブジェクトがOnCancel信号を適切に処理できる信号を送信していたことでした。しかし、問題は回避策を見つけることができます。Fluxに例外を投げてストリームをキャンセルするので、悪いことはありません。 – Kapitalny

+1

'Cancellation'オブジェクトの使用をお勧めします。 3.1では 'Disposable'になります(その時点で' cancel() 'ではなく' dispose() 'を呼び出さなければなりません)。あなたがしたいことに自然にマッチする演算子を探したり、必要に応じてキャンセルしたりすることができます。流用に例外を投げることは、ユースケースに応じて、あまりにも良い解決法とは言えません。 –

+0

@SimonBasléあなたのコードを実行し、 'take(2)'が 'log()'の前にある場合、 'cancel()'シグナルは出力されません。どうして? 'take '演算子は、ソースではなく、フラックスをキャンセルすると言いました。逆に –

関連する問題