2016-09-07 3 views
1

設定した間隔の後に繰り返したい郵便番号のストリームがあります。私は郵便番号を送信し、APIからデータを戻すことによってページに表示されている温度を更新しようとしています。私は一度に1つずつ送る必要があります。だから私は特定の間隔の後にすべての明確な郵便番号を取得し、ストリーム全体を反復できるようにする必要があります。次に、ページの温度を更新したいと思います。どのようにRxJSで一定の間隔のあとにストリーム全体に行きますか

// Get stream of zip codes 
const zipcodeStream = 
    Rx.Observable 
    .fromEvent(zipcodeInput, 'input') 
    .map(e => e.target.value) 
    .filter(zip => zip.length === 5); 

// Create a timer to refresh the data 
Rx.Observable 
    .interval(5000) 
    .zip(zipcodeStream, ([i, zip]) => zip) 
    .forEach((...args) => { 
    console.log('interval forEach args', ...args); 
    }); 

これは、新しい郵便番号が入力され、間隔が経過したときに、1つの郵便番号のみを送信します。私はそれらにすべてアクセスして、繰り返し処理したい。

+0

あなたのケースでは** **間違いはありません。ドキュメントごとに「各期間の後に値**を生成する観測可能なシーケンスを返します**」(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/ interval.md)。 –

+0

@SufianSaory私がしようとしていることを達成する最良の方法は何でしょうか? – searsaw

+0

すばやく考えて、別の間隔、たとえば500 msを使用して結果を累積し、5000のメイン間隔に結果シーケンスを入力してください。 –

答えて

1

zipcodeStreamによって放出されたアイテムをすべて保存して、5秒ごとに反復できるようにするためには、ReplaySubjectを使用する必要があります。これらは放出されたすべてのアイテムを保存し、オブザーバーが購読するたびに再生します。

対照的に、あなたの現在のzipcodeStreamは、「熱心な」観測可能です。これは、アイテムが作成されるとすぐにアイテムの放出を開始することを意味し、その後のサブスクライバは、サブスクライブする前に放出されたアイテムを「見逃す」ことになります。

const zipcodeStream = 
    Rx.Observable 
    .fromEvent(zipcodeInput, 'input') 
    .map(e => e.target.value) 
    .filter(zip => zip.length === 5); 

const zipcodeSubject = new Rx.ReplaySubject(); 
const zipcodeDisposable = zipcodeStream.subscribe(zipcodeSubject); 

Rx.Observable 
    .interval(5000) 
    // Every 5000ms, will emit all items from zipcodeSubject 
    .flatMapLatest(() => zipcodeSubject) 
    .forEach((...args) => { 
    console.log('interval forEach args', ...args); 
    }); 
+0

この問題は、同じReplaySubjectへの複数のサブスクリプションを作成することにのみ問題があります。したがって、新しいzipコードが 'zipcodeStream'に追加されるたびに、変更が各サブスクライバに伝播するので、出力に複数回表示されます。 – searsaw

+0

良いキャッチ。これを避けるために 'flatMap'を' flatMapLatest'に変更しました。このオペレータは、以前フラットにマッピングされたObservable(すなわち、「zipcodeStream」)からの登録を解除し、新しいObservableに加入し(すなわち、zipcodeStreamに再登録する)、この最新のObservableからのアイテムのみを放出する。 – int3h

関連する問題