2016-06-16 10 views
1

私はrx javaが新しく、おそらく非常に基本的なものがありません。以下のサンプルコードでは、私が起こることをようを何いただきたいです:rx java異なる複数のスレッドでobservablesを実行する

  1. SampleControllerは

  2. CompositeService処理は、新しいスレッドAとリリースNIO要求上で動作するHTTP-NIOスレッドで要求を受け取ります糸。

  3. CompositeServiceは、スレッドBにネットワーク呼び出しを行うHelloServiceを呼び出す

  4. CompositeServiceは場合

3,4ラン並行スレッドC.上のネットワーク呼び出しを行うWorldService、およびを呼び出します結果は準備ができていますが、結果を使ってスレッドAでネットワークコールを行います。

代わりに、3と4がhttp-nioスレッドで順番に実行され、CompositeServiceだけが実行されます。 sを新しいスレッドに追加します。 3と4の私のsubscribeOnコールは効果がないようです。 3と4を同時に実行するにはどうすればよいですか?

SampleController

@RestController 
@RequestMapping("/rx-java-sample") 
public class SampleController { 

    private static Logger log = LoggerFactory.getLogger(SampleController.class); 

    @Autowired 
    private CompositeService compositeService; 

    @RequestMapping(method = RequestMethod.GET, 
     produces = { MediaType.APPLICATION_JSON_VALUE }) 
    public DeferredResult<String> getCompositeString() 
        throws ApiGatewayException, ApiValidationException { 
     log.info("Received getCompositeObject request"); 

     Observable<String> compositeObject = compositeService.getCompositeString(); 

     return toDeferredResult(compositeObject); 
    } 

    private DeferredResult<String> toDeferredResult(Observable<String> compositeObject) { 
     DeferredResult<String> result = new DeferredResult<String>(); 

     compositeObject.subscribeOn(Schedulers.newThread()).subscribe(new Observer<String>() { 
      @Override 
      public void onCompleted() { 
      } 

      @Override 
      public void onError(Throwable throwable) { 
       result.setErrorResult(throwable); 
      } 

      @Override 
      public void onNext(String compositeString) { 
       log.info("Returning compositeObject: " + compositeString); 
       result.setResult(compositeString); 
      } 
     }); 

     return result; 
    } 
} 

HelloService

@Service 
public class HelloService { 

    private Logger log = LoggerFactory.getLogger(HelloService.class); 

    public Observable<String> getHello() { 
     log.trace("calling get hello"); 
     return Observable.just(makeNetworkCall()); 
    } 

    private String makeNetworkCall() { 
     log.trace("making hello network call"); 
     return "hello"; 
    } 
} 

WorldService

@Service 
public class WorldService { 

private Logger log = LoggerFactory.getLogger(HelloService.class); 

    public Observable<String> getWorld() { 
     log.trace("calling get world"); 
     return Observable.just(makeNetworkCall()); 
    } 

    private String makeNetworkCall() { 
     log.trace("making world network call"); 
     return "world"; 
    } 
} 

CompositeService

@Service 
public class CompositeService { 

    private Logger log = LoggerFactory.getLogger(CompositeService.class); 

    @Autowired 
    private HelloService helloService; 

    @Autowired 
    private WorldService worldService; 

    public Observable<String> getCompositeString() { 
     log.trace("Calling getCompositeObject"); 

     Observable<String> foo = helloService.getHello().subscribeOn(Schedulers.newThread()); 
     Observable<String> bar = worldService.getWorld().subscribeOn(Schedulers.newThread()); 

     return Observable.zip(foo, bar, (f, b) -> makeNetworkCall(f,b)); 
    } 

    private String makeNetworkCall(String hello, String world) { 
     log.trace("making composite network call"); 
     return hello + " " + world; 
    } 
} 

ログ

