一 RabbitMQ可靠性投递生产者确认机制
本章节我们主要聊一聊RabbitMQ使用必须考虑的问题,就是消息可靠性!在生产环境下如何确保消息的可靠性投递,我们首先需要考虑两个问题
1、生产者发送消息,是否发送成功?
2、消费者接收消息如何确认以及拒绝?
当然我们所说的可靠并非一个绝对的概念,因网络、硬件、不可抗因素等;可靠性是一个相对的概念,在条件合理的范围内系统所能确保一切尽可能的趋于完美的消息可靠性;
我们来思考一下需要考虑哪些环节;
Send Massage(消息投递者) 在将消息发送到交换器Exchange的时候,默认RabbitMQ不进行确认投递者是不知道是否投递成功,也就是默认情况下生产者是不知道消息有没有正确地到达服务器,没有到达服务器,如果出现如:网络闪断等因素,则这条消息会无法投递到Exchange
Exchange通过RoutingKey将消息路由至Queue ,这个环节中如果无法路由至Queue队列,如何处理该消息?消息已经路由至Queue队列,却发现没有消费者,又如何处理?,是否也有一样的通知机制告诉我们?
在接收者Receive Message(消息消费者) 在接收到消息后,如何通知RabbitMQ我已经接收到该消息?是否消费者也需要一个确认告知RabbitMQ已经接收到消息?
带着这一系列问题,我们先来看看如何进行保障消息投递的确认;
二 、生产者确认
RabbitMQ针对这个问题,提供了两种解决方式;
- 事务机制 :RabbitMQ提供了事务机制保证消息投递,RabbitMQ客户端中与事务机制相关的方法有三个: channel.txSelect 和
channel.txCommit 和channel.txRollback
channel.txSelect : 将当前的channel通道设置为事务模式;
channel.txCommit :用于提交事务;
channel.txRollback :用于事务回滚;
try { channel.txSelect(); channel.basicPublish(exchange , routingKey , MessageProperties.PERSISTENT_TEXT_PLAIN , msg.getBytes()); int result = 1 / 0 ; channel.txCommit(); }catch (Exception e) { e.printStackTrace(); channel.txRollback(); }
那么,既然已经有事务了,为何还要使用发送方确认模式呢,原因是因为事务的性能是非常差的。根据相关资料,事务会降低2~10倍的性能。
- **生产者确认机制 :(Publisher Confirm)机制 **
、生产者将Channel设置成Confirm模式,当设置Confirm模式后所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始,ID在同个Channel范围是唯一的),一旦消息被投递到所有匹配的队列之后Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;
2、如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出;
3、RabbitMQ回调消息的deliveryTag包含了确认消息的ID,此外RabbitMQ也可以设置channel.basicAck 方法中的multiple参数,表示到这个序号之前的所有消息都己经得到了处理;稍后介绍handleNack 和 handleAck两个方法我们再举个说明;
4、confirm的机制是异步的,如果消息成功发送,会返回ack消息供异步处理,如果消息发送失败发生异常,也会返回nack消息,confirm的时间没有明确说明,并且同一个消息只会被confirm一次;
接下来介绍几种confirm方法
- 普通confirm方法 : 每发送一条消息后,调用channel.waitForConfirms方法,等待服务器的确认返回;
先看代码样例,注意看注释
//开启confirm模式 channel.confirmSelect(); //模拟发送50条消息 for(int i =0;i<1000;i++){ String message = "Hello World RabbitMQ"; //发送消息 channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); //每发送2条判断一次是否回复 if(i%2==0){ //waitForConfirms可以换成带有时间参数的方法waitForConfirms(Long mills)指定等待响应时间 if(channel.waitForConfirms()){ System.out.println("Message send success."); } } }
2. 批量confirm方法 :每发送一批消息后,调用channel.waitForConfirmsOrDie()批量确认模式,
class Sender { private ConnectionFactory factory; private int count; private String exchangeName; private String queueName; private String routingKey; private String bindingKey; public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) { this.factory = factory; this.count = count; this.exchangeName = exchangeName; this.queueName = queueName; this.routingKey = routingKey; this.bindingKey = bindingKey; } public void run() { Channel channel = null; try { Connection connection = factory.newConnection(); channel = connection.createChannel(); //创建exchange channel.exchangeDeclare(exchangeName, "direct", true, false, null); //创建队列 channel.queueDeclare(queueName, true, false, false, null); //绑定exchange和queue channel.queueBind(queueName, exchangeName, bindingKey); channel.confirmSelect(); //发送持久化消息 for(int i = 0;i < count;i++) { //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的, //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话 //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键 channel.basicPublish(exchangeName, routingKey,MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes()); } long start = System.currentTimeMillis(); channel.waitForConfirmsOrDie(); System.out.println("执行waitForConfirmsOrDie耗费时间: "+(System.currentTimeMillis()-start)+"ms"); } catch (Exception e) { e.printStackTrace(); } }
批量的方法从数量级上降低了confirm的性能消耗,提高了效率,但是批量confmn方式的问题在于遇到RabbitMQ服务端返回Basic.Nack 需要重发批量消息而导致的性能降低,也可能导致消息重复消费
3.异步confirm方法(推荐) :提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理;
依旧还是先看代码:
生产者
1 public class ConfirmProducer { 2 3 public static void main(String[] args) throws Exception { 4 //1 创建ConnectionFactory 5 ConnectionFactory connectionFactory = new ConnectionFactory(); 6 connectionFactory.setHost("192.168.1.28"); 7 connectionFactory.setPort(5672); 8 connectionFactory.setVirtualHost("/"); 9 connectionFactory.setUsername("toher"); 10 connectionFactory.setPassword("toher888"); 11 //2 创建Connection 12 Connection connection = connectionFactory.newConnection(); 13 //3 创建Channel 14 Channel channel = connection.createChannel(); 15 //4 指定我们的消息投递模式: 消息的确认模式 16 channel.confirmSelect(); 17 //5 声明交换机 以及 路由KEY 18 String exchangeName = "test_confirm_exchange"; 19 String routingKey = "confirm.send"; 20 //6 发送一条消息 21 String msg = "Test Confirm Message"; 22 channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); 23 //7 添加确认监听 24 channel.addConfirmListener(new ConfirmListener(){ 25 @Override 26 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 27 System.err.println("收到NACK应答"); 28 } 29 @Override 30 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 31 System.err.println("收到ACK应答"); 32 } 33 }); 34 } 35 36 }
消费者:
public class ConfirmConsumer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.1.28"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("toher"); connectionFactory.setPassword("toher888"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_confirm_exchange"; //指定类型为topic String exchangeType = "topic"; String queueName = "test_confirm_queue"; //因为*号代表匹配一个单词,生产者中routingKey3将匹配不到 String routingKey = "confirm.*"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 channel.queueDeclare(queueName, true, false, false, null); //建立一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //5 创建消费者 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费端:" + msg); } }; //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); } }
运行效果
从上面代码我们可以看到有重写了ConfirmListener两个方法:handleNack 和 handleAck,分别用来处理RabbitMQ 回传的Basic.Nack和Basic.Ack;
它们都有两个参数:
long deliveryTag : 前面介绍确认消息的ID
boolean multiple : multiple 是否批量 如果是True 则将比该deliveryTag小的所有数据都移除 否则只移除该条;
我们简单的用一个数组来说明 [1,2,3,4]存储着4条消息ID , 此时确认消息返回的是 deliveryTag = 3 ,multiple = true那么RabbitMQ会通知我们小于ID3的消息得到确认了,如果multiple = false, 就通知我们ID3的确认了
我们再用修改一下上面的代码看一下
//声明一个用来记录消息唯一ID的有序集合SortedSet final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); //开启confirm模式 channel.confirmSelect(); //异步监听方法 处理ack与nack方法 channel.addConfirmListener(new ConfirmListener() { //处理ack multiple 是否批量 如果是批量 则将比该条小的所有数据都移除 否则只移除该条 public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag).clear(); } else { confirmSet.remove(deliveryTag); } } //处理nack 与ack相同 public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag).clear(); } else { confirmSet.remove(deliveryTag); } } });
以上代码按照每一个comfirm的通道维护一个集合,每发送一条数据,集合增加一个元素,每异步响应一条ack或者nack的数据,集合删除一条。SortedSet是一个有序的集合,它的有序是值大小的有序,不是插入时间的有序。JDK中waitForConfirms()方法也是使用了SortedSet集合
三 消费者 确认和拒绝
背景如下:
在接收者Receive Message(消息消费者) 在接收到消息后,如何通知RabbitMQ我已经接收到该消息?是否消费者也需要一个确认告知RabbitMQ已经接收到消息?答案是肯定的,就是今天我们要介绍的消费端ACK
1、消费端ACK
不知道大家还是否对以下这个方法有印象?设置消费确认
//参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer);
第二个参数:
当autoAck等于true时,RabbitMQ会自动把发送出去的消息设置为确认,然后从队列中删除,而不管消费者是否真正正确的使用了这些消息完成了自己的业务;
当autoAck等于false时,RabbitMQ会设置当消费者收到消息采用手工形式进行确认,证明消费端已经接收到了该消息了,RabbitMQ可以从队列中删除该消息了
样例
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(message); //消费端消息确认,并删除 channel.basicAck(envelope.getDeliveryTag(), false); } }; //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, false, consumer);
2、消费端拒绝
在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,可以调用 channel.basicReject 告诉RabbitMQ拒绝该消息,方法参数介绍如下
//deliveryTag 消息ID //requeue true = (重回队列 / false = 删除该消息) void basicReject(long deliveryTag, boolean requeue) throws IOException;
而channel.basicReject 一次只能拒绝一条消息,如果需要批量拒绝那么就需要用到 channel.basicNack,参数介绍如下
//deliveryTag 消息ID //multiple (true = 批量 / false = 不批量) //requeue (true = 重回队列 / false = 删除该消息) void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException
接下来我们用样例来说明:
生产者:
public class AckMsgProducer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.28"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("toher"); connectionFactory.setPassword("toher888"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 指定我们的消息投递模式: 消息的确认模式 channel.confirmSelect(); //5 声明交换机 以及 路由KEY String exchangeName = "test_ack_exchange"; //这里故意用一个错误的routingKey 以便测试交换机路由不到队列 String routingKey = "ack.send"; //6 发送一条消息 String msg = "Test ACK Message"; for(int i =1; i<5; i ++){ Map<String, Object> headers = new HashMap<String, Object>(); headers.put("num", i); //为了测试消息拒绝 我们传递一个自定义参数 消费端进行测试 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers) .build(); msg = msg + " 第"+ i +"条"; channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes()); } } }
消费者
public class AckMsgConsumer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("192.168.1.28"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("toher"); connectionFactory.setPassword("toher888"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_ack_exchange"; //指定类型为topic String exchangeType = "topic"; String queueName = "test_ack_queue"; //因为*号代表匹配一个单词 String routingKey = "ack.*"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 channel.queueDeclare(queueName, true, false, false, null); //建立一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //5 创建消费者 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); //让第一条数据 采用 Nack 拒绝 并重回队列 if((Integer)properties.getHeaders().get("num") == 1) { System.out.println("未消费:" + msg); channel.basicNack(envelope.getDeliveryTag(), false, true); } else { System.out.println("已消费:" + msg); channel.basicAck(envelope.getDeliveryTag(), false); //测试channel.basicReject 开启 //channel.basicReject(envelope.getDeliveryTag(), false); } } }; //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, false, consumer); } }
运行效果:
以上代码传递了自定义参数,以参数num =1 设定拒绝让其重回队列,可以看到后几条都已经消费成功,唯独第一条数据一直重复消费,重复回归队列;
到RabbitMQ控制台会发现一直有一条Ready中的消息
————————————————
版权声明:本文为CSDN博主「傲泣龙腾」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/lhmyy521125/java/article/details/88135904