2016-11-02 3 views
1

私はRxJava初心者です。次の問題があります。 私はアイテムのシーケンスを持っていると、アイテムの上にエラーを伝播すると、それを無視して他のアイテムの処理を続行したいとします。私はスニペット以下のいるRxJavaでエラーが発生した後にアイテムをストリーミングする方法はありますか?

Observable.from(Arrays.asList("1", "2", "3")) 
      .map(x -> { 
       if (x.equals("2")) { 
        throw new NullPointerException(); 
       } 
       return x + "-"; 
      }) 
      .onExceptionResumeNext(Observable.empty()) 
      .subscribe(System.out::println); 

私は取得しています:1 -

しかし、私は取得したい:1、3-

私はそれをどのように行うことができますか?

+0

可能性のある重複した[エラーを無視して、無限ストリームを継続するには?](http://stackoverflow.com/questions/28969995/how-to-ignore-error-and-continue-infinite-stream) – Ivan

答えて

2

このトリックは、次の例のように、何らかの形で変換された値を新しい観測可能なフラットマップにラップすることです。 flatMapの各値は、例外をスローして、値で値を処理できるようになりました。 flatMapのサブストリームを1つの要素のみで構成すると、onErrorの後に観測対象が閉じられるかどうかは関係ありません。私はテスト環境としてRxJava2を使用します。

@Test 
public void name() throws Exception { 
    Observable<String> stringObservable = Observable.fromArray("1", "2", "3") 
      .flatMap(x -> { 
       return Observable.defer(() -> { 
        try { 
         if (x.equals("2")) { 
          throw new NullPointerException(); 
         } 
         return Observable.just(x + "-"); 
        } catch (Exception ex) { 
         return Observable.error(ex); 
        } 
       }).map(s -> { 
        if (s.equals("3-")) { 
         throw new IllegalArgumentException(); 
        } 
        return s + s; 
       }).take(1) 
         .zipWith(Observable.just("X"), (s, s2) -> s + s2) 
         .onErrorResumeNext(Observable.empty()); 
      }); 

    TestObserver<String> test = stringObservable.test(); 

    test.assertResult("1-1-X"); 
} 
+0

どうもありがとうございました! 私は別の質問があります: もし私が10個のトランスを持っていて、それぞれが例外を投げることができるなら、私はこのコードでそれぞれを包むべきですか? – corvax

+0

いつもの答えは:それはあなたがしたいことによって異なります。地図演算子だけでなく、新しいオブザーバブルを作成したい場合は、onErrorResumeNextの前に配置することをお勧めします。いずれかの演算子が失敗すると、値2(たとえば2)が失敗し、空の観測値が返されます。すべての演算子でバックアップ値を処理する場合は、サブストリームを新しいストリームに再ラップし、その上にフラット・マップします。いくつかのコードを追加してください。 –

関連する問題