2017-01-06 9 views
1

Promise.resolve() の Observable 版(RxJS5)

RxJSをもっとよく学ぶために、カスタムRx演算子を作成することにしました。以下は正常に動作するシンプルな例です:

/**
 * Custom operator: re-emits every value of the source Observable multiplied
 * by `input`.
 *
 * @param {number} input - Factor applied to each emitted value.
 * @returns {Rx.Observable} Observable of the multiplied values.
 */
Rx.Observable.prototype.multiply = function (input) {

    const source = this;

    return Rx.Observable.create(function (obs) {

        // Forward error and completion as well as values; with only a `next`
        // handler the resulting Observable never terminates and source
        // errors never reach downstream subscribers.
        return source.subscribe(
            function (val) {
                obs.next(input * val);
            },
            function (err) {
                obs.error(err);
            },
            function () {
                obs.complete();
            }
        );
    });

};

そして、次のように使用できます:

// Multiply each tick of a one-second interval by 4 and log the result.
const obs = Rx.Observable.interval(1000)
    .multiply(4)
    .forEach((v) => {
        console.log(v);
    });

しかし、もう少し複雑なもの、例えば演算子が静的な値の代わりに関数を受け取る場合はどうでしょうか:

// Same pipeline, but the operator is given a function that returns an
// Observable instead of a static value.
const obs = Rx.Observable.interval(1000)
    .handleFn(() => Rx.Observable.timer(399))
    .forEach((v) => {
        console.log(v);
    });

ここで、handleFnの実装は次のとおりです:

/**
 * Custom operator: calls `fn` with each source value and emits whatever `fn`
 * returns. If `fn` returns an Observable, that Observable itself is emitted
 * as a value; it is not subscribed to or flattened here.
 *
 * @param {function(*): *} fn - Called with `this` bound to the downstream
 *     observer and the source value as its only argument.
 * @returns {Rx.Observable} Observable of the values returned by `fn`.
 */
Rx.Observable.prototype.handleFn = function (fn) {

    const source = this;

    return Rx.Observable.create(function (obs) {

        return source.subscribe(
            function (val) {
                let result;
                try {
                    result = fn.call(obs, val);
                } catch (err) {
                    // Report a failing projection through the error channel
                    // instead of letting it escape the subscription.
                    obs.error(err);
                    return;
                }
                obs.next(result);
            },
            // Forward error and completion so the result terminates with
            // the source.
            function (err) {
                obs.error(err);
            },
            function () {
                obs.complete();
            }
        );
    });

};

上記はすべて問題ありません。しかし、入力関数から返されるRx.Observableを処理する必要がある場合はどうすればよいでしょうか? Rx.Observable.timer()の結果を解決できるように、Observableに対するPromise.resolve()のようなものはありますか? Rx.Observable.prototype.flatMapなどのソースコードを確認してみます!

答えて

3

あなたはこのように、.mergeAll()を使用することができます。

/**
 * Custom operator: calls `fn` with each source value and emits whatever `fn`
 * returns. When `fn` returns Observables, the result is an Observable of
 * Observables, which can then be flattened with `.mergeAll()`.
 *
 * @param {function(*): *} fn - Called with `this` bound to the downstream
 *     observer and the source value as its only argument.
 * @returns {Rx.Observable} Observable of the values returned by `fn`.
 */
Rx.Observable.prototype.handleFn = function (fn) {

    const source = this;

    return Rx.Observable.create(function (obs) {

        return source.subscribe(
            function (val) {
                let result;
                try {
                    result = fn.call(obs, val);
                } catch (err) {
                    // Report a failing projection through the error channel
                    // instead of letting it escape the subscription.
                    obs.error(err);
                    return;
                }
                obs.next(result);
            },
            // Forward error and completion so the result terminates with
            // the source.
            function (err) {
                obs.error(err);
            },
            function () {
                obs.complete();
            }
        );
    });

};

