2016-11-11 15 views
0

MongoDBからデータを取得した後にファイルをストリーミング(新しい行区切りのJSON)して各行を変換しようとしています。行を変換した後、私はGoogleのbigqueryに格納したい - 一度に10000行。これらはすべてうまくいきますが、ストリームファイルの処理速度が時間の経過とともに大きく低下するという問題があります。Node.js時間の経過とともに読み込み可能なストリームが遅くなり、CPU使用率が低下します

ノードアプリケーションをあるサーバーに、mongodbを別のサーバーにセットアップしました。 30GBのRAMを搭載した8つのコアマシン。スクリプトが実行されると、最初にアプリケーションサーバーとmongodbサーバーのCPU使用率は約70〜75%になります。 30分後、CPU使用率は10%に低下し、最終的には1%に低下します。アプリケーションは例外を生成しません。私はアプリケーションログを見て、いくつかのファイルの処理が終了し、処理のために新しいファイルを使用したことを確認できます。 1つの実行は、午後3時より少し遅く、午後5時20分まで観測することができます。

CPU Usage Disk Usage

var cluster = require('cluster'), 
    os = require('os'), 
    numCPUs = os.cpus().length, 
    async = require('async'), 
    fs = require('fs'), 
    google = require('googleapis'), 
    bigqueryV2 = google.bigquery('v2'), 
    gcs = require('@google-cloud/storage')({ 
     projectId: 'someproject', 
     keyFilename: __dirname + '/auth.json' 
    }), 
    dataset = bigquery.dataset('somedataset'), 
    bucket = gcs.bucket('somebucket.appspot.com'), 
    JSONStream = require('JSONStream'), 
    Transform = require('stream').Transform, 
    MongoClient = require('mongodb').MongoClient, 
    mongoUrl = 'mongodb://localhost:27017/bigquery', 
    mDb, 
    groupA, 
    groupB; 

var rows = [], 
    rowsLen = 0; 

function transformer() { 

    var t = new Transform({ objectMode: true }); 

    t._transform = function(row, encoding, cb) { 
     // Get some information from mongodb and attach it to the row 
     if (row) { 
      groupA.findOne({ 
       'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } } 
      }, { 
       fields: { 'properties.OA_SA': 1, _id: 0 } 
      }, function(err, a) { 
       if (err) return cb(); 
       groupB.findOne({ 
        'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } } 
       }, { 
        fields: { 'properties.WZ11CD': 1, _id: 0 } 
       }, function(err, b) { 
        if (err) return cb(); 
        row.groupA = a ? a.properties.OA_SA : null; 
        row.groupB = b ? b.properties.WZ11CD : null; 

        // cache processed rows in memory 
        rows[rowsLen++] = { json: row }; 

        if (rowsLen >= 10000) { 
         // batch insert rows in bigquery table 
         // and free memory 
         log('inserting 10000') 
         insertRowsAsStream(rows.splice(0, 10000)); 
         rowsLen = rows.length; 
        } 

        cb(); 
       }); 
      }); 
     } else { 
      cb(); 
     } 
    }; 

    return t; 
} 

var log = function(str) { 
    console.log(str); 
} 

function insertRowsAsStream(rows, callback) { 
    bigqueryV2.tabledata.insertAll({ 
     "projectId": 'someproject', 
     "datasetId": 'somedataset', 
     "tableId": 'sometable', 
     "resource": { 
      "kind": "bigquery#tableDataInsertAllRequest", 
      "rows": rows 
     } 
    }, function(err, res) { 
     if (res && res.insertErrors && res.insertErrors.length) { 
      console.log(res.insertErrors[0].errors) 
      err = err || new Error(JSON.stringify(res.insertErrors)); 
     } 
    }); 
} 


function startStream(fileName, cb) { 
    // stream a file from Google cloud storage 
    var file = bucket.file(fileName), 
     called = false; 

    log(`Processing file ${fileName}`); 

    file.createReadStream() 
     .on('data', noop) 
     .on('end', function() { 
      if (!called) { 
       called = true; 
       cb(); 
      } 
     }) 
     .pipe(JSONStream.parse()) 
     .pipe(transformer()) 
     .on('finish', function() { 
      log('transformation ended'); 
      if (!called) { 
       called = true; 
       cb(); 
      } 
     }); 
} 

function processFiles(files, cpuIdentifier) { 
    if (files.length == 0) return; 
    var fn = []; 

    for (var i = 0; i < files.length; i++) { 
     fn.push(function(cb) { 
      startStream(files.pop(), cb); 
     }); 
    } 

    // process 3 files in parallel 
    async.parallelLimit(fn, 3, function() { 
     log(`child process ${cpuIdentifier} completed the task`); 
     fs.appendFile(__dirname + '/complete_count.txt', '1'); 
    }); 
} 

if (cluster.isMaster) { 
    for (var ii = 0; ii < numCPUs; ii++) { 
     cluster.fork(); 
    } 
} else { 
    MongoClient.connect(mongoUrl, function(err, db) { 
     if (err) throw (err); 
     mDb = db; 
     groupA = mDb.collection('groupageo'); 
     groupB = mDb.collection('groupbgeo'); 
     processFiles(files, process.pid); 
     // `files` is an array of file names 
     // each file is in newline json delimited format 
     // ["1478854974993/000000000000.json","1478854974993/000000000001.json","1478854974993/000000000002.json","1478854974993/000000000003.json","1478854974993/000000000004.json","1478854974993/000000000005.json"] 
    }); 
} 
+0

RAMとハードドライブの使用状況はどのようでしたか? –

+0

私はメモリ割り当て/ GCエラーを取得していないため、RAMが正常だったと推測しています。なぜこのソリューションでHDが懸念されるのでしょうか? –

+0

メモリ割り当てエラーが発生していないということは、問題がないことを意味するわけではありません。 RAMを過剰に使用すると、スワップメモリ​​が使用され、ハードドライブが使用されます。 –

答えて

0

さて、私は犯人を見つけました! Google APIのNode.jsクライアントライブラリは、Streams 0.8を実装する「stream-events」というモジュールを使用しています。 Streams 0.8は、消費者のデータ消費能力に基づいて 'data'イベントを発生するレートを制御しません。レート制御機能はStreams 1.0で導入されました。つまり、これは基本的に、読み込み可能なストリームが処理できなかった速度でMongoDBにデータを投げていることを意味していました。

解決方法: 私は、Googleのクライアントライブラリの代わりに 'request'モジュールを使用しました。要求モジュールに署名されたURLを提供しました。要求モジュールは、結果をストリームに変換してトランスフォーマーに渡すことができました。

Take away: 使用しているストリームバージョンに使用するモジュールを常にチェックしてください。

関連する問題