2013-07-18 9 views
9

nodejs sqsキュープロセッサを作成しようとしています。Nodejs sqsキュープロセッサ

"use strict"; 
var appConf = require('./config/appConf'); 
var AWS = require('aws-sdk'); 
AWS.config.loadFromPath('./config/aws_config.json'); 
var sqs = new AWS.SQS(); 
var exec = require('child_process').exec; 
function readMessage() { 
    sqs.receiveMessage({ 
    "QueueUrl": appConf.sqs_distribution_url, 
    "MaxNumberOfMessages": 1, 
    "VisibilityTimeout": 30, 
    "WaitTimeSeconds": 20 
    }, function (err, data) { 
    var sqs_message_body; 
    if (data.Messages) { 
     if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') { 
     //sqs msg body 
     sqs_message_body = JSON.parse(data.Messages[0].Body); 
     //make call to nodejs handler in codeigniter 
     exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"', 
      function (error, stdout, stderr) { 
      if (error) { 
       throw error; 
      } 
      console.log('stdout: ' + stdout); 
      if(stdout == 'Success'){ 
       //delete message from queue 
       sqs.deleteMessage({ 
       "QueueUrl" : appConf.sqs_distribution_url, 
       "ReceiptHandle" :data.Messages[0].ReceiptHandle 
       }); 
      } 
      }); 
     } 
    } 
    }); 
} 
readMessage(); 

上記のコードは、キュー内の単一のメッセージに対してうまく機能します。すべてのメッセージが処理されるまでキュー内のメッセージのポーリングを続けるように、このスクリプトをどのように書くべきですか?私は設定タイムアウトを使用する必要がありますか?

答えて

15

あなたはdefinetelyアマゾンが提供するロングポーリング技術を使用する必要があり、私はあなたがsqs.receiveMessageコールで"WaitTimeSeconds": 20引数を持っているので、あなたはすでにそれを使用している理解してまず第一に。 AWS Web interfaceで設定することを忘れていないことを祈っています。

メッセージのポーリングについて - あなたはタイマーを含む様々な技術を使用することができるが、私は最も簡単なだけでreceiveMessage年代(あるいはexec年代)コールバック関数の最後にあなたのreadMessage()関数を呼び出すことになると思います。キュー内の次のメッセージの処理は、キュー内の前のメッセージの処理の終了直後に開始されます。

UPDATE:私として

コードの新しいバージョンでは多くのreadMessage()呼び出しにあります。私はコードをより明確かつ維持しやすくするために最小化する方が良いと思います。しかし、たとえば、あなたのメインのreceiveMessageコールバックの終わりに唯一のコールを残すと、パラレルで実行される多くのPHPワーカースクリプトを受け取ることになります。並行作業者の量を制御するための複雑なスクリプトを追加する必要があります。私はあなたがexecコールバックでいくつかのコールを切断することができると思う、ifに参加し、メインコールバックでコールに参加しようとします。メモリリークについて

"use strict"; 
var appConf = require('./config/appConf'); 
var AWS = require('aws-sdk'); 
AWS.config.loadFromPath('./config/aws_config.json'); 
var delay = 20 * 1000; 
var sqs = new AWS.SQS(); 
var exec = require('child_process').exec; 
function readMessage() { 
    sqs.receiveMessage({ 
    "QueueUrl": appConf.sqs_distribution_url, 
    "MaxNumberOfMessages": 1, 
    "VisibilityTimeout": 30, 
    "WaitTimeSeconds": 20 
    }, function (err, data) { 
    var sqs_message_body; 
    if (data.Messages) 
     && (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined')) { 
     //sqs msg body 
     sqs_message_body = JSON.parse(data.Messages[0].Body); 
     //make call to nodejs handler in codeigniter 
     exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"', 
      function (error, stdout, stderr) { 
      if (error) { 
       // error handling 
      } 
      if(stdout == 'Success'){ 
       //delete message from queue 
       sqs.deleteMessage({ 
       "QueueUrl" : appConf.sqs_distribution_url, 
       "ReceiptHandle" :data.Messages[0].ReceiptHandle 
       }, function(err, data){     
       }); 
      } 
      readMessage();     
      }); 
     }   
    }   
    readMessage();   
    }); 
} 
readMessage(); 

:私はreadMessage()の次の呼び出しは、コールバック関数の中で起こるので、あなたが心配してはならないと思います - そうでない再帰的、および再帰関数と呼ばちょうどreceiveMessage()関数を呼び出した後、親関数に値を返します。

+0

こんにちは、この本をよく読んでください。 https://gist.github.com/yalamber/374add88e887e688d818 – Yalamber

+0

また、このスクリプトの実行中にメモリリークが心配する必要がありますか? – Yalamber

+0

@askkirati更新済み! – zavg

1

ノードを使用している場合は、https://www.npmjs.com/package/sqs-workerモジュールを使用してください。それはあなたのために仕事をします。

var SQSWorker = require('sqs-worker') 

var options = 
{ url: 'https://sqs.eu-west-1.amazonaws.com/001123456789/my-queue' 
} 

var queue = new SQSWorker(options, worker) 

function worker(notifi, done) { 
    var message; 
    try { 
    message = JSON.parse(notifi.Data) 
    } catch (err) { 
    throw err 
    } 

    // Do something with `message` 

    var success = true 

    // Call `done` when you are done processing a message. 
    // If everything went successfully and you don't want to see it any more, 
    // set the second parameter to `true`. 
    done(null, success) 
} 
+1

なぜダウン投票? https://github.com/BBC/sqs-consumerも同様です – Cmag