2016-11-13 9 views
0

RxJava 2.0で使用したいorg.reactivestreams.Processorがあります。しかし、org.reactivestreams.Publisherio.reactivex.Flowable#fromPublisherのようにRxJavaに統合するための変換はありますが、org.reactivestreams.Processor(またはorg.reactivestreams.Subscriber)を最適に統合する方法はわかりません。誰でもこの光を照らすことができますか?RxJava 2.0でReactive-Streamsプロセッサを使用

答えて

0

あなたはPublisher側をラップしているようSubscriber側を保つ:

Processor proc = ... 

Subscriber sub = proc; 
Flowable flow = Flowable.fromPublisher(proc); 

flow.map(v -> v.toString()).subscribe(System.out::println); 

sub.onNext(1); 
+0

うーん、しかし、私は 'onNext'を経由を要求し、より頻繁に呼び出されてはならないことを指定し、反応ストリーム契約に違反する可能性があります'サブスクリプション#要求(長い)'。 –

+0

これは、そのプロセッサーを入手した場所、またはダウンストリーム要求を調整するかどうかによって異なります。 RxJavaのプロセッサは調整されておらず、サブスクリプションを送信すると、常にLong.MAX_VALUEを要求します。 – akarnokd

+0

プロセッサは[spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/README.md#specification)に確認し、akkaストリームと正常に統合されました。私はRxJava 1.xとは異なり、RxJava 2.0がバックプレッシャーをサポートしていると考えていました。 –

関連する問題