// handleFn emits one inner Observable per tick; mergeAll() subscribes to each
// of them and flattens their values into a single stream.
const obs = Rx.Observable.interval(1000)
    .handleFn(() => Rx.Observable.timer(150).mapTo(Math.random()))
    .mergeAll();

// Log every flattened value.
obs.subscribe((x) => {
    console.log(x);
});

ライブJSBin hereを参照してください。


別の方法: mergeAllの代わりに、次のようにすることもできます:

/**
 * Custom operator: calls `fn` with each source value, subscribes to the
 * Observable it returns, and forwards the inner values downstream (a
 * simplified flatMap/mergeMap without a concurrency limit).
 *
 * @param {function(*): Rx.Observable} fn - Called with `this` bound to the
 *     downstream observer and the source value as its only argument.
 * @returns {Rx.Observable} Observable of the values emitted by all inner
 *     Observables.
 */
Rx.Observable.prototype.handleFn = function (fn) {
    const source = this;

    return Rx.Observable.create(function (obs) {
        // Collects the outer and all inner subscriptions so that
        // unsubscribing from the result tears every one of them down.
        const subscriptions = new Rx.Subscription();
        let active = 0;          // inner Observables that have not completed yet
        let sourceDone = false;  // whether the outer source has completed

        subscriptions.add(source.subscribe(
            function (val) {
                let inner;
                try {
                    inner = fn.call(obs, val);
                } catch (err) {
                    obs.error(err);
                    return;
                }

                active += 1;
                // Declared before assignment: an inner Observable may complete
                // synchronously, i.e. before `subscribe` has returned.
                let innerSub;
                innerSub = inner.subscribe(
                    function (x) {
                        obs.next(x);
                    },
                    function (err) {
                        obs.error(err);
                    },
                    function () {
                        active -= 1;
                        if (innerSub) {
                            subscriptions.remove(innerSub);
                        }
                        // Complete only once the source and every inner
                        // Observable are done.
                        if (sourceDone && active === 0) {
                            obs.complete();
                        }
                    }
                );
                if (!innerSub.closed) {
                    subscriptions.add(innerSub);
                }
            },
            function (err) {
                obs.error(err);
            },
            function () {
                sourceDone = true;
                if (active === 0) {
                    obs.complete();
                }
            }
        ));

        return subscriptions;
    });
};

補足:これが適切にどう実装されているかを確認したい場合は、(あなた自身がすでに述べたように)flatMap、switchMap、concatMapのソースを見てください。

+0

ソースを見ましたが、TS(TypeScript)で書かれています – Olegzandr

+0

私はTypeScriptに詳しくないので、正直難しいです。ここが出発点だと思います: https://github.com/ReactiveX/rxjs/blob/c7cfe444642407f3227a09a0cdf38fd495b867e1/src/add/operator/mergeMap.ts – Olegzandr

+0

mergeMapとflatMapは同じであるように見えます - https://github.com/ReactiveX/rxjs/blob/master/src/operator/mergeMap.ts – Olegzandr

0

ソースを見ると、TypeScriptで書かれています。しかし私はTSを知らないので、ES5にトランスパイルしました。flatMapとmergeMapは同等の演算子であることが分かり、ソースにあるのはmergeMapだけでした。以下がそのトランスパイル結果です:

