2017-10-06 2 views
0

新しいスレッドを生成することによって、ネットワーキングの仕事をするレガシーなRXコードがあります。 作業が終了すると、コールバックで1つのメソッドが呼び出されます。RXに対応したコードは、onSubscribeスレッドをブロックしません。

私はこのコードが実行されているスレッドを制御できません。それは遺産であり、それは新しいThreadを単独で生み出します。

これは次のように単純化することができます。

interface Callback { 
    void onSuccess(); 
} 

static void executeRequest(String name, Callback callback) { 
    new Thread(() -> { 
     try { 
      System.out.println(" Starting... " + name); 
      Thread.sleep(2000); 
      System.out.println(" Finishing... " + name); 
      callback.onSuccess(); 
     } catch (InterruptedException ignored) {} 
    }).start(); 
} 

私はRX Completableにこれを変換したいです。これを行うには、Completable#create()を使用します。 CompletableEmitterの実装は、要求が完了したときに通知するCallbackの実装のexecuteRequestの実装を呼び出します。

私はデバッグに役立つために購読するとログトレースを出力します。

static Completable createRequestCompletable(String name) { 
     return Completable.create(e -> executeRequest(name, e::onComplete)) 
       .doOnSubscribe(d -> System.out.println("Subscribed to " + name)); 
} 

期待どおりに動作します。 Completableは、「要求」が終了してコールバックが呼び出された後にのみ完了します。

trampolineスケジューラでこれらの完了コードを購読すると、最初の要求が完了してから2番目の要求を購読するまで待たないという問題があります。

このコード:

final Completable c1 = createRequestCompletable("1"); 
c1.subscribeOn(Schedulers.trampoline()).subscribe(); 

final Completable c2 = createRequestCompletable("2"); 
c2.subscribeOn(Schedulers.trampoline()).subscribe(); 

出力:

Subscribed to 1 
    Starting... 1 
Subscribed to 2 
    Starting... 2 
    Finishing... 1 
    Finishing... 2 

ご覧のように、第1 Completableは私がtrampolineに加入していた場合でも、完了する前に、それは第二Completableに加入しています。

私が出力する、最初のための第二待機が終了するように、completablesをキューしたいと思い、この:

Subscribed to 1 
    Starting... 1 
    Finishing... 1 
Subscribed to 2 
    Starting... 2 
    Finishing... 2 

私は労働者で行われている問題は、仕事に関連していると確信しています糸。 Completableの実装が新しいスレッドを生成しない場合、期待どおりに動作します。 しかし、これはレガシーコードであり、私がしようとしているのは変更せずにRXに適合させることです。

注:要求はプログラムの異なるポイントで実行されます。andThenまたはconcatを使用してシリアル化された実行を実装することはできません。

答えて

0

LatchでサブスクリプションThreadを明示的にブロックすることによって、Completablesを順番に実行することができました。 しかし、これはRXでそれを行う慣用的な方法ではないと私は考えています。なぜこれを行う必要があるのか​​理解できず、Completableが完了するまでスレッドはブロックされません。

static Completable createRequestCompletable(String name) { 
    final CountDownLatch latch = new CountDownLatch(1); 
    return Completable.create(e -> { 
     executeRequest(name,() -> { 
      e.onComplete(); 
      latch.countDown(); 
     }); 
     latch.await(); 
    }) 
    .doOnSubscribe(disposable -> System.out.println("Subscribed to " + name)); 
} 
関連する問題