0

私はこのようなデータの流れがあります。RxJava2で例外をスキップする方法はありますか?

Observable 
    .fromFuture(
     CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>> 
      listOf(1, 2, 3, 57005, 5) 
     }, 
     Schedulers.computation() 
    ) 
    .flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one 
    .map { 
     CompletableFuture.supplyAsync { // This remote call may fail if it does not like the input. I want to skip that failures and continue the stream like the fail never occurred. 
      if (it == 0xDEAD) { 
       throw IOException("Dead value!") 
      } 

      it 
     } 
    } 
    .flatMap { 
     Observable.fromFuture(it) // Turn that Futures into a stream of Observables once again 
    } 
    .doOnNext { 
     println(it) // Debug 
    } 
    .blockingSubscribe() 

を私はCompletableFuture.supplyAsyncで(実際にFuture秒を返すことを)ビジネスロジックを交換しました。 そして、はい、これはコトリンですが、あなたはその意図を持っていると思います。

私はコメント

「死んだ」の値( 57005)出力は次のようになります。

1 
4 
9 
25 

しかし、その「死者」の値がストリームに表示されている場合、それは失敗します。

1 
4 
9 
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: Dead value! 
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45) 
    at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:86) 
    at io.reactivex.Observable.blockingSubscribe(Observable.java:5035) 
    at by.dev.madhead.rx.TestKt.main(test.kt:41) 
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Dead value! 
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
... 

アイムRXの初心者で、すばやく解決策を探そうとした:onExceptionResumeNextObservable.fromFuture(it) - >Observable.fromFuture(it).onExceptionResumeNext { Observable.empty<Int>() }しかし今、私のアプリケーションは永遠に(私が期待する出力を生成した後)ハングします。 ストリームが終了しないように見えます。

私はシャットダウンする必要がありますか?Observable何とか何ですか? または、より一般的には、RXで作業するときは良いアプローチですか? 別の方法でそれを再考する必要がありますか?このような

答えて

1

ツバメ例外:

Observable.fromFuture(it).onErrorResumeNext(Observable.empty()) 
+0

実際に私の愚かな、トリックをしました。ありがとう! – madhead

関連する問題