2017-09-05 1 views
0

私はPublishProcessorという値を連続的に出力しています。そして私はPublishProcessorを2つの異なるスレッドで観察する2人の加入者で購読しました。第1の加入者が継続的にonNext(T)でデータを受信しながら、第2の加入者はonNext(T)RxJava2のPublishProcessor

以下

にいくつかの呼び出しを受けた後、エラーError: Could not emit value due to lack of requestsを投げ、私の実装です

PublishProcessor<byte[]> publishProcessor = PublishProcessor.create() 
dataFlowable.subscribeOn(Schedulers.newThread()).subscribe(publishProcessor); 

Subscriber1

publishProcessor.observeOn(Schedulers.newThread()) 
    .subscribeWith(new DisposableSubscriber<byte[]>() { 
     @Override public void onNext(byte[] bytes) { 
     //Log.i("Sub1 ", "Data received"); 
     } 

     @Override public void onError(Throwable t) { 

     } 

     @Override public void onComplete() { 
     Log.i("Record ", "complete"); 
     } 
    }) 

subscriber2など

publishProcessor.observeOn(Schedulers.newThread()) 
    .subscribeWith(new DisposableSubscriber<byte[]>() { 
     @Override public void onNext(byte[] moreData) { 
      Log.i("Sub2 ", "Data received"); 
     } 

     @Override public void onError(Throwable t) { 
     Log.i("Sub2 ", t.getMessage() + " "); // error received after few call to onNext() 
     } 

     @Override public void onComplete() { 
     Log.i("Sub2 ", "complete"); 
     } 
    }) 

答えて

3

これはMissingBackpressureExceptionです。発行者が消費者が消費できるよりも速く生産しているために発生しています。 PublishProcessorは、下流の加入者から上流のソースへの背圧を適用しません。

dataFlowableとは何ですか?なぜそれを直接購読しないのですか?

+0

私の要件は、 'dataFlowable'を私が直接使うことができる複数のサブスクライバで購読することです。だから私はここでPublishProcessorを使っている。 – arjun

+0

@arjun 'dataFlowable'がホットソースの場合は、複数のサブスクライバを直接追加することができます。コールドソースの場合は、1つの '.publish() - > ConnectedFlowable'にサブスクリプションを追加できます。 – ephemient

関連する問題