2016-12-26 4 views
0

私は、フォークされたプロセスからの無限のデータストリームを持っています。私はこのストリームをモジュールで処理したいときがあります。このストリームからデータを複製して、別のモジュールで処理することが必要な場合もあります(たとえば、データストリームを監視していますが、何か面白いことが起こったら、さらなる調査)。NodeJSストリーム分割

それでは、次のシナリオを想定してみましょう。

  1. 私はプログラムを起動し、読み込み可能ストリームを消費し始める
  2. 2秒後、私は別のストリームリーダー
  3. によって1秒間に同じデータを処理したいです
  4. 時間が過ぎると、私は第2の消費者を閉じたいが、元の消費者は元のままでなければならない。ここで

、このためのコードスニペットです:

var stream = process.stdout; 

stream.pipe(detector); // Using the first consumer 

function startAnotherConsumer() { 
    stream2 = new PassThrough(); 
    stream.pipe(stream2); 

    // use stream2 somewhere else 
} 

function stopAnotherConsumer() { 
    stream.unpipe(stream2); 
} 

ここに私の問題は、STREAM2をunpipingすると、それが閉じて取得していないということです。私はunpipeコマンドの後stream.end()を呼び出した場合、それはエラーでクラッシュ:

events.js:160 
     throw er; // Unhandled 'error' event 
    ^

Error: write after end 
    at writeAfterEnd (_stream_writable.js:192:12) 
    at PassThrough.Writable.write (_stream_writable.js:243:5) 
    at Socket.ondata (_stream_readable.js:555:20) 
    at emitOne (events.js:101:20) 
    at Socket.emit (events.js:188:7) 
    at readableAddChunk (_stream_readable.js:176:18) 
    at Socket.Readable.push (_stream_readable.js:134:10) 
    at Pipe.onread (net.js:548:20) 

私も第二の流れからフラッシュされるが、それはどちらか動作しませんでしたバッファを支援するために、ソース・ストリームを一時停止しようとしました。

function stopAnotherConsumer() { 
    stream.pause(); 
    stream2.once('unpipe', function() { 
     stream.resume(); 
     stream2.end(); 
    }); 
    stream.unpipe(stream2); 
} 

ここまでと同じエラー(後回し書き込み)。

どのように問題を解決するには?私の本来の目的は、ストリームデータを1つのポイントから複製し、しばらくしてから2番目のストリームを閉じることです。

Note: I tried to use this answer to make it work.

答えて

0

回答がなかったので、自分の(パッチワーク)ソリューションを投稿します。誰かがより良いものを持っている場合には、それを保持しないでください。

新しいストリーム:

const Writable = require('stream').Writable; 
const Transform = require('stream').Transform; 

class DuplicatorStream extends Transform { 
    constructor(options) { 
     super(options); 

     this.otherStream = null; 
    } 

    attachStream(stream) { 
     if (!stream instanceof Writable) { 
      throw new Error('DuplicatorStream argument is not a writeable stream!'); 
     } 

     if (this.otherStream) { 
      throw new Error('A stream is already attached!'); 
     } 

     this.otherStream = stream; 
     this.emit('attach', stream); 
    } 

    detachStream() { 
     if (!this.otherStream) { 
      throw new Error('No stream to detach!'); 
     } 

     let stream = this.otherStream; 
     this.otherStream = null; 
     this.emit('detach', stream); 
    } 

    _transform(chunk, encoding, callback) { 
     if (this.otherStream) { 
      this.otherStream.write(chunk); 
     } 

     callback(null, chunk); 
    } 
} 

module.exports = DuplicatorStream; 

と使用方法:

var stream = process.stdout; 
var stream2; 

duplicatorStream = new DuplicatorStream(); 
stream.pipe(duplicatorStream); // Inserting my duplicator stream in the chain 
duplicatorStream.pipe(detector); // Using the first consumer 

function startAnotherConsumer() { 
    stream2 = new stream.PassThrough(); 
    duplicatorStream.attachStream(stream2); 

    // use stream2 somewhere else 
} 

function stopAnotherConsumer() { 
    duplicatorStream.once('detach', function() { 
     stream2.end(); 
    }); 
    duplicatorStream.detachStream(); 
} 
関連する問題