2017-01-22 9 views
0

おそらく私はsubscribeOnobserveOnの内部動作を理解しているかもしれませんが、最近私は本当に奇妙なことに遭遇しました。私は、subscribeOnが最初に処理を開始するスケジューラを決定しているという印象を受けました。特にmapが多くあり、observeOnはスケジューラを変更するときにmapsのどこでも使用できます最初にネットワーキングを行い、計算し、最後にUIスレッドを変更します。RxJava:subscribeOnとobserveOnが期待通りに動作しない

しかし、これらの呼び出しをObservableまたはSingleに直接連結しないと、動作しません。ここでは、最小限の作業例JUnitのテストです:

import org.junit.Test; 
import rx.Single; 
import rx.schedulers.Schedulers; 

public class SubscribeOnTest { 

    @Test public void not_working_as_expected() throws Exception { 
    Single<Integer> single = Single.<Integer>create(singleSubscriber -> { 
     System.out.println("Doing some computation on thread " + Thread.currentThread().getName()); 
     int i = 1; 
     singleSubscriber.onSuccess(i); 
    }); 
    single.subscribeOn(Schedulers.computation()).observeOn(Schedulers.io()); 

    single.subscribe(integer -> { 
     System.out.println("Observing on thread " + Thread.currentThread().getName()); 
    }); 
    System.out.println("Doing test on thread " + Thread.currentThread().getName()); 
    Thread.sleep(1000); 
    } 

    @Test public void working_as_expected() throws Exception { 
    Single<Integer> single = Single.<Integer>create(singleSubscriber -> { 
     System.out.println("Doing some computation on thread " + Thread.currentThread().getName()); 
     int i = 1; 
     singleSubscriber.onSuccess(i); 
    }).subscribeOn(Schedulers.computation()).observeOn(Schedulers.io()); 

    single.subscribe(integer -> { 
     System.out.println("Observing on thread " + Thread.currentThread().getName()); 
    }); 
    System.out.println("Doing test on thread " + Thread.currentThread().getName()); 
    Thread.sleep(1000); 
    } 
} 

テストnot_working_as_expected()working_as_expected()のに対し、出力

Doing some computation on thread main 
Observing on thread main 
Doing test on thread main 

を以下の私を与え

Doing some computation on thread RxComputationScheduler-1 
Doing test on thread main 
Observing on thread RxIoScheduler-2 

私にその最初のテストで唯一の違いを

を与え、単一の作成後にセミコロンがあり、それからスケジューラが適用され、実際の例ではメソッドコールはdirecですSingleの創造に結びついています。しかし、それは無関係ではないでしょうか?

+3

これは非常によくある間違いです。各演算子は新しいオブジェクトを返します。 subscribeOn + observeOnを適用し、返されたSingleを無視し、元の変更されていないソースにサブスクライブします。 – akarnokd

答えて

1

オペレータによって実行されるすべての「修正」は、彼らが以前のものから変更された方法で通知を受け取る新しいストリームを返すことを意味し、不変です。 subscribeOnobserveOnの演算子を呼び出しただけで結果が保存されていないため、後で行われるサブスクリプションは変更されていないストリームにあります。

片面メモ:私はsubscribeOnのあなたの定義を理解していませんでした。マップ演算子が何らかの影響を受けることを意味した場合、これは当てはまりません。 subscribeOnは、OnSubscribe関数が呼び出されるSchedulerを定義します。あなたの場合、関数はcreate()メソッドに渡します。他方、observeOnは、各連続ストリーム(適用されたオペレータによって返されるストリーム)が上流から来る排出物を処理するスケジューラを定義する。

2

.subscribeOn(*) - 新しいインスタンスObservableが返されますが、最初のテストでは無視して元のObservableにサブスクライブします。デフォルトでは、デフォルトでメインスレッドにサブスクライブされています。

関連する問題