2016-03-21 11 views
0

私はこの大理石図を実装しようとしていますが、hipotesisはN個のsN $を持っており、このストリームをメイン$に追加しています。Streamsのこのシーケンスを作成する方法はありますか?

s1$ +--1--------------------99---------------------> 
s2$ +------3--------7------------------------------> 

main$ +---[1]-[1, 3]---[1, 7]---[99, 7]--------------> 

は今、私はaproximationを持っていますが、 "繰り返し"

const main$ = new Rx.Subject() 
const s1$ = new Rx.Subject() 
const s2$ = new Rx.Subject() 

main$ 
    .scan((a, c) => [...a, c], []) 
    .subscribe(v => console.log(v)) 

s1$.subscribe(x => main$.onNext(x)) 
s2$.subscribe(x => main$.onNext(x))  

s1$.onNext(3) 
s2$.onNext(1) 

s1$.onNext(6) 
s2$.onNext(44) 

/* 
    Expect: 
    [3] 
    [3, 1] 
    [6, 1] 
    [6, 44] 
*/ 

/* 
    What I have: 
    [3] 
    [3, 1] 
    [3, 1, 6] 
    [3, 1, 6, 44] 
*/ 

でこれを行う方法はありますか? はまた、私はメインの$に流れSN $を追加しようとしました:

const main$ = new Rx.Subject() 
const s1$ = new Rx.Subject() 
const s2$ = new Rx.Subject() 

main$ 
    .mergeAll() 
    .scan((a, c) => [...a, c], []) 
    .subscribe(
    (v) => console.log(v) 
) 

main$.onNext(s1$) 
main$.onNext(s2$) 

s1$.onNext(3) 
s2$.onNext(1) 

s1$.onNext(6) 
s2$.onNext(44) 

答えて

1

あなたはcombineLatestを使用することができます。それでもすべてのストリームが値で始まる必要がありますが、nullの接頭辞を付けて、すべてのストリームをstartWithを使って開始することができます。

const source = Rx.Observable.combineLatest(
    s1.startWith(void 0), 
    s2.startWith(void 0), 
    s3.startWith(void 0), 
    (s1, s2, s3) => [s1, s2, s3]) 

オプションで、結果の配列からundefinedの値を削除できます。

ここでは、ストリームの可変リストを使用するように拡張することができます。 @xgrommxのクレジット

main$ 
.scan((a, c) => a.concat(c), []) 
.switch(obs => Rx.Observable.combineLatest(obs)) 

我々はまた、ときに我々switchストリームが存在最後の値を覚えて作るためにc.shareReplay(1)を使用することができます。しかし、c.startWith(void 0)と組み合わせることはできませんので、どちらか一方を使用することができます。

例:どのよう(

main$ 
    .scan((a, c) => [...a, c.startWith(null).shareReplay(1)], []) 
    .map(obs => Observable.combineLatest(obs)) 
    .switch() 
    .map((x) => x.filter((x) => x != null)) 
    .filter((x) => x.length) 

が非読めるルックス:

const main$ = new Rx.Subject() 
 
    const s1$ = new Rx.Subject(1) 
 
    const s2$ = new Rx.Subject(1) 
 
    const s3$ = new Rx.Subject(1) 
 
    const s4$ = new Rx.Subject(1) 
 

 
    main$ 
 
    .scan((a, c) => a.concat(c.shareReplay(1)), []) 
 
    .map(obs => Rx.Observable.combineLatest(obs)) 
 
    .switch() 
 
    .map(v => v.filter(e => !!e)) 
 
    .map(v => v.join(',')) 
 
    .subscribe(v => $('#result').append('<br>' + v)) 
 

 
    main$.onNext(s1$) 
 
    s1$.onNext(1) 
 
    main$.onNext(s2$) 
 
    s2$.onNext(void 0) // Since we can't use startWith 
 
    main$.onNext(s3$) 
 
    s3$.onNext(5) 
 
    s1$.onNext(55) 
 
    s2$.onNext(12) 
 
    s2$.onNext(14) 
 
    s3$.onNext(6) 
 
    main$.onNext(s4$) 
 
    s4$.onNext(999)
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script> 
 
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.6/rx.all.js"></script> 
 
    <div id="result"></div>

+0

私は考えを得る、私はそれを解決する、ありがとう! – davesnx

0

は、私はいくつかのフィルタに問題に最終的に私はstartWith()で始まるヌルを解決Rxのシーケンスですが、大理石を描くのは完全に意味があります!)

+0

なぜ "startWith"を使用しますか? – xgrommx

+0

はたぶん、あなたは 'JSの メイン$ .scan((C)=> [...、c.startWith(ヌル).shareReplay(1)]、[]) .MAPのようななめらかを使用することができますフィルタ(x => x.length)) .switch() ' – xgrommx

+0

内部の現在のサブジェクトを開始するには、次のように入力します(obs => Rx.Observable.combineLatest(obs).map(x => x.filter(Boolean)いくつかの値を持つ配列は、Lestestとsomething(この場合はnull)を結合できるようにします。理にかなっている? – davesnx

関連する問題