sourceStream
はBaseData
個のオブジェクトで構成されています。highland.jsを使用してストリームをフォークする方法は?
このストリームをn
にフォークしたいと思っています。さまざまなストリームの量をフィルタリングし、それぞれの好みのものに変換します。
最後に、n
ストリームには特定のタイプのみが含まれています。フォークされたストリームの長さは、将来データが削除または追加される可能性があります。
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
とbar
:
import * as _ from 'highland';
interface BaseData {
id: string;
data: string;
}
const sourceStream = _([
{id: 'foo', data: 'poit'},
{id: 'foo', data: 'fnord'},
{id: 'bar', data: 'narf'}]);
const partners = [
'foo',
'bar',
];
partners.forEach((partner: string) => {
const partnerStream = sourceStream.fork();
partnerStream.filter((baseData: BaseData) => {
return baseData.id === partner;
});
partnerStream.each(console.log);
});
私は今、2つのストリームを持つことが期待される、とfoo
-streamは、2つの要素を含むように:
は私がfork
を経由して、それまでにこれを設定すると考えていました1つの要素を含むストリーム:
{ id: 'bar', data: 'narf' }
しかし、代わりにエラーが表示されます:
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
throw new Error(
^
Error: Stream already being consumed, you must either fork() or observe()
at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
at Array.forEach (native)
at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
ストリームを複数のストリームにフォークする方法はありますか。
私も呼び出しを連鎖しようとした、まだ、私は唯一のバック1つのストリームの結果を得る:
partners.forEach((partner: string) => {
console.log(partner);
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
});
partnerStream.each((item: BaseData) => {
console.log(item);
});
});
印刷のみ:代わりに、期待の
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}
を
私は誤解されている場合もあります。fork
がすべてでした。 its doc entryあたりのとおり:
Stream.fork()共有背圧で追加の消費者 を追加することができ、ストリームをフォーク。複数のコンシューマにフォークされたストリームは、 は、最も遅いコンシューマができるだけ高速にそのソースから値を取り出します。
注:フォーク間の一貫した実行順序に依存しないでください。 この変換では、すべてのフォークが値foo を処理してから2番目の値バーを処理することが保証されます。フォークプロセスfooが実行される順番を に保証するものではありません。
ヒント:フォーク内のストリーム値を変更する場合は注意してください(または )。 同じ値がすべてのフォークの に渡されるため、1つのフォークで行われた変更は、 の後に実行されるフォークで表示されます。これに矛盾した実行順序を追加すると、 が微妙なデータ破損のバグに終わる可能性があります。あなたが 任意の値を変更する必要がある場合は、コピーを作成し、代わりにコピーを変更する必要があります。
廃止の警告:(変換を経て、例えば)それを消費 後のストリームをフォークすることは現在可能です。これは、次のメジャーリリースではもはや にはなりません。ストリームをフォークする場合は、常に コールフォークを使用します。
ので、代わりの「ストリームをフォークにどのように?」私の実際の質問は次のようなものかもしれません。
、まだ私が唯一のリターン1つのストリームの値ではなく、それらの両方を参照してください。
それは印刷されます。 – k0pernikus