0
私はPublishProcessor
という値を連続的に出力しています。そして私はPublishProcessor
を2つの異なるスレッドで観察する2人の加入者で購読しました。第1の加入者が継続的にonNext(T)
でデータを受信しながら、第2の加入者はonNext(T)
RxJava2のPublishProcessor
にいくつかの呼び出しを受けた後、エラーError: Could not emit value due to lack of requests
を投げ、私の実装です
PublishProcessor<byte[]> publishProcessor = PublishProcessor.create()
dataFlowable.subscribeOn(Schedulers.newThread()).subscribe(publishProcessor);
Subscriber1
publishProcessor.observeOn(Schedulers.newThread())
.subscribeWith(new DisposableSubscriber<byte[]>() {
@Override public void onNext(byte[] bytes) {
//Log.i("Sub1 ", "Data received");
}
@Override public void onError(Throwable t) {
}
@Override public void onComplete() {
Log.i("Record ", "complete");
}
})
subscriber2など
publishProcessor.observeOn(Schedulers.newThread())
.subscribeWith(new DisposableSubscriber<byte[]>() {
@Override public void onNext(byte[] moreData) {
Log.i("Sub2 ", "Data received");
}
@Override public void onError(Throwable t) {
Log.i("Sub2 ", t.getMessage() + " "); // error received after few call to onNext()
}
@Override public void onComplete() {
Log.i("Sub2 ", "complete");
}
})
私の要件は、 'dataFlowable'を私が直接使うことができる複数のサブスクライバで購読することです。だから私はここでPublishProcessorを使っている。 – arjun
@arjun 'dataFlowable'がホットソースの場合は、複数のサブスクライバを直接追加することができます。コールドソースの場合は、1つの '.publish() - > ConnectedFlowable'にサブスクリプションを追加できます。 – ephemient