2017-02-13 20 views
0

RxJSの概念の詳細を理解したいと思います。私が現在苦労している問題はここにあります。私は非同期呼び出しでDBへのアクセスを抽象化したいと思います。私はアクセスを同期したいと思います。RxJSとの非同期アクションの同期

は、私はアクションのストリームを持つことができ、
- 非同期を行う
をDBへの格安通話 - のための観測を得るものとするアクションの呼び出し元 - 私は、以前のアクションが
を完了するまで、次の行動が遅れることがしたいです非同期アクションの結果

例:クラスの

ユーザーがアクション1を呼び出します:読むDB項目、次の状態(例えばインクリメントフィールド)を算出し、DB
に書き込み、その後..クラスの
ユーザーは、次のアクション(アクション2)を呼び出しますが、 Action1はまだ進行中です。
Action2:DB読み取り(Action1:書き込みが完了する前に起動してはならない)

RxJS + Typescriptでこれを行うにはどうすればよいですか? /////////////////////////////////

フランク

一方私はこれを持っていますコード:

import * as Rx from 'rxjs'; 

var actionQueue = new Rx.Subject<() => Rx.Observable<any>>(); 
actionQueue 
    .concatMap(v => v()) 
    .subscribe(v => {}); 

// example action with result type number 
function action1 (v : number) : Rx.Observable<number> { 
    console.log(':: action1: ', v); 
    var res = new Rx.Subject<number>(); 
    actionQueue.next(() => { 
    console.log('>> action1: ', v); 
    setTimeout(()=>{ 
     console.log('<< action1: ', v); 
     res.next(v); 
     res.complete(); 
    }, 500); 
    return res; 
    }); 
    return res; 
} 

// some actions enqueue now, after 700+2500ms 
action1(11).subscribe(v => console.log('XX action1: ', v)); 
action1(22).subscribe(v => console.log('XX action1: ', v)); 
action1(33).subscribe(v => console.log('XX action1: ', v)); 

setTimeout(()=>{ 
    action1(44).subscribe(v => console.log('XX action1: ', v)); 
}, 700); 

setTimeout(()=>{ 
    action1(55).subscribe(v => console.log('XX action1: ', v)); 
}, 2500); 

出力には、連続したものがあることが示されています。
typescript/js noobとして...このコードに落とし穴がありますか?よりエレガントな方法がありますか?

フランク

答えて

0

どうdelayWhen()演算子を使用してはどうですか?

// Observable wrapping action1. 
const obsAction1 = Observable.create(observer => { 
    // Read DB item 
    // Calculate next state 
    // Write to DB 
    // Then: 
    observer.complete(); 
}); 

// Private observable wrapping action2. 
// DO NOT subscribe to it directly. 
const _obsAction2 = Observable.create(observer => { 
    // Read DB 
    // Then: 
    observer.complete(); 
}); 

// Public observable wrapping action2 AND delayed by action 1. 
// This is what the client code should subscribe to. 
const obsAction2 = _obsAction2.delayWhen(obsAction1); 

観測を消費コード:

obsAction1.subscribe(val => console.log(val)); 

// Values will be received only when `obsAction1` emits or completes. 
obsAction2.subscribe(val => console.log(val)); 
関連する問題