2016-06-16 07:10:13 INFO [http-nio-9050-exec-1] [SampleController.java:32] Received getCompositeObject request 
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [CompositeService.java:23] Calling getCompositeObject 
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [HelloService.java:15] calling get hello 
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [HelloService.java:20] making hello network call 
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [WorldService.java:15] calling get world 
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [WorldService.java:20] making world network call 
2016-06-16 07:10:13 TRACE [RxNewThreadScheduler-3] [CompositeService.java:32] making composite network call 
2016-06-16 07:10:13 INFO [RxNewThreadScheduler-3] [SampleController.java:54] Returning compositeObject: hello world 
+0

fyiでは、CPUバインドされた作業のためのSchedulers.computation()があり、Scheduler.newThread()よりもioバインドのためのSchedulers.io()があります。 –

答えて

1

あなたはこれらのケースのためObservable::deferを使用したい:

public Observable<String> getWorld() { 
     log.trace("calling get world"); 

     return Observable.defer(() -> makeNetworkCall()); 
    } 

これはあなたのコードは呼び出されることを観察できるが上subsribedされることを各時間を保証します。

また、Schedulers.io()を使用することをおすすめします。デフォルトでは必要に応じて展開される構成可能なスレッドプールです。

0

問題がObservable.justではなくObservable.fromCallableの私の使用した表示されます。上記のサービスコードを以下のように変更すると、私が探していた動作が生成されました。私はこれを行うための "推奨された"方法であるかどうかに関して、まだフィードバックを探しています。特に、CompositeServiceのtoBlocking()の使用が正しいかどうかはわかりません。私はおそらくこのパターンを自分のコードで広範囲に使用するでしょうし、それを正しいものにしたいと思います。

HelloService

@Service 
public class HelloService { 

    private Logger log = LoggerFactory.getLogger(HelloService.class); 

    public Observable<String> getHello() { 
     log.trace("calling get hello"); 

     return Observable.fromCallable(() -> { 
      return makeNetworkCall(); 
     }); 
    } 

    private String makeNetworkCall() { 
     log.trace("making hello network call"); 
     return "hello"; 
    } 
} 

WorldService

@Service 
public class WorldService { 

private Logger log = LoggerFactory.getLogger(HelloService.class); 

    public Observable<String> getWorld() { 
     log.trace("calling get world"); 

     return Observable.fromCallable(() -> { 
      return makeNetworkCall(); 
     }); 
    } 

    private String makeNetworkCall() { 
     log.trace("making world network call"); 
     return "world"; 
    } 
} 

CompositeService

@Service 
public class CompositeService { 

    private Logger log = LoggerFactory.getLogger(CompositeService.class); 

    @Autowired 
    private HelloService helloService; 

    @Autowired 
    private WorldService worldService; 

    public Observable<String> getCompositeString() { 
     return Observable.fromCallable(() -> { 
      return getCompositeStringImpl().toBlocking().single(); 
     }); 
    } 

    public Observable<String> getCompositeStringImpl() { 
     log.trace("Calling getCompositeObject"); 

     Observable<String> foo = helloService.getHello().subscribeOn(Schedulers.newThread()); 
     Observable<String> bar = worldService.getWorld().subscribeOn(Schedulers.newThread()); 

     return Observable.zip(foo, bar, (f, b) -> makeNetworkCall(f, b)); 
    } 

    private String makeNetworkCall(String hello, String world) { 
     log.trace("making composite network call"); 
     return hello + " " + world; 
    } 
} 

ログ

2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [CompositeService.java:29] Calling getCompositeObject 
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [HelloService.java:15] calling get hello 
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [WorldService.java:15] calling get world 
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-2] [HelloService.java:23] making hello network call 
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-3] [WorldService.java:23] making world network call 
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-3] [CompositeService.java:38] making composite network call 
2016-06-16 08:15:50 INFO [RxNewThreadScheduler-1] [SampleController.java:54] Returning compositeObject: hello world 
関連する問題