2016-12-27 1 views
0

エラーがスローされ、置き換えられます。しかし、実行は終了します。どのように10要素を放出するために観測可能にするには?エラーがスローされても値を放出し続けるように観測可能にする方法

const Rx = require('rxjs/Rx') 

Rx.Observable.interval(1000) 
    .map((i) => { 
    if (i === 2) throw(new Error('omg')) 
    return i 
    }) 
    .take(10) 
    .catch((err) => { 
    return Rx.Observable.of('ok, we caught an error, but we don\'t want to exit') 
    }) 
    .do(console.log, console.error) 
    .subscribe() 

答えて

0

私はRxJSの達人ではありませんが、私はこれに答えてみたいと思います。

RxJSでエラーをスローすると、オブザーバブルが終了します。結果として、再開することはできませんが、オブザーバブルの再試行/再試行のみを試みることができます。

エラーを再生したくないだけのオリジナル10個の要素を取る必要がある場合は、あなたでしreturn null代わりのthrow new Errorとちょうどfilter(x => x)ますtake(10)前。

それ以外の場合は、retryWhenを使用してエラー時にobservableを繰り返すことができます。これは2つの項目を取り、失敗し、0、1、...で繰り返すことから始まることに注意してください。2が失敗した後に消えますが、それでも10の項目しか必要としません。

Rx.Observable.interval(1000) 
    .map((i) => { 
    if (i === 2) throw(new Error('omg')) 
    return i 
    }) 
    .retryWhen((errors) => errors.scan(
    (errorCount, err) => { 
     if(errorCount >= 2) { 
      throw err; 
     } 

     return errorCount + 1; 
    }, 0) 
) 
    .take(10) 
    .catch((err) => { 
    return Rx.Observable.of('ok, we caught an error, but we don\'t want to exit') 
    }) 
    .subscribe((x) => console.log('result', x)) 

また、単にいつでも観察可能な仕上げを繰り返し続けるためにrepeatを使用することができます。これはあなたが望むものではない可能性が高いですが、私はオプションとしてあなたにそれを見せたいと思っていました。あなたが取った場所に注意を払う必要があります。また、放出されたキャッチを観測することもできます。

Rx.Observable.interval(1000) 
    .map((i) => { 
    if (i === 2) throw(new Error('omg')) 
    return i 
    }) 
    .catch((err) => { 
    return Rx.Observable.of('ok, we caught an error, but we don\'t want to exit') 
    }) 
    .repeat() 
    .take(10) 
    .subscribe((x) => console.log('result', x)) 
0

観察可能な契約はOnNext*(OnError|OnCompleted)+です。

シーケンスが終了したら、下流のオペレータは登録を解除する必要があります。パイプラインにのみ再登録することができます。あなたの観測値が寒い場合は、retry演算子を使用して再登録することができます。

observable 
.retry() 
.take(10) 
.subscribe() 
0

エラーを処理してオブザーバブルを返す関数を提供できます。高次関数を使用するため、代わりにflatMapを使用する必要があります。

function handleError(cb){ 
    return (val) => { 
     try{ 
      return Rx.Observable.of(cb(val)); 
     }catch(err){ 
      console.error(`${err}`); 
      return Rx.Observable.empty(); 
     } 
    } 
} 

Rx.Observable.interval(1000) 
    .flatMap(handleError(i => { 
     if (i === 2) throw(new Error('omg')) 
     return i; 
    })) 
    .take(10) 
    .do(console.log) 
    .subscribe() 

// emits 
// 0 
// 1 
// "Error: omg" 
// 3 
// 4 
// 5 
// 6 
// 7 
// 8 
// 9 
// 10 

jsbin example

かなりあなたの例のコードに合うが、言及する価値があるしない別の例として、RxJSの主任開発者、ベン・レッシュは、On The Subject Of Subjects (in RxJS)と呼ばれるポストでこの問題に触れました。 「Gotchas in RxJS」と呼ばれる途中のセクションがあります。

[...] Rx観測値はエラーを「トラップ」しないので、ここでは何か という奇妙な動作が発生します。エラー "トラップ"は自分自身の振る舞いです derided実装の約束ですが、マルチキャストシナリオでは が正しい動きになるかもしれません。私がRx observableと言ったときに意味するのは "トラップ"エラーは基本的にエラーが の最後にパーコレートしてオブザーバーチェーンに入り、エラーが処理されないと再スローされます。ここ

そのセクション(最も単純なほとんどパフォーマンスせず)からの1つのコードの例である:

const source$ = Observable.interval(1000) 
    .share() 
    .observeOn(Rx.Scheduler.asap); // magic here 
const mapped$ = source$.map(x => { 
    if (x === 1) { 
    throw new Error('oops'); 
    } 
    return x; 
}); 
source$.subscribe(x => console.log('A', x)); 
mapped$.subscribe(x => console.log('B', x)); 
source$.subscribe(x => console.log('C', x)); 
// "A" 0 
// "B" 0 
// "C" 0 
// "A" 1 
// Uncaught Error: "oops" 
// "C" 1 
// "A" 2 
// "C" 2 
// "A" 3 
// "C" 3 
// ... etc 

jsbin example

関連する問題