0
キューの深さを追跡し、メッセージを正常に処理する必要があるという要件の1つとして、アイデアは、メッセージを公開し、成功したメッセージと失敗したメッセージのリストを取得することです。私は、次のRabbitMQ:setReturnListner handleBasic返されないメッセージが返される
- が必須と即時フラグ付きメッセージを公開したの要件をシミュレートするために
- channel.basicPublish「為替」、「RKEY」、真、偽、小道具、「Hello World」の.bytesを送りました消費者は(私は各メッセージのヘッダーにマークされた値として1..10からの数字を入れています)消費し、奇数番号のメッセージをACKしません。
- 配信されていないメッセージをキャプチャするためにパブリッシャにsetReturnListnereを実装しました。
messages_unacknowledged Rabbmitmqctlのlist_queues経由で未確認のメッセージ数を取得することができていながら、何とか私のhandleBasicReturnメソッドが呼び出されません。何かを逃している。
コードスニペット:
プロデューサー:
channel.setReturnListener(new ReturnListener() {
public void handleBasicReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties,
byte[] body)
throws IOException {
println "Debugging messages!!!!"
println "The details of returned messages are ${replyText} from ${exchange} with routingKey as ${routingKey} with properties"
}
});
println " queuename is ${dec.queue} and consumerCount is ${dec.consumerCount} messageCount is ${dec.messageCount}"
(1..10).each {
println "Sending file ${i}....."
def headers = new HashMap<String,Object>()
headers.put "operatiion","scp"
headers.put "dest","joker.dk.mach.com"
headers.put "id", i
println headers
BasicProperties props = new BasicProperties(null, null, headers, null, null, null, null, null,null, null, null, null,null, null)
channel.basicPublish 'exchange' ,'rKey',true,false, props,"Hello Worls".bytes
i++
}
channel.close()
消費者:
while (true) {
def delivery = consumer.nextDelivery()
def headers = delivery?.properties?.headers
def id = headers.get("id")
println "Received message:"
println " ${id.toString()}"
if(id % 2 == 0){
channel.basicAck delivery.envelope.deliveryTag, false
}
}