For one of our requirements we need to keep track of queue depth and of successfully processed messages. The idea is to publish messages and get back a list of the messages that succeeded and the ones that failed. To simulate the requirement I did the following:
- Publish the messages with the mandatory and immediate flags set: channel.basicPublish 'exchange', 'rKey', true, false, props, "Hello World".bytes
- The consumer acks the even-numbered messages (I put a number from 1 to 10 in a header of each message as a marker) and does not ack the odd-numbered ones.
- I have implemented setReturnListener in the publisher to capture undelivered messages.
While I am able to get the number of unacknowledged messages via rabbitmqctl list_queues messages_unacknowledged, my handleBasicReturn method somehow never gets called. Am I missing something?
Code snippets:
Producer:
channel.setReturnListener(new ReturnListener() {
    public void handleBasicReturn(int replyCode, String replyText, String exchange,
                                  String routingKey, AMQP.BasicProperties properties,
                                  byte[] body)
            throws IOException {
        println "Debugging messages!!!!"
        println "The details of returned messages are ${replyText} from ${exchange} with routingKey as ${routingKey} with properties"
    }
});
println " queuename is ${dec.queue} and consumerCount is ${dec.consumerCount} messageCount is ${dec.messageCount}"
(1..10).each { i ->
    println "Sending file ${i}....."
    def headers = new HashMap<String,Object>()
    headers.put "operatiion","scp"
    headers.put "dest","joker.dk.mach.com"
    headers.put "id", i
    println headers
    BasicProperties props = new BasicProperties(null, null, headers, null, null, null, null, null,null, null, null, null,null, null)
    // mandatory = true, immediate = false
    channel.basicPublish 'exchange' ,'rKey',true,false, props,"Hello World".bytes
}
channel.close()
Consumer:
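while (true) {
    def delivery = consumer.nextDelivery()
    def headers = delivery?.properties?.headers
    // headers may be null, so use safe navigation here as well
    def id = headers?.get("id")
    println "Received message:"
    println " ${id.toString()}"
    // ack only the even-numbered messages; odd ones stay unacknowledged
    if (id != null && id % 2 == 0) {
        channel.basicAck delivery.envelope.deliveryTag, false
    }
}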
Read this explanation about how the immediate and mandatory flags impact message delivery in RabbitMQ.
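In your case, since you have a consumer receiving messages, messages won't be returned even if the consumer never acks them. The mandatory flag only causes a return when the message cannot be routed to any queue, and the immediate flag only when no consumer can take the message right away; acknowledgements play no part in either check.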
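To make the rule concrete, here is a minimal sketch in plain Java. It is a toy model of the decision as AMQP 0-9-1 describes it, not RabbitMQ code, and it does not talk to a broker. The class name ReturnRule and the parameters matchingQueues and readyConsumers are made up for illustration, and the model ignores edge cases such as a message routed to several queues.

```java
// Toy model of when a broker sends basic.return; names are hypothetical.
final class ReturnRule {
    private ReturnRule() {}

    /**
     * @param mandatory      mandatory flag passed to basicPublish
     * @param immediate      immediate flag passed to basicPublish
     * @param matchingQueues number of queues the message routes to
     * @param readyConsumers number of consumers able to take the message right away
     * @return true if the message would come back through the ReturnListener
     */
    static boolean isReturned(boolean mandatory, boolean immediate,
                              int matchingQueues, int readyConsumers) {
        // mandatory: returned only when the message cannot be routed to any queue
        if (mandatory && matchingQueues == 0) {
            return true;
        }
        // immediate: returned only when no consumer can take it right away
        if (immediate && readyConsumers == 0) {
            return true;
        }
        // Whether the consumer later acks the message is not an input at all.
        return false;
    }

    public static void main(String[] args) {
        // The setup from the question: mandatory=true, immediate=false,
        // one bound queue with one active consumer.
        System.out.println(isReturned(true, false, 1, 1)); // false
        // Same publish, but the routing key matches no queue.
        System.out.println(isReturned(true, false, 0, 0)); // true
    }
}
```

Under this model, the quickest way to see handleBasicReturn fire is to publish with mandatory=true and a routing key that no queue is bound to. Unacked messages stay on the queue and show up in messages_unacknowledged instead.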
D.