2017-11-15 4 views
0

挨拶する人。
node.jsで非同期で手助けできますか?メソッドを使用してすべてのメッセージを取得する方法は、lib amqp.nodeで消費されますか?

私がここにありRabbitMQとの仕事のために使用amqplib moduleだとRabbitMQのからのメッセージを与えるを消費方法、彼はを開始し、この後、約その方法最初のリターンの約束:この

問題約束は始まる、RabbitMQからデータを得るためにコールバックをコールし、すべてのメッセージが私のノードjsアプリに送信される時を捕まえる方法を知らない。より多くのコメントで私のコードと終了コードをここに、を説明するため

私は私が望むものを書いた:

/** 
* Here my test code 
* 
* requirng amqp.node lib 
*/ 
let amqp = require('amqplib') 
    , configConnection = { /* ..config options */ } 
    , queue = 'users' 
    , exchange = 'users.exchange' 
    , type = 'fanout' 

/** 
* declare annonymous function as async and immediately called it 
*/ 
(async() => { 
    /** 
    * declare connection and channel with using async/await construction 
    * who support version node.js >= 8.5.0 
    */ 
    let conn = await amqp.connect(configConnection) 
    let channel = await conn.createChannel() 
    await channel.assertExchange(exchange, type) 
    let response = await channel.assertQueue(queue) 
    /** 
    * response: { queue: 'users', messageCount: 10, consumerCount: 0 } 
    */ 
    response = await channel.bindQueue(response.queue, exchange, '') 
    response = await channel.consume(response.queue, logMessage, {noAck: false}) 
    /** 
    * {noAck: false} false for not expect an acknowledgement 
    */ 
    console.log('reading for query finish') 

    function logMessage(msg) { 
    console.log("[*] recieved: '%s'", msg.content.toString()) 
    } 
})() 
    /** 
    * output will show: 
    * reading for query finish 
    * [*] recieved: 'message content' 
    * [*] recieved: 'message content' 
    * [*] recieved: 'message content' 
    * ... 
    * 
    * But i'm need show message 'reading for query finish' after when 
    * all consumes will executed 
    * 
    * Ask: How i can do this? 
    */ 

答えて

1

私は私の質問hereに答えを見つけました。使用中の

回答:持つEventEmitter & &の約束(私にとっては)

魔法はここにある:
await new Promise(resolve => eventEmitter.once('consumeDone', resolve))

ので、終了コードは次のとおりです。

/** 
* Here my test code 
* 
* requirng amqp.node lib 
*/ 
let amqp = require('amqplib') 
    , EventEmitter = require('events') 
    , eventEmitter = new EventEmitter() 
    , timeout = 10000 
    , configConnection = { /* ..config options */ } 
    , queue = 'users' 
    , exchange = 'users.exchange' 
    , type = 'fanout' 

/** 
* declare annonymous function as async and immediately called it 
*/ 
(async() => { 
    /** 
    * declare connection and channel with using async/await construction 
    * who support version node.js >= 8.5.0 
    */ 
    let conn = await amqp.connect(configConnection) 
    let channel = await conn.createChannel() 
    await channel.assertExchange(exchange, type) 
    let response = await channel.assertQueue(queue) 
    /** 
    * response: { queue: 'users', messageCount: 10, consumerCount: 0 } 
    */ 
    let messageCount = response.messageCount 
    response = await channel.bindQueue(response.queue, exchange, '') 
    response = await channel.consume(response.queue, logMessage(messageCount), {noAck: false}) 
    /** 
    * {noAck: false} false for not expect an acknowledgement 
    */ 

    /** 
    * declare timeout if we have problems with emit event in consume 
    * we waiting when event will be emit once 'consumeDone' and promise gain resolve 
    * so we can go to the next step 
    */ 
    setTimeout(() => eventEmitter.emit('consumeDone'), timeout) 
    await new Promise(resolve => eventEmitter.once('consumeDone', resolve)) 
    console.log('reading for query finish') 

    function logMessage(messageCount) { 
    return msg => { 
     console.log("[*] recieved: '%s'", msg.content.toString()) 
     if (messageCount == msg.fields.deliveryTag) { 
     eventEmitter.emit('consumeDone') 
     } 
    } 

    } 
})() 
関連する問題