私はObservable.create()
の素晴らしい代替品としてObservable.fromEmitter()
を使っています。私は最近、いくつかの奇妙な行動に出くわしました。なぜこれが当てはまるのかよく分かりません。私はバックプレッシャーとスケジューラーに関する知識を持っている人に、これを見ていただき本当に感謝しています。RxJava Observable.fromEmitter奇妙な背圧の挙動
public final class EmitterTest {
public static void main(String[] args) {
Observable<Integer> obs = Observable.fromEmitter(emitter -> {
for (int i = 1; i < 1000; i++) {
if (i % 5 == 0) {
sleep(300L);
}
emitter.onNext(i);
}
emitter.onCompleted();
}, Emitter.BackpressureMode.LATEST);
obs.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"
sleep(10000L);
}
private static void sleep(Long duration) {
try {
Thread.sleep(duration);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
このアプリケーションの出力は
Received 1
Received 2
...
Received 128
である(これはRxJavaのデフォルトのバッファサイズであるため、仮に)そして、それは128に残ったまま。
fromEmitter()
で指定されたモードをBackpressureMode.NONE
に変更すると、コードは意図したとおりに動作します。 observeOn()
へのコールを削除すると、意図したとおりに機能します。誰がなぜこれが当てはまるのか説明できますか?
を使用することで、それがすべてで停止しないでください。 observeOnでtoBlocking()または小さいバッファサイズを使用しても停止します。私はこれをさらに調査します。 – akarnokd
RX Java 2.0でObservable.fromEmitterと同等の機能は何ですか? – Mike6679
'Observable.create()'は、2.0の 'fromEmitter()'を置き換えます。あなたが古い、恐ろしい作成動作をしたい場合は、Observable.unsafeCreate()を使用します。 –