2016-10-17 3 views
1

現在、オブザーバブルを一時停止する方法を見つけようとしています。 github https://github.com/ReactiveX/rxjs/issues/1542の肉のポストを見た後、私は正しい道にいると思う。しかし、何らかの理由で私の観察者のtakeWhile()は、私のpauser SubjectからswitchMap()を実行すると無視されています。mergeMap()を対象に使用すると、マージしているオブザーバブルのtakeWhile()が無視されます

これは正しく終了:この方法で呼び出されたときに

export class CompositionService { 
    cursor = -1; 
    pauser = new Subject(); 
    interval; 
    init = (slides) => { 
     let waitUntil = 0; 
     return this.interval = Observable 
      .range(0, slides.length) 
      .mergeMap((i) => { 
       let next = Observable.of(i).delay(waitUntil); 
       waitUntil += !!slides[i]["duration"] ? slides[i]["duration"] : 0; 
       return next; 
      }) 
      .scan((cursor) => { 
       return this.cursor = cursor = slides[cursor + 1] ? cursor + 1 : -1; 
      }, this.cursor) 
      .map(cursor => slides[cursor]) 
      .takeWhile((slide) => { 
       return !!slide; 
      }); 

    }; 
    // these methods are not called for this sample 
    play =() => { 
     this.pauser.next(false); 
    }; 
    pause =() => { 
     this.pauser.next(true); 
    }; 
}; 

は、これは動作します:

it("should subscribe to init", (done) => { 
    slides.forEach((slide, i) => { 
     if (slide.duration) { 
      slide.duration = slide.duration/100; 
     } 
    }); 
    composition.init(slides).subscribe(
     (slide) => { 
      console.log(slide); 
     }, 
     (err) => { 
      console.log("Error: " + err); 
     }, 
     () => { 
      done(); 
     }); 
}); 

宣伝として、前の例では動作しますが、私はいくつかの「魔法」を追加したときに間隔オブザーバーは決して終わりません。

export class CompositionService2 { 
    cursor = -1; 
    pauser = new Subject(); 
    interval; 
    init = (slides) => { 
     let waitUntil = 0; 
     this.interval = Observable 
      .range(0, slides.length) 
      .mergeMap((i) => { 
       let next = Observable.of(i).delay(waitUntil); 
       waitUntil += !!slides[i]["duration"] ? slides[i]["duration"] : 0; 
       return next; 
      }) 
      .scan((cursor) => { 
       return this.cursor = cursor = slides[cursor + 1] ? cursor + 1 : -1; 
      }, this.cursor) 
      .map(cursor => slides[cursor]) 
      .takeWhile((slide) => { 
       return !!slide; 
      }); 
     return this.pauser 
      // leaving commented for clarity of the end game 
      // .switchMap(paused => paused ? Observable.never() : this.interval); 
      // however, not even a straight forward switchMap is yeilding the expected results 
      .switchMap(paused => this.interval);    
    }; 
    play =() => { 
     this.pauser.next(false); 
    }; 
    pause =() => { 
     this.pauser.next(true); 
    }; 
}; 

it("should subscribe to init", (done) => { 
    slides.forEach((slide, i) => { 
     if (slide.duration) { 
      slide.duration = slide.duration/100; 
     } 
    }); 
    composition.init(slides).subscribe(
     (slide) => { 
      console.log(slide); 
     }, 
     (err) => { 
      console.log("Error: " + err); 
     }, 
     () => { 
      //I never get here!!!!! 
      done(); 
     }); 
    // kickstart my heart! 
    composition.play(); 
}); 

誰でも私がここで間違っていることは何ですか?

答えて

1

外部ストリームを完了していません。最初のバージョンでは、takeWhileがストリームを完成するときに完了しています。しかし、一度あなたはswitchMapの中にそれを入れ子にします。外側のストリーム(Subject)は決して完了しないので、内部ストリームを完了することは今までしかありません。これが平坦化されると、加入者にとって決して終わりのない流れとして現れる。

あなたはそれはあなたが例えば、ある時点でストリームを終了する必要があります完了したい場合:

composition.init(slides) 
    .take(3) 
    .subscribe(
    (slide) => { 
     console.log(slide); 
    }, 
    (err) => { 
     console.log("Error: " + err); 
    }, 
    () => { 
     //I never get here!!!!! 
     done(); 
    }); 

私は流れがあるので、Rxは、実際にここに適切なツールであることをスーパー確信していませんObservableが伝播を続けるのを実際に止めることはできないので、実際には「一時停止」するようには設計されていません。一時停止と一時停止の間に状態を保存するためにジャンプしているフープの数に気づいたでしょうから、発電機やIxJSのような別のライブラリを使うことを検討するのは理にかなっています。しかし、私は逃げる。

関連する問題