"use strict"; 
// Inheritance helper: reuses an `__extends` already present on `this`, if
// any; otherwise copies the own enumerable properties of `b` onto `d` and
// makes `d.prototype` an object that inherits from `b.prototype`.
var __extends = (this && this.__extends) || function (d, b) {
    // Copy static members from the base constructor.
    for (var key in b) {
        if (b.hasOwnProperty(key)) {
            d[key] = b[key];
        }
    }
    // A null base yields a prototype with no parent at all.
    if (b === null) {
        d.prototype = Object.create(b);
        return;
    }
    // The surrogate constructor links the prototype chain without invoking
    // the base constructor, and restores `constructor` to point at `d`.
    function Surrogate() {
        this.constructor = d;
    }
    Surrogate.prototype = b.prototype;
    d.prototype = new Surrogate();
};
var subscribeToResult_1 = require("../util/subscribeToResult"); 
var OuterSubscriber_1 = require("../OuterSubscriber"); 
/* tslint:disable:max-line-length */ 
/** 
* Projects each source value to an Observable which is merged in the output 
* Observable. 
* 
* <span class="informal">Maps each value to an Observable, then flattens all of 
* these inner Observables using {@link mergeAll}.</span> 
* 
* <img src="./img/mergeMap.png" width="100%"> 
* 
* Returns an Observable that emits items based on applying a function that you 
* supply to each item emitted by the source Observable, where that function 
* returns an Observable, and then merging those resulting Observables and 
* emitting the results of this merger. 
* 
* @example <caption>Map and flatten each letter to an Observable ticking every 1 second</caption> 
* var letters = Rx.Observable.of('a', 'b', 'c'); 
* var result = letters.mergeMap(x => 
* Rx.Observable.interval(1000).map(i => x+i) 
*); 
* result.subscribe(x => console.log(x)); 
* 
* // Results in the following: 
* // a0 
* // b0 
* // c0 
* // a1 
* // b1 
* // c1 
* // continues to list a,b,c with respective ascending integers 
* 
* @see {@link concatMap} 
* @see {@link exhaustMap} 
* @see {@link merge} 
* @see {@link mergeAll} 
* @see {@link mergeMapTo} 
* @see {@link mergeScan} 
* @see {@link switchMap} 
* 
* @param {function(value: T, ?index: number): Observable} project A function 
* that, when applied to an item emitted by the source Observable, returns an 
* Observable. 
* @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] 
* A function to produce the value on the output Observable based on the values 
* and the indices of the source (outer) emission and the inner Observable 
* emission. The arguments passed to this function are: 
* - `outerValue`: the value that came from the source 
* - `innerValue`: the value that came from the projected Observable 
* - `outerIndex`: the "index" of the value that came from the source 
* - `innerIndex`: the "index" of the value from the projected Observable 
* @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input 
* Observables being subscribed to concurrently. 
* @return {Observable} An Observable that emits the result of applying the 
* projection function (and the optional `resultSelector`) to each item emitted 
* by the source Observable and merging the results of the Observables obtained 
* from this transformation. 
* @method mergeMap 
* @owner Observable 
*/ 


function mergeMap(project, resultSelector, concurrent) {
    var selector = resultSelector;
    // An omitted concurrency limit means "unbounded".
    var limit = concurrent === void 0 ? Number.POSITIVE_INFINITY : concurrent;
    // Two-argument form mergeMap(project, concurrent): the number passed in
    // the second position is the limit and there is no result selector.
    if (typeof selector === 'number') {
        limit = selector;
        selector = null;
    }
    var operator = new MergeMapOperator(project, selector, limit);
    return this.lift(operator);
}


exports.mergeMap = mergeMap; 


// Operator object handed to `lift`: it stores the mergeMap configuration and,
// when called, subscribes a new MergeMapSubscriber to the source.
var MergeMapOperator = (function () {
    function MergeMapOperator(project, resultSelector, concurrent) {
        this.project = project;
        this.resultSelector = resultSelector;
        // An omitted concurrency limit means "unbounded".
        this.concurrent = concurrent === void 0 ? Number.POSITIVE_INFINITY : concurrent;
    }

    // Wraps `observer` in a MergeMapSubscriber configured from this operator
    // and returns whatever `source.subscribe` returns.
    MergeMapOperator.prototype.call = function (observer, source) {
        var subscriber = new MergeMapSubscriber(
            observer,
            this.project,
            this.resultSelector,
            this.concurrent
        );
        return source.subscribe(subscriber);
    };

    return MergeMapOperator;
}());

