2016-02-03 14 views
5

RxJSを初めて使用していて、誰かが私を助けてくれるのだろうかと思っていました。RxJSを使用した要求ストリームからの同期ストリームのストリーム

リクエストのストリーム(ペイロードデータ)からレスポンスの同期ストリーム(できれば対応するリクエスト)を作成したいと考えています。

私は基本的にリクエストを1つずつ送信し、それぞれ最後のリクエストからの応答を待っています。

私はこれを試してみましたが、それは一度にすべてのもの(jsbin)送信:jsbin(限り、

var requestStream, responseStream; 
 
requestStream = Rx.Observable.from(['a','b','c','d','e']); 
 

 
responseStream = requestStream.flatMap(
 
    sendRequest, 
 
    (val, response)=>{ return {val, response}; } 
 
); 
 

 
responseStream.subscribe(
 
    item=>{ 
 
    console.log(item); 
 
    }, 
 
    err => { 
 
    console.err(err); 
 
    }, 
 
()=>{ 
 
    console.log('Done'); 
 
    } 
 
); 
 

 
function sendRequest(val) { 
 
    return new Promise((resolve,reject)=>{ 
 
    setTimeout(()=>{resolve('result for '+val);},1000); 
 
    }); 
 
};

以下の作品を、しかし要求データのストリームを使用していません)。

var data, responseStream; 
 
data = ['a','b','c','d','e']; 
 
responseStream = Rx.Observable.create(observer=>{ 
 
    var sendNext = function(){ 
 
    var val = data.shift(); 
 
    if (!val) { 
 
     observer.onCompleted(); 
 
     return; 
 
    } 
 
    sendRequest(val).then(response=>{ 
 
     observer.onNext({val, response}); 
 
     sendNext(); 
 
    }); 
 
    }; 
 
    sendNext(); 
 
}); 
 

 
responseStream.subscribe(
 
    item=>{ 
 
    console.log(item); 
 
    }, 
 
    err => { 
 
    console.err(err); 
 
    }, 
 
()=>{ 
 
    console.log('Done'); 
 
    } 
 
); 
 

 
function sendRequest(val) { 
 
    return new Promise((resolve,reject)=>{ 
 
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500); 
 
    }); 
 
};

ありがとうございます!

EDIT:

だけ明確にする、これは私が達成したいものです。

あなたがAのための応答を受信したときに「送る、あなたはBのための応答を受信したときに、Bを送って、Aを送信しますuser3743222により示唆されるようにC、等...」

concatMapを使用し、延期、(jsbin)それを行うようだ:

responseStream = requestStream.concatMap(
    (val)=>{ 
    return Rx.Observable.defer(()=>{ 
     return sendRequest(val); 
    }); 
    }, 
    (val, response)=>{ return {val, response}; } 
); 

答えて

3

flatMapを最初のコードサンプルでconcatMapに置き換えて、結果の動作が探しているものと一致するかどうかをお知らせください。

responseStream = requestStream.concatMap(//I replaced `flatMap` 
    sendRequest, 
    (val, response)=>{ return {val, response}; } 
); 

は基本的にconcatMapflatMapより同様の署名、それは次の1に進む​​前に完了するために、平坦化された最新の観察のために待機するという動作の違いがあります。従ってここに:

  • requestStreamの値はconcatMapオペレータにプッシュされます。
  • concatMapオペレータが観察sendRequestを生成し、その観察のうちどのような値(タプル(val, response)と思われる)セレクタ機能を通過すると、そのオブジェクト結果は下流
  • ときに渡されるであろうsendRequest完了し、別のrequestStream値が処理されます。要するに
  • は、あなたの要求は、多分あなたはsendRequestの実行を延期するdeferを使いたい、あるいは、1

ずつ処理されます。

responseStream = requestStream.concatMap(//I replaced `flatMap` 
    function(x){return Rx.Observable.defer(function(){return sendRequest(x);})}, 
    (val, response)=>{ return {val, response}; } 
); 
+0

ありがとうございました。私はあなたの解決策を試しましたが、要求はまだすべてすぐに送信されます。このドキュメントでは、flatMapはインターリーブを引き起こす可能性がありますが、concatMapはインターリーブを引き起こす可能性があることを示しています。それは注文に差があるようだ。 concatMapを使用するのは理にかなっていますが、Aの応答を受け取ったときにAを送信し、Bを送信し、Bの応答を受け取ったときにCを送信します。 – jamesref

+0

多分あなたが望むものを誤解しました。その場合、「延期」することができますか?私はコードを更新する – user3743222

+0

ありがとう!それは働いているようだ。 – jamesref

関連する問題