2015-10-14 31 views
7

bluebird.map()呼び出しで使用するために大きなCSVファイルを処理する関数を構築する必要があります。ファイルの潜在的なサイズを考えると、私はストリーミングを使用したいと思います。NodeJS、大規模なCSVファイルの処理を約束する

この関数は、ストリーム(CSVファイル)と関数(ストリームからチャンクを処理する)を受け入れ、ファイルが読み込み(解決)またはエラー(拒否)されたときに約束を返す必要があります。

だから、私は開始:作成しないように、私は、処理されているデータの実際の量を絞る必要がある

  1. :今

    'use strict'; 
    
    var _ = require('lodash'); 
    var promise = require('bluebird'); 
    var csv = require('csv'); 
    var stream = require('stream'); 
    
    var pgp = require('pg-promise')({promiseLib: promise}); 
    
    api.parsers.processCsvStream = function(passedStream, processor) { 
    
        var parser = csv.parse(passedStream, {trim: true}); 
        passedStream.pipe(parser); 
    
        // use readable or data event? 
        parser.on('readable', function() { 
        // call processor, which may be async 
        // how do I throttle the amount of promises generated 
        }); 
    
        var db = pgp(api.config.mailroom.fileMakerDbConfig); 
    
        return new Promise(function(resolve, reject) { 
        parser.on('end', resolve); 
        parser.on('error', reject); 
        }); 
    
    } 
    

    、私は2つの相互に関連する問題を抱えています記憶圧力。

  2. processor paramとして渡される関数は、約束ベースのライブラリ(現時点ではpg-promise)を介してファイルの内容をdbに保存するなど、非同期になることがよくあります。そのため、それは記憶の中に約束を作り、繰り返し移動するでしょう。

pg-promiseライブラリはpage()のように、これを管理するための機能がありますが、私はこれらの約束メソッドでストリームのイベントハンドラをミックスする方法のまわりで私の先をラップすることはできませんよ。現在、read()の後にreadableセクションのハンドラで約束を返しています。これは、約束されたデータベース操作を作成し、プロセスメモリの上限に達したために最終的にフォールトアウトすることを意味します。

私はジャンプポイントとして使用できる実例がありますか?

UPDATE:猫の皮を剥ぐためにおそらく複数の方法が、これは動作します:

'use strict'; 

var _ = require('lodash'); 
var promise = require('bluebird'); 
var csv = require('csv'); 
var stream = require('stream'); 

var pgp = require('pg-promise')({promiseLib: promise}); 

api.parsers.processCsvStream = function(passedStream, processor) { 

    // some checks trimmed out for example 

    var db = pgp(api.config.mailroom.fileMakerDbConfig); 
    var parser = csv.parse(passedStream, {trim: true}); 
    passedStream.pipe(parser); 

    var readDataFromStream = function(index, data, delay) { 
    var records = []; 
    var record; 
    do { 
     record = parser.read(); 
     if(record != null) 
     records.push(record); 
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency)) 
    parser.pause(); 

    if(records.length) 
     return records; 
    }; 

    var processData = function(index, data, delay) { 
    console.log('processData(' + index + ') > data: ', data); 
    parser.resume(); 
    }; 

    parser.on('readable', function() { 
    db.task(function(tsk) { 
     this.page(readDataFromStream, processData); 
    }); 
    }); 

    return new Promise(function(resolve, reject) { 
    parser.on('end', resolve); 
    parser.on('error', reject); 
    }); 
} 

誰でもこのアプローチの潜在的な問題を見ていますか?

+0

見た目がきれいです。 ) 'pg-promise'への最近の' page'の追加が無駄ではないことを嬉しく思っています; –

+0

readDataFromStream;の最後でそれを単純化しました;) 'undefined'を返す必要はありません。とにかく何も返さないとき); –

+0

実際には、これに問題があるかもしれません... db.taskを呼び出すと、その結果を処理しないので、それが拒否された場合、エラーがスローされますあなたの拒否が処理されていないことを約束してください。 –

答えて

3

以下は、同じ種類のタスクを正しく実行する完全なアプリケーションです。ファイルをストリームとして読み込み、CSVとして解析し、各行をデータベースに挿入します。

const fs = require('fs'); 
const promise = require('bluebird'); 
const csv = require('csv-parse'); 
const pgp = require('pg-promise')({promiseLib: promise}); 

const cn = "postgres://postgres:[email protected]:5432/test_db"; 
const rs = fs.createReadStream('primes.csv'); 

const db = pgp(cn); 

function receiver(_, data) { 
    function source(index) { 
     if (index < data.length) { 
      // here we insert just the first column value that contains a prime number; 
      return this.none('insert into primes values($1)', data[index][0]); 
     } 
    } 

    return this.sequence(source); 
} 

db.task(t => { 
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver); 
}) 
    .then(data => { 
     console.log('DATA:', data); 
    } 
    .catch(error => { 
     console.log('ERROR:', error); 
    }); 

なお、私は変更さ唯一の事:より良い代替手段として、ライブラリcsv-parse代わりのcsvを使用。

spexライブラリーのメソッドstream.readの使用が追加されました。これは、約束どおりにReadableストリームを提供します。

+0

これは' query( "INSERT ...")の後に 'parser'から次の項目を読み込もうとしません次のアイテムがすでに読み込み可能かどうかあるいは 'parser.read()'は約束を返しますか? – Bergi

+0

また、OPが探していた約束返す 'processor'コールバック関数はどうでしたか? – Bergi

+0

@Bergi私の理解は、parser.read()は同期していることを示していました。もしそうでないと判明すれば、それは明らかに約束に包まれる必要があります。そして、「読み取り可能」は、各読み取り操作ではなく、一度起動されます。これは私の理解です。約束を返すプロセッサに関しては、データ処理が完了したときに解決策を探していただけであり、失敗した場合には拒否を求めていました。私の例ではタスクが解決/拒否されます。 –

1

あなたはストリーミングはしたくないが、何らかのデータチャンクを望んでいないと言えますか? ;-)

あなたはhttps://github.com/substack/stream-handbookを知っていますか?

あなたのアーキテクチャを変更することなく最も簡単なアプローチは、ある種の約束のプールだろうと思います。例えばhttps://github.com/timdp/es6-promise-pool

+0

さて、関数で 'async.queue'を使用して、最終的にファイルを完成させる(またはしない)という約束を返すことを考えました。しかし、私は大規模なファイルの典型的なストリームベースの処理とBluebirdのような約束ライブラリをどう結びつけているのか疑問に思っていました。 ('pg-promise'には、より高いレベルの約束関数を提供する 'spex'が含まれています) – alphadogg

6

あなたはpromise-streams

var ps = require('promise-streams'); 
passedStream 
    .pipe(csv.parse({trim: true})) 
    .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row))) 
    .wait().then(_ => { 
    console.log("All done!"); 
    }); 

背圧および​​すべてと作品を見たいかもしれません。

関連する問題