bluebird.map()呼び出しで使用するために大きなCSVファイルを処理する関数を構築する必要があります。ファイルの潜在的なサイズを考えると、私はストリーミングを使用したいと思います。NodeJS、大規模なCSVファイルの処理を約束する
この関数は、ストリーム(CSVファイル)と関数(ストリームからチャンクを処理する)を受け入れ、ファイルが読み込み(解決)またはエラー(拒否)されたときに約束を返す必要があります。
だから、私は開始:作成しないように、私は、処理されているデータの実際の量を絞る必要がある
- :今
'use strict'; var _ = require('lodash'); var promise = require('bluebird'); var csv = require('csv'); var stream = require('stream'); var pgp = require('pg-promise')({promiseLib: promise}); api.parsers.processCsvStream = function(passedStream, processor) { var parser = csv.parse(passedStream, {trim: true}); passedStream.pipe(parser); // use readable or data event? parser.on('readable', function() { // call processor, which may be async // how do I throttle the amount of promises generated }); var db = pgp(api.config.mailroom.fileMakerDbConfig); return new Promise(function(resolve, reject) { parser.on('end', resolve); parser.on('error', reject); }); }
、私は2つの相互に関連する問題を抱えています記憶圧力。
processor
paramとして渡される関数は、約束ベースのライブラリ(現時点ではpg-promise
)を介してファイルの内容をdbに保存するなど、非同期になることがよくあります。そのため、それは記憶の中に約束を作り、繰り返し移動するでしょう。
pg-promise
ライブラリはpage()のように、これを管理するための機能がありますが、私はこれらの約束メソッドでストリームのイベントハンドラをミックスする方法のまわりで私の先をラップすることはできませんよ。現在、read()
の後にreadable
セクションのハンドラで約束を返しています。これは、約束されたデータベース操作を作成し、プロセスメモリの上限に達したために最終的にフォールトアウトすることを意味します。
私はジャンプポイントとして使用できる実例がありますか?
UPDATE:猫の皮を剥ぐためにおそらく複数の方法が、これは動作します:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
誰でもこのアプローチの潜在的な問題を見ていますか?
見た目がきれいです。 ) 'pg-promise'への最近の' page'の追加が無駄ではないことを嬉しく思っています; –
readDataFromStream;の最後でそれを単純化しました;) 'undefined'を返す必要はありません。とにかく何も返さないとき); –
実際には、これに問題があるかもしれません... db.taskを呼び出すと、その結果を処理しないので、それが拒否された場合、エラーがスローされますあなたの拒否が処理されていないことを約束してください。 –