理解Confirm消息确认机制:
消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产这一个应答。
生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。
如何实现Confirm确认消息?
第一步:在channel上开启确认模式:channel.confirmSelect()
第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,
根据具体的结果对消息进行重新发送、或者记录日志等后续处理。
1 //生产端代码 2 //1 创建ConnectionFactory 3 ConnectionFactory connectionFactory = new ConnectionFactory(); 4 connectionFactory.setHost("127.0.0.1"); 5 connectionFactory.setPort(5672); 6 connectionFactory.setVirtualHost("/"); 7 8 //2 获取C onnection 9 Connection connection = connectionFactory.newConnection(); 10 11 //3 通过Connection创建一个新的Channel 12 Channel channel = connection.createChannel(); 13 14 15 //4 指定我们的消息投递模式: 消息的确认模式 16 channel.confirmSelect(); 17 18 String exchangeName = "test_confirm_exchange"; 19 String routingKey = "confirm.save"; 20 21 //5 发送一条消息 22 String msg = "Hello RabbitMQ Send confirm message!"; 23 channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); 24 25 //6 添加一个确认监听 26 channel.addConfirmListener(new ConfirmListener() { 27 @Override 28 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 29 System.err.println("-------no ack!-----------"); 30 } 31 32 @Override 33 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 34 System.err.println("-------ack!-----------"); 35 } 36 });
1 //消费端代码 2 //1 创建ConnectionFactory 3 ConnectionFactory connectionFactory = new ConnectionFactory(); 4 connectionFactory.setHost("127.0.0.1"); 5 connectionFactory.setPort(5672); 6 connectionFactory.setVirtualHost("/"); 7 8 //2 获取C onnection 9 Connection connection = connectionFactory.newConnection(); 10 11 //3 通过Connection创建一个新的Channel 12 Channel channel = connection.createChannel(); 13 14 String exchangeName = "test_confirm_exchange"; 15 String routingKey = "confirm.#"; 16 String queueName = "test_confirm_queue"; 17 18 //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key 19 channel.exchangeDeclare(exchangeName, "topic", true); 20 channel.queueDeclare(queueName, true, false, false, null); 21 channel.queueBind(queueName, exchangeName, routingKey); 22 23 //5 创建消费者 24 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); 25 channel.basicConsume(queueName, true, queueingConsumer); 26 27 while(true){ 28 Delivery delivery = queueingConsumer.nextDelivery(); 29 String msg = new String(delivery.getBody()); 30 31 System.err.println("消费端: " + msg); 32 }
Return消息机制
Return Listener用于处理一些不可路由的消息。
我们的消息生产者,通过指定一个Exchange和Routingkey,把消息送到某一个队列中,
然后我们的消费者监听队列,进行消息处理操作。
但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,
这个时候我们需要监听这种不可达的消息,就要使用return listener。
在基础API中有一个关键的配置项:
Mandatory:如果为true,则监听会接收到路由不可达的消息,然后进行后续处理,
如果为false,那么broker端自动删除该消息。(默认false)
1 //生产端代码 2 ConnectionFactory connectionFactory = new ConnectionFactory(); 3 connectionFactory.setHost("127.0.0.1"); 4 connectionFactory.setPort(5672); 5 connectionFactory.setVirtualHost("/"); 6 7 Connection connection = connectionFactory.newConnection(); 8 Channel channel = connection.createChannel(); 9 10 String exchange = "test_return_exchange"; 11 String routingKey = "return.save"; 12 String routingKeyError = "abc.save"; 13 14 String msg = "Hello RabbitMQ Return Message"; 15 16 17 channel.addReturnListener(new ReturnListener() { 18 @Override 19 public void handleReturn(int replyCode, String replyText, String exchange, 20 String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 21 22 System.err.println("---------handle return----------"); 23 System.err.println("replyCode: " + replyCode); 24 System.err.println("replyText: " + replyText); 25 System.err.println("exchange: " + exchange); 26 System.err.println("routingKey: " + routingKey); 27 System.err.println("properties: " + properties); 28 System.err.println("body: " + new String(body)); 29 } 30 }); 31 32 33 channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); 34
//消费端代码 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "return.#"; String queueName = "test_return_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费者: " + msg); }
运行结果: