2017-02-04 9 views
2

ストリームのネイティブ実装がどのように機能するかを理解しようとしています。 (すなわち[1,2,3,4]Node.JS - ネイティブストリームの理解に助けが必要

const Stream = require('stream'); 
 

 
// define a custom class to read my data into the stream 
 
class SourceWrapper extends Stream.Readable { 
 
    constructor(opt, content) { 
 
    super(opt); 
 
    this.content = content; 
 
    this.len = content.length; 
 
    this.index = 0; 
 
    } 
 

 
    _read() { 
 
    let i = this.index++; 
 
    if (i >= this.len) 
 
     this.push(null); 
 
    else { 
 
     this.push(this.content[i]); 
 
    } 
 
    } 
 
} 
 

 
// generate some data 
 
const arr = (new Array(10000000)).fill(1); 
 

 
// declare the streams 
 
const firstStream = new SourceWrapper({objectMode: true}, arr); 
 

 
const transform = (x, enc, next) => next(undefined, x * Math.random(x, 10)); 
 

 
const firstMapStream = new Stream.Transform({objectMode: true}); 
 
firstMapStream._transform = transform; 
 
const secondMapStream = new Stream.Transform({objectMode: true}); 
 
secondMapStream._transform = transform; 
 

 
// create a promise to measure execution time 
 
const start = new Date(); 
 

 
new Promise((resolve, reject) => { 
 
    firstStream 
 
    .pipe(firstMapStream) 
 
    .pipe(secondMapStream) 
 
    .on('finish',() => resolve(new Date())); 
 
}) 
 
.then((end) => console.log('execTime', end - start));

問題は、それが小さなデータセット上で動作することですが、大規模なセット上で実行された直後に終了するようだ:ここでは、コードです。

私には何が欠けていますか? objectModeとは関係がありますか?

ありがとうございました。

答えて

1

なぜなら、誰かがストリームdataイベントリスナーでストリームからデータを読み取る必要があるからです。問題を理解するためにコードを書き直しました。また、ゼロインデックスをスキップした間違ったインデックスカウントを修正しました。

'use strict'; 
const Stream = require('stream'); 

// define a custom class to read my data into the stream 
class SourceWrapper extends Stream.Readable { 
    constructor(opt, content) { 
    super(opt); 
    this.content = content; 
    this.len = content.length; 
    this.index = 0; 
    } 

    _read() { 
    let i = this.index; 
    if (i >= this.len) { 
     this.push(null); 
    } else { 
     this.push(this.content[i]); 
    } 
    this.index++; 
    } 
} 


const transform = (x, enc, next) => next(undefined, x * Math.random(x, 10)); 

const transform1 = new Stream.Transform({objectMode: true}); 
transform1._transform = transform; 

const transform2 = new Stream.Transform({objectMode: true}); 
transform2._transform = transform; 


const write = new Stream.Writable({ 
    objectMode: true, 
    write(value, enc, next) { 
     // Do something like writing... 
     next(); 
    } 
}); 


// generate some data 
const arr = (new Array(1000000)).fill(1); 
const read = new SourceWrapper({objectMode: true}, arr); 

new Promise((resolve, reject) => { 
    read 
    .pipe(transform1) 
    .pipe(transform2) 
    .pipe(write) 
    .on('finish',() => { 
     resolve(); 
    }); 
}) 
.then(() => { 
    console.log('Done'); 
}); 
+0

ありがとうございます。私は今理解していると思う。 – nainy

関連する問題