RxJSを初めて使用していて、誰かが私を助けてくれるのだろうかと思っていました。RxJSを使用した要求ストリームからの同期ストリームのストリーム
リクエストのストリーム(ペイロードデータ)からレスポンスの同期ストリーム(できれば対応するリクエスト)を作成したいと考えています。
私は基本的にリクエストを1つずつ送信し、それぞれ最後のリクエストからの応答を待っています。
私はこれを試してみましたが、それは一度にすべてのもの(jsbin)送信:jsbin(限り、
var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);
responseStream = requestStream.flatMap(
sendRequest,
(val, response)=>{ return {val, response}; }
);
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('result for '+val);},1000);
});
};
以下の作品を、しかし要求データのストリームを使用していません)。
var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
var sendNext = function(){
var val = data.shift();
if (!val) {
observer.onCompleted();
return;
}
sendRequest(val).then(response=>{
observer.onNext({val, response});
sendNext();
});
};
sendNext();
});
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
});
};
ありがとうございます!
EDIT:
だけ明確にする、これは私が達成したいものです。
あなたがAのための応答を受信したときに「送る、あなたはBのための応答を受信したときに、Bを送って、Aを送信しますuser3743222により示唆されるようにC、等...」
concatMapを使用し、延期、(jsbin)それを行うようだ:
responseStream = requestStream.concatMap(
(val)=>{
return Rx.Observable.defer(()=>{
return sendRequest(val);
});
},
(val, response)=>{ return {val, response}; }
);
ありがとうございました。私はあなたの解決策を試しましたが、要求はまだすべてすぐに送信されます。このドキュメントでは、flatMapはインターリーブを引き起こす可能性がありますが、concatMapはインターリーブを引き起こす可能性があることを示しています。それは注文に差があるようだ。 concatMapを使用するのは理にかなっていますが、Aの応答を受け取ったときにAを送信し、Bを送信し、Bの応答を受け取ったときにCを送信します。 – jamesref
多分あなたが望むものを誤解しました。その場合、「延期」することができますか?私はコードを更新する – user3743222
ありがとう!それは働いているようだ。 – jamesref