2017-02-06 54 views
0

sourceStreamBaseData個のオブジェクトで構成されています。highland.jsを使用してストリームをフォークする方法は?

このストリームをnにフォークしたいと思っています。さまざまなストリームの量をフィルタリングし、それぞれの好みのものに変換します。

最後に、nストリームには特定のタイプのみが含まれています。フォークされたストリームの長さは、将来データが削除または追加される可能性があります。

{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 

bar

import * as _ from 'highland'; 

interface BaseData { 
    id: string; 
    data: string; 
} 

const sourceStream = _([ 
    {id: 'foo', data: 'poit'}, 
    {id: 'foo', data: 'fnord'}, 
    {id: 'bar', data: 'narf'}]); 

const partners = [ 
    'foo', 
    'bar', 
]; 

partners.forEach((partner: string) => { 
    const partnerStream = sourceStream.fork(); 

    partnerStream.filter((baseData: BaseData) => { 
     return baseData.id === partner; 
    }); 

    partnerStream.each(console.log); 
}); 

私は今、2つのストリームを持つことが期待される、とfoo -streamは、2つの要素を含むように:

は私がforkを経由して、それまでにこれを設定すると考えていました1つの要素を含むストリーム:

{ id: 'bar', data: 'narf' } 

しかし、代わりにエラーが表示されます:

/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338 
     throw new Error(
     ^

Error: Stream already being consumed, you must either fork() or observe() 
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15) 
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10) 
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18) 
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19) 
    at Array.forEach (native) 
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10) 
    at Module._compile (module.js:570:32) 
    at Object.Module._extensions..js (module.js:579:10) 
    at Module.load (module.js:487:32) 
    at tryModuleLoad (module.js:446:12) 

ストリームを複数のストリームにフォークする方法はありますか。


私も呼び出しを連鎖しようとした、まだ、私は唯一のバック1つのストリームの結果を得る:

partners.forEach((partner: string) => { 
    console.log(partner); 
    const partnerStream = sourceStream 
     .fork() 
     .filter((item: BaseData) => { 
      return item.id === partner; 
     }); 

    partnerStream.each((item: BaseData) => { 
     console.log(item); 
    }); 
}); 

印刷のみ:代わりに、期待の

foo 
{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 
bar 

foo 
{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 
bar 
{id: 'bar', data: 'narf'} 

私は誤解されている場合もあります。forkがすべてでした。 its doc entryあたりのとおり:

Stream.fork()共有背圧で追加の消費者 を追加することができ、ストリームをフォーク。複数のコンシューマにフォークされたストリームは、 は、最も遅いコンシューマができるだけ高速にそのソースから値を取り出します。

注:フォーク間の一貫した実行順序に依存しないでください。 この変換では、すべてのフォークが値foo を処理してから2番目の値バーを処理することが保証されます。フォークプロセスfooが実行される順番を に保証するものではありません。

ヒント:フォーク内のストリーム値を変更する場合は注意してください(または )。 同じ値がすべてのフォークの に渡されるため、1つのフォークで行われた変更は、 の後に実行されるフォークで表示されます。これに矛盾した実行順序を追加すると、 が微妙なデータ破損のバグに終わる可能性があります。あなたが 任意の値を変更する必要がある場合は、コピーを作成し、代わりにコピーを変更する必要があります。

廃止の警告:(変換を経て、例えば)それを消費 後のストリームをフォークすることは現在可能です。これは、次のメジャーリリースではもはや にはなりません。ストリームをフォークする場合は、常に コールフォークを使用します。

ので、代わりの「ストリームをフォークにどのように?」私の実際の質問は次のようなものかもしれません。

答えて

0

一つは、すべてのフォークが作成される前に分岐したストリームを消費しないように留意しなければなりません。あたかもフォークされたストリームを消費するかのように、それとその親が消費され、後続のフォークが空のストリームからフォークされます。私はエラーが表示されないように、今、私の質問を更新しました

0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ] 
1 [ { id: 'bar', data: 'narf' } ] 
1

partnerStream.filter()は、新しいストリームを返します。その後、fork()またはobserve()を呼び出さずに、partnerStream.each()を使用して再度partnerStreamを消費しています。だから、どちらかがpartnerStream.filter().each()呼び出しを鎖または変数にpartnerStream.filter()の戻り値を代入し、その上.each()を呼び出します。

+0

、まだ私が唯一のリターン1つのストリームの値ではなく、それらの両方を参照してください。

const partnerStreams: Array<Stream<BaseData>> = []; partners.forEach((partner: string) => { const partnerStream = sourceStream .fork() .filter((item: BaseData) => { return item.id === partner; } ); partnerStreams.push(partnerStream); }); partnerStreams.forEach((stream, index) => { console.log(index, stream); stream.toArray((foo) => { console.log(index, foo); }); }); 

それは印刷されます。 – k0pernikus

関連する問題