2016-10-20 3 views
1

私はObservable.create()の素晴らしい代替品としてObservable.fromEmitter()を使っています。私は最近、いくつかの奇妙な行動に出くわしました。なぜこれが当てはまるのかよく分かりません。私はバックプレッシャーとスケジューラーに関する知識を持っている人に、これを見ていただき本当に感謝しています。RxJava Observable.fromEmitter奇妙な背圧の挙動

public final class EmitterTest { 
    public static void main(String[] args) { 
    Observable<Integer> obs = Observable.fromEmitter(emitter -> { 
     for (int i = 1; i < 1000; i++) { 
     if (i % 5 == 0) { 
      sleep(300L); 
     } 

     emitter.onNext(i); 
     } 

     emitter.onCompleted(); 
    }, Emitter.BackpressureMode.LATEST); 

    obs.subscribeOn(Schedulers.computation()) 
     .observeOn(Schedulers.computation()) 
     .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128" 

    sleep(10000L); 
    } 

    private static void sleep(Long duration) { 
    try { 
     Thread.sleep(duration); 
    } catch (InterruptedException e) { 
     throw new RuntimeException(e); 
    } 
    } 
} 

このアプリケーションの出力は

Received 1 
Received 2 
... 
Received 128 

である(これはRxJavaのデフォルトのバッファサイズであるため、仮に)そして、それは128に残ったまま。

fromEmitter()で指定されたモードをBackpressureMode.NONEに変更すると、コードは意図したとおりに動作します。 observeOn()へのコールを削除すると、意図したとおりに機能します。誰がなぜこれが当てはまるのか説明できますか?

+0

を使用することで、それがすべてで停止しないでください。 observeOnでtoBlocking()または小さいバッファサイズを使用しても停止します。私はこれをさらに調査します。 – akarnokd

+0

RX Java 2.0でObservable.fromEmitterと同等の機能は何ですか? – Mike6679

+1

'Observable.create()'は、2.0の 'fromEmitter()'を置き換えます。あなたが古い、恐ろしい作成動作をしたい場合は、Observable.unsafeCreate()を使用します。 –

答えて

2

これは同じプールのデッドロック状況です。 subscribeOnは、それが使用しているのと同じスレッド上のダウンストリームrequestをスケジュールしますが、そのスレッドがスリープ/エミッションループでビジー状態の場合、要求はfromEmitterに決して届かず、しばらくしてからLATESTは、メインソースが十分長く待っている場合は、最後の値(999)が配信されます。 (これはonBackpressureBlockと同様の状況です)

subscribeOnは、この要求のスケジューリングを行わなかった場合、この例は正しく機能します。

ソリューションを解決するためにan issueを開設しました。

今の回避策はobserveOnでより大きなバッファサイズを使用します(オーバーロードがあります)、またはこれが奇数であるfromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

+0

簡潔な答えをくれてありがとう、David!感謝します。 –

1

これは奇妙なことではありません。

コールをトレースしましょう。

Observable.subscribe(Subscriber<? super T> subscriber)

Observable.subscribe(Subscriber<? super T> subscriber, Observable<T> observable)

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);

など:で始まります。

OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize):のコンストラクタに見て、あなたがバッファを指定しない場合

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { 
    this.scheduler = scheduler; 
    this.delayError = delayError; 
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; 
} 

、デフォルトはプラットフォームに依存しているサイズ、RxRingBuffer.SIZEです。

バッファサイズのないobserveOn演算子を呼び出すと、デフォルトは128(Androidでは16)になります。

この問題の解決方法は非常に簡単です。別のobserveOn演算子を使用し、バッファサイズを宣言するだけです。しかし、let(1000)(エミッタから来る要素の数)のバッファサイズを宣言すると、プログラムはすべての値(約170)を出さずに終了します。どうして?プログラムが終了するので。メインスレッドは10000秒後に終了し、計算は別のスレッド(Schedulers.computation())で実行されます。その解決策は? CountdownLatchを使用してください。プロダクションを決して使用しないように注意してください。

+0

答えをありがとう!私は最終的にDavidの答えを優先しましたが、コールを解体するために行った努力に感謝します。 –