RxJs

2017-10-15 2 views
1

で2半分に参加するにはどうすればこのRxJs

---ab---ab---a---ba---bab---ab---ab---ab---> 

のような流れを持っていると私はこれをしたいです。

---ab---ab------ab----ab-ab-ab---ab---ab---> 

ポイントは、私は最初と最後(JSON)、時にはデータがストリームで半分にカットされたとのデータがあり、私はそれらを再度参加したいということ、です。どうやってやるの?

+0

をどのようにあなたがが 'ab'と' A'を区別するのですか?私は[bufferCount](http://rxmarbles.com/#bufferCount)について考えていたが、それは正しいことではない、と確信している。 –

答えて

0

これは私が解決方法です:

import Rx from 'rxjs/Rx'; 
import {last} from 'lodash'; 

const data$ = Rx.Observable.of('ab','ab','a','ba','bab','aba','b','ab'); 
const line$ = data$.flatMap(data => { 
    const lines = data.match(/[^b]+b?|b/g); // https://stackoverflow.com/a/36465144/598280 https://stackoverflow.com/a/25221523/598280 
    return Rx.Observable.from(lines); 
}); 

const isComplete$ = line$.scan((acc, value) => { 
    const isLineEndingLast = last(acc.value) === 'b'; 
    const id = isLineEndingLast ? acc.id + 1 : acc.id; 
    const complete = last(value) === 'b'; 
    return {value, id, complete}; 
}, {value: 'b', id: 0, complete: true}); 

const grouped$ = isComplete$ 
    .groupBy(data => data.id, data => data, group => group.first(data => data.complete)) 
    .flatMap(group => group.reduce((acc, data) => acc + data.value, '')); 

grouped$.subscribe(console.log); 
1

スキャンオペレータ

// substitute appropriate real-world logic 
const isProperlyFormed = (x) => x === 'ab' 
const isIncomplete = (x) => x[0] === 'a' && x.length === 1 
const startsWithEnding = (x) => x[0] === 'b' 
const getCorrected = (buffer, x) => buffer.prev + x[0] 
const getTail = (buffer, x) => x.slice(1) 

const initialBuffer = { 
    emit: [], 
    prev: null 
} 

const result = source 
    .scan((buffer, x) => { 
    if (isProperlyFormed(x)) { 
     buffer = {emit: [x], prev:null} 
    } 
    if (isIncomplete(x)) { 
     buffer = {emit: [], prev:x} 
    } 
    if (startsWithEnding(x)) { 
     const corrected = getCorrected(buffer, x) 
     const tail = getTail(buffer, x) 
     if (isProperlyFormed(tail)) { 
     buffer = {emit: [corrected, tail], prev: null} 
     } else { 
     buffer = {emit: [corrected], prev: tail} 
     } 
    } 
    return buffer 
    }, initialBuffer) 
    .flatMap(x => x.emit) 

ワーキングCodePen

編集

テスト入力ストリームを見て、私はケースが欠落していると思いますが、意志のための仕事のように見えます上記を破る。

私はアルゴリズム

const getNextBuffer = (x) => { 
    const items = x.split(/(ab)/g).filter(y => y) // get valid items plus tail 
    return { 
    emit: items.filter(x => x === 'ab'), // emit valid items 
    save: items.filter(x => x !== 'ab')[0] // save tail 
    } 
} 

const initialBuffer = { 
    emit: [], 
    save: null 
} 

const result = source 
    .scan((buffer, item) => { 
    const bufferAndItem = (buffer.save ? buffer.save : '') + item 
    return getNextBuffer(bufferAndItem) 
    }, initialBuffer) 
    .flatMap(x => x.emit) 

の作業例CodePen

1

Fiを提供してスリム化

---ab---ab---a---ba---bab---aba---b---ab---> 

とも

---ab---ab---a---ba---bab---ab---ab---ab---> 

からテストを変更しました最初はストリームを完全な応答と部分的に分割します。次に、レスポンスがいっぱいであることを確認します完全な応答はそのように良いです。部分的な応答は同期する必要があるため、ストリームを最初と最後の2つに分割し、それらのストリームをまとめて圧縮します。

Rx.Observable.of(g.partition(x => x[0] === 'a'))は、partitionの演算子が連鎖できない観測値のペアを返すためです。

const testStream = Rx.Observable.of('a1', 'a2', '_ab', 'b1', 'a3', 'b2', '_ab', 'a4', 'b3', '_ab', 'b4', 'a5', 'b5', '_ab') 
 

 
testStream 
 
    .groupBy(x => (x[0] === '_' && 'full') || 'partial') 
 
    .mergeMap(g => 
 
    Rx.Observable.if(
 
    () => g.key == 'full', 
 
     g, 
 
     Rx.Observable.of(g.partition(x => x[0] === 'a')) 
 
     .mergeMap(([as, bs]) => Rx.Observable.zip(as, bs)) 
 
    ) 
 
) 
 
    .do(x => console.log(x)) 
 
    .subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.min.js"></script>