2017-03-10 1 views
0

私はリアクターライブラリーを試していましたが、onNextまたはonComplete呼び出しで戻ってくるモノが決して戻ってこない理由は分かりません。私は非常に些細な事を欠いていると思う。ここにサンプルコードがあります。購読中のモノでonNextとonCompleteコールを受け取ることができません

MyServiceService service = new MyServiceService(); 
    service.save("id") 
      .map(myUserMono -> new MyUser(myUserMono.getName().toUpperCase(), myUserMono.getId().toUpperCase())) 
      .subscribe(new Subscriber<MyUser>() { 
       @Override 
       public void onSubscribe(Subscription s) { 
        System.out.println("Subscribed!" + Thread.currentThread().getName()); 
       } 

       @Override 
       public void onNext(MyUser myUser) { 
        System.out.println("OnNext on thread " + Thread.currentThread().getName()); 

       } 

       @Override 
       public void onError(Throwable t) { 
        System.out.println("onError!" + Thread.currentThread().getName()); 

       } 

       @Override 
       public void onComplete() { 
        System.out.println("onCompleted!" + Thread.currentThread().getName()); 

       } 
      }); 


} 

private static class MyServiceService { 
    private Repository myRepo = new Repository(); 

    public Mono<MyUser> save(String userId) { 
     return myRepo.save(userId); 
    } 
} 

private static class Repository { 

    public Mono<MyUser> save(String userId) { 
     return Mono.create(myUserMonoSink -> { 
      Future<MyUser> submit = exe.submit(() -> this.blockingMethod(userId)); 
      ListenableFuture<MyUser> myUserListenableFuture = JdkFutureAdapters.listenInPoolThread(submit); 
      Futures.addCallback(myUserListenableFuture, new FutureCallback<MyUser>() { 
       @Override 
       public void onSuccess(MyUser result) { 
        myUserMonoSink.success(result); 
       } 

       @Override 
       public void onFailure(Throwable t) { 
        myUserMonoSink.error(t); 
       } 
      }); 
     }); 
    } 

    private MyUser blockingMethod(String userId) throws InterruptedException { 
     Thread.sleep(5000); 
     return new MyUser("blocking", userId); 
    } 
} 

上記のコードのみがSubcribed!mainを出力します。その将来のコールバックがmyUserMonoSink.success

答えて

2

までの値をプッシュされていない理由を私は何を把握することはできないんだと、心に留めておくべき重要なことは、FluxまたはMono非同期、ほとんどの時間であるということです。

サブスクライブすると、ユーザーを保存する非同期処理がエグゼキュータで開始されますが、.subscribe(...)以降のメインコードで実行が継続されます。

したがって、mainスレッドが終了すると、何かが起こる前にテストが終了し、Monoにプッシュされました。

[サイドバー]:同期はいつですか?

データのソースがFlux/Mono同期ファクトリメソッドの場合。しかし、演算子のチェーンの残りの部分は実行コンテキストを切り替えないという前提条件が追加されています。これは明示的に(publishOnまたはsubscribeOn演算子を使用する)、または暗黙的に(例えばdelayElementsのような時間関連の演算子のような演算子は、別のSchedulerで実行されます)発生する可能性があります。

単純に言えば、ソースはExecutorServiceスレッドexeで実行されているため、実際にはMonoは非同期です。あなたのスニペットはmainです。 (生産の完全非同期コードではなく)実験でMonoの正しい動作を観察するには問題

を修正する方法

、いくつかの可能性が用意されています

  • がでsubscribeを保ちますsystem.out.printlnsを入力しますが、.countDown()onCompleteonErrorの中にnew CountDownLatch(1)を追加します。 subscribeの後のカウントダウンラッチのawait
  • .subscribe(...)の代わりに.log().block()を使用してください。各イベントで何をすべきかをカスタマイズすることはできませんが、log()はそれらを印刷します(ログフレームワークが設定されている場合)。 block()はブロッキングモードに戻り、前述のCountDownLatchで提案したこととほとんど同じです。使用可能になった時点で値を返します。エラーが発生した場合はExceptionを返します。
  • 代わりのlog()あなたが.doOnXXX(...)メソッドを使用してロギングまたは他の副作用をカスタマイズすることができます(イベント、例えばのイベント+組み合わせのほとんどすべてのタイプごとに1つがあります。doOnSubscribedoOnNext ...)

あなたが」ユニットテストを行っている場合は、reactor-testsプロジェクトのStepVerifierを使用してください。フラックス/モノに加入し、.verify()に電話するとイベントを待ちます。参照ガイドchapter on testing(および一般的なリファレンスガイドの残りの部分)を参照してください。

1

問題は、作成された匿名クラスonSubscribeメソッドは何もしません。 LambdaSubscriberの実装を見ると、いくつかのイベントが要求されます。 BaseSubscriberには、定義済みのロジックがいくつかあるので、拡張する方が簡単です。

だからあなたの加入者の実装は次のようになります。

MyServiceService service = new MyServiceService(); 
service.save("id") 
     .map(myUserMono -> new MyUser(myUserMono.getName().toUpperCase(), myUserMono.getId().toUpperCase())) 
     .subscribe(new BaseSubscriber<MyUser>() { 
       @Override 
       protected void hookOnSubscribe(Subscription subscription) { 
        System.out.println("Subscribed!" + Thread.currentThread().getName()); 
        request(1); // or requestUnbounded(); 
       } 

       @Override 
       protected void hookOnNext(MyUser myUser) { 
        System.out.println("OnNext on thread " + Thread.currentThread().getName()); 

        // request(1); // if wasn't called requestUnbounded() 2 
       } 

       @Override 
       protected void hookOnComplete() { 
        System.out.println("onCompleted!" + Thread.currentThread().getName()); 
       } 

       @Override 
       protected void hookOnError(Throwable throwable) { 
        System.out.println("onError!" + Thread.currentThread().getName()); 
       } 

      }); 

は、多分それは最適な実装ではありません、私はあまりにも反応器に新たなんです。

サイモンの答えは、非同期コードのテストについてはかなり良い説明です。

関連する問題