exports.MergeMapOperator = MergeMapOperator; 
/** 
* We need this JSDoc comment for affecting ESDoc. 
* @ignore 
* @extends {Ignored} 
*/ 
var MergeMapSubscriber = (function (_super) {

    __extends(MergeMapSubscriber, _super);

    // Sets up the subscriber state: the projection, the optional result
    // selector, the concurrency limit, a buffer of source values waiting for
    // a free slot, the count of active inner subscriptions, and the index of
    // the next source value.
    function MergeMapSubscriber(destination, project, resultSelector, concurrent) {
        var self = _super.call(this, destination) || this;
        self.project = project;
        self.resultSelector = resultSelector;
        // An omitted concurrency limit means "unbounded".
        self.concurrent = concurrent === void 0 ? Number.POSITIVE_INFINITY : concurrent;
        self.hasCompleted = false;
        self.buffer = [];
        self.active = 0;
        self.index = 0;
        return self;
    }

    // Source value arrived: project it right away if a slot is free,
    // otherwise queue it until an inner subscription completes.
    MergeMapSubscriber.prototype._next = function (value) {
        var hasFreeSlot = this.active < this.concurrent;
        if (!hasFreeSlot) {
            this.buffer.push(value);
            return;
        }
        this._tryNext(value);
    };

    // Runs the projection for one source value; a throwing projection is
    // reported to the destination and nothing is subscribed.
    MergeMapSubscriber.prototype._tryNext = function (value) {
        var outerIndex = this.index++;
        var inner;
        try {
            inner = this.project(value, outerIndex);
        }
        catch (err) {
            this.destination.error(err);
            return;
        }
        this.active++;
        this._innerSub(inner, value, outerIndex);
    };

    // Subscribes to the projected inner source and registers the resulting
    // subscription on this subscriber.
    MergeMapSubscriber.prototype._innerSub = function (innerSource, outerValue, outerIndex) {
        this.add(subscribeToResult_1.subscribeToResult(this, innerSource, outerValue, outerIndex));
    };

    // Source completed: complete the destination only when no inner
    // subscription is active and nothing is waiting in the buffer.
    MergeMapSubscriber.prototype._complete = function () {
        this.hasCompleted = true;
        var idle = this.active === 0 && this.buffer.length === 0;
        if (idle) {
            this.destination.complete();
        }
    };

    // Inner value arrived: pass it through as is, or through the result
    // selector when one was supplied.
    MergeMapSubscriber.prototype.notifyNext = function (outerValue, innerValue, outerIndex, innerIndex, innerSub) {
        if (!this.resultSelector) {
            this.destination.next(innerValue);
            return;
        }
        this._notifyResultSelector(outerValue, innerValue, outerIndex, innerIndex);
    };

    // Applies the result selector; a throwing selector is reported to the
    // destination instead of emitting a value.
    MergeMapSubscriber.prototype._notifyResultSelector = function (outerValue, innerValue, outerIndex, innerIndex) {
        var selected;
        try {
            selected = this.resultSelector(outerValue, innerValue, outerIndex, innerIndex);
        }
        catch (err) {
            this.destination.error(err);
            return;
        }
        this.destination.next(selected);
    };

    // Inner source completed: free its slot, then either start the next
    // buffered value or, if the source is done and nothing is active,
    // complete the destination.
    MergeMapSubscriber.prototype.notifyComplete = function (innerSub) {
        var pending = this.buffer;
        this.remove(innerSub);
        this.active--;
        if (pending.length > 0) {
            this._next(pending.shift());
            return;
        }
        if (this.active === 0 && this.hasCompleted) {
            this.destination.complete();
        }
    };

    return MergeMapSubscriber;

}(OuterSubscriber_1.OuterSubscriber));
exports.MergeMapSubscriber = MergeMapSubscriber; 
//# sourceMappingURL=mergeMap.js.map