2017-02-28 54 views
0

私はかなり基本的な質問であると私はしばらくの間苦労してきました。観測可能なサポートレスキュープル

私はFlowableを持っています。これは、ネットワークからアイテムの束を取り出し、それらを放出します。

Flowable 
    .create(new FlowableOnSubscribe<Item>() { 
     @Override 
     public void subscribe(FlowableEmitter<Item> emitter) throws Exception { 
      Item item = retrieveNext(); 

      while (item != null) { 
       emitter.onNext(item); 

       if (item.isLast) { 
        break; 
       } 

       item = retrieveNext(); 
      } 

      emitter.onComplete(); 
     } 
    }, BackpressureStrategy.BUFFER) 
    .doOnRequest(new LongConsumer() { 
     @Override 
     public void accept(long t) throws Exception { 
      // ... 
     } 
    }); 

リクエストごとに処理する必要があります。つまり、私はSubscriptionを保有し、必要に応じてsubscription.request(n)と呼んでいます。

Backpressure(「リアクティブプル」セクション)の記事はObserverの視点を記述するだけで簡単に正しく[反応プル]が機能するためには

が、しかし、観測A及びBは に応答しなければならないことに言及しますリクエスト()< ...>そのようなサポートは、それは、そのようなサポートは原則的に達成することができる方法に関する詳細には触れません 観測

の要件ではありません。

このような実装には共通のパターンがありますかObservable/Flowable?私は良い例や参考資料を見つけられませんでした。私が考えることができる1つの貧弱なオプションは、私がループの中でロックするモニターを持つことです。emitter.requested() == 0と別のスレッドからdoOnRequest()のロックを解除するとループします。しかし、このアプローチはちょっと面倒です。

この「減速」は一般にどのように達成されますか?

ありがとうございました。

+0

明確にするために、2.xの演算子は[wiki](https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0)に記述されています。 – akarnokd

答えて

2

使用Flowable.generate

Flowable.generate(
() -> generateItemRetriever(), 
    (Retriever<Item> retriever, Emitter<Item> emitter) -> { 
    Item item = retriever.retrieveNext(); 
    if(item!=null && !item.isLast()) { 
     emitter.onNext(item); 
    } else { 
     emitter.onComplete(); 
    } 
    } 
) 

Thgeneratedフロアブルは、背圧を意識し、退会認識されます。 Retrieverに廃棄する必要のある状態がある場合は、3つの引数generateのオーバーロードを使用します。

しかし、HTTPコールを行っているなら、RxJavaとよく似合うRetrofitのようなものを検討することをお勧めします。

関連する問題