一、消息如何保障100%的投递成功?
1. 什么是生产端的可靠性投递?
(1)保障消息的成功发出
(2)保障MQ节点的成功接收
(3)发送端收到MQ节点(Broker)确认应答
(4)完善的消息进行补偿机制
2. 生产端 - 可靠性投递(BAT/TMD 互联网大厂的解决方案)
(1)消息落库,对消息状态进行打标(思考:在高并发的场景下是否适合?)
(2)消息的延迟投递,做二次确认,回调检查(节省了数据落库的这一步)
二、幂等性概念
一句话概括:可能你要对一件事情进行操作100次,1000次,结果都是相同的。比如 对一个SQL执行100次1000次,我们可以借鉴数据库的乐观锁机制:比如我们执行一条更新库存的SQL语句:update T_reps set count = count -1, version = version +1 where version = 1
1. 消费端 - 幂等性保障
在海量订单产生的业务高峰期,如何避免消息的重复消费问题
1. 消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息。
业界主流的幂等性操作解决方案:
(1)唯一Id + 指纹码 机制,利用数据库主键去重
1)select count(1) from T_order where id = 唯一ID + 指纹码
2)好处:实现简单
3)坏处:高并发下有数据库写入的性能瓶颈
4)解决方案:跟进ID进行分库分表进行算法路由
(2)利用Redis的原子性取实现
1)第一:我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
2)第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略。
三、 Confirm确认
(1)消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们产生一个应答。
(2)生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障。
如何实现Confirm确认消息?
第一步:在channel上开启确认模式:channel.confirmSelect()
第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.everjiankang.dependency.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; public class Producer { //交换机名称 private static final String EXCHANGE_NAME = "test_exchange_fanout_message_confirm"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout", true,true,null); //指定消息投递模式为:消息的确认模式 channel.confirmSelect(); //设置confirm返回监听 channel.addConfirmListener(new ConfirmListener() { //1.处理失败场景,deliveryTag:消息的唯一标签 //失败场景:磁盘写满了,队列数达到上限了mq出问题了等 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("-----------no ack------------" + deliveryTag + " | " + multiple); } //2.处理成功场景,deliveryTag:消息的唯一标签 @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("-----------ack------------" + deliveryTag + " | " + multiple); } //3.第三种情况,Ack 和No Ack都没有收到,这就需要可靠性投递来解决。假设Broker端返回的确认突然出现网络的闪断, //那我连ACK到底成功还是失败都不知道,那怎么办呢?用定时任务取抓取一些中间状态的消息,然后重新触发发送,补偿。 }); String message = "发送一条需要确认的消息!!!!"; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println("生产者 send :"+message); channel.close(); connection.close(); System.out.println("over"); } }
四、Return消息机制
1. Return Listener用于处理一些不可路由的消息
2. 我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作。
3. 但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener
4. 在API中有一个关键的配置项 Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么Broker会自动删除该消息。
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.everjiankang.dependency.ConnectionUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ReturnListener; public class Producer { private static final String EXCHANGE_NAME = "test_exchange_fanout_message_confirm";//交换机名称 public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout", true,true,null); //设置消息发送后匹配失败时的处理方法 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode,String replyText,String exchange, String routingKey,AMQP.BasicProperties properties,byte[] body)throws IOException { System.out.println("-----------ReturnListener start ------------"); System.out.println(replyCode + " | " + replyText + " | " + exchange + " | " + routingKey + " | " + properties + " | " + new String(body)); System.out.println("-----------ReturnListener end ------------"); } }); String message = "发送一条需要确认的消息!!!!"; boolean mandatory = true; //[ˈmændətəri] adj.强制的; 法定的; 义务的; n. 受托者; channel.basicPublish(EXCHANGE_NAME, "", mandatory, null, message.getBytes()); System.out.println("生产者 send :"+message); channel.close(); connection.close(); System.out.println("over"); } }
五、消费端限流
假设一个场景,首先,我们Rabbitmq服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,就会出现如下情况:巨量的消息瞬间全部推送过来,但是我们单个客户端无法同事处理这么多数据。
RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置QoS的值)未被确认前,不尽兴消费新的消息。
限流API
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
参数讲解:
prefetchSize:消息的最大大小,比如5M,0为不限制;
prefetchCount:最多发送多少条消息,实际工作中设置为1。会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,
即一旦有N个消息还没有ack,则该consumer将block(阻塞)掉,直到有消息ack.
global:是否将上面设置应用于channel级别还是consumer级别。true:在channel通道级别做限制;false:在consumer级别做限制
注意:
prefetchSize和global这两项,Rabbitmq没有实现,暂且不研究,一个设置为0,一个设置为false就好了。
prefetch_count在no_ask=false的情况下生效,即在自动应答的情况下这两个值是不生效的。(必须手动签收,不能自动签收)
//同一时刻服务器只会发一条数据给消费者 channel.basicQos(0, 1, false); // channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); System.out.println("收到消息:"+message); boolean multiple = false; //是否批量签收 //这个方法会主动回送给Broker一个应答,表示这条消息我已经处理完了,你可以再给我下一条了 channel.basicAck(envelope.getDeliveryTag(),multiple); } }; boolean autoAck = false; //不自动确认 channel.basicConsume(QUEUE_NAME,autoAck,consumer);
注意:
如果 channel.basicAck(envelope.getDeliveryTag(),multiple); 代码被注释掉,则收到一条消息后则没法再继续处理了下一条消息了。
比如生产者发了5个,处理完第一个后,就卡在这了,其余4条没有继续处理。
六、消息的ACK(手动确认)与NACK(重回队列)
消费端的手工ACK和NACK
手工ACK:消费成功了,向发起者确认
NACK:消费失败,让生产者重新发
消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。
如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功。
消费端的重回队列
消费端重回队列是为了对没有处理成功的消息,把消息重新回递给Broker!
一般我们在实际应用中,都会关闭重回队列,也就是设置为false
boolean autoAck = false; //不自动确认 channel.basicConsume(QUEUE_NAME,autoAck,consumer);
/** * 生产者端只发送消息给Exchange还有指定路由key,并不再指定具体队列名称了 * 具体队列名称有消费者端创建,随便创建,只要其队列绑定到了本Exchange和路由即可 */ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import com.everjiankang.dependency.ConnectionUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String EXCHANGE_NAME = "test_exchange_topic"; private final static String ROUTING_KEY_NAME = "order.update"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //发送五条消息,属性参数properties for (int i = 0; i < 3; i++) { //【设置属性参数】:AMQP.BasicProperties() start Map<String,Object> headers = new HashMap<>(); headers.put("id", i); headers.put("name", "xiaochao"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .contentEncoding("UTF-8") .deliveryMode(2) .expiration("10000") .headers(headers) .build(); String message = "匹配insert" + i; channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY_NAME,true,properties,message.getBytes()); } channel.close(); connection.close(); System.out.println("game over"); } }
import java.io.IOException; import com.everjiankang.dependency.ConnectionUtil; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer1 { private static final String EXCHANGE_NAME = "test_exchange_topic"; private static final String QUEUE_NAME = "test_queue_topic_1"; private final static String ROUTING_KEY_NAME = "order.*";//order.# public static void main(String[] args) throws IOException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.exchangeDeclare(EXCHANGE_NAME,"topic"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_NAME); //channel.basicQos(1); //允许一次多个消息进到队列里来 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println(properties.getHeaders().get("id") + " | "+ new String(body,"UTF-8")); if(properties.getHeaders().get("id").equals(1)) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME,false,consumer); } }
返回结果:
0 | 匹配insert0 1 | 匹配insert1 2 | 匹配insert2 (由于没有限制basicQos(1):允许一次多个消息进来,所以会先消费所有队列里的,然后) 1 | 匹配insert1 1 | 匹配insert1 1 | 匹配insert1 1 | 匹配insert1
id为1的重发了4次,总共5次,
如果将channel.basicQos(1);放开,每次只允许一个进来,那么结果如下:
0 | 匹配insert0 1 | 匹配insert1 1 | 匹配insert1(由于basicQos(1):一次只允许一个消息进来,所以1会一直重发) 1 | 匹配insert1 1 | 匹配insert1 1 | 匹配insert1
七、TTL队列 / 消息(Time To Live)生存时间
1. RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
2. RabbitMQ支持队列的过期时间,从消息入队开始计算,只要超过了队列的超时时间配置,那么消息会自动地被删除
界面操作步骤:
1. 创建队列并设置超时时间
2. 创建Exchange
3. Exchange上绑定路由
4. 在能被路由到的队列里也可以看到刚刚由Exchange里创建的绑定
5. Exchange中发消息
6. 在队列里看到了这条消息
7. 10秒后消息消失
8. 发消息时可以设定超时属性:
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .expiration("10000").build();
八、死信队列(DLX:Dead-Letter-Exchange)
0. 何为死信、死信队列
死信:当一个消息没有消费者取消费,此消息就是死信了。任何MQ中都有死信的概念。
利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
1. 消息变成死信的几种情况
1. 消息被绝(basic.reject / basic.nack) 并且requeue=false:不需要重回队列了
2. 消息TTL过期
3. 队列达到最大长度
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。
2. 死信队列的具体实现实战
正常声明交换机、队列、绑定:
(1)Exchange:dlx.excahnge
(2)Queue:dlx.queue
(3)RoutingKey: #
(4)队列上加参数:argumentgs.put("x-dead-letter-exchange","dlx.exchange");
启动消费端,创建交换机和队列以及它们之间的绑定关系,然后关闭消费者端程序,使其无法接收并处理队列信息
import java.io.IOException; import java.util.HashMap; import java.util.Map; import com.everjiankang.dependency.ConnectionUtil; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer1 { private static final String EXCHANGE_NAME = "test_dlx_exchange"; private static final String QUEUE_NAME = "test_dlx_queue"; private final static String ROUTING_KEY_NAME = "dlx.#"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //1. 声明(创建)正常接收消息的队列 Map<String,Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "dlx.exchange"); //设置死信队列参数:交换机名称 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); channel.queueDeclare(QUEUE_NAME,false,false,false,arguments); //本队列绑定死信队 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY_NAME); channel.basicQos(1); //允许一次多个消息进到队列里来 //2. 声明(创建)死信队列 channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false,null); channel.queueBind("dlx.queue", "dlx.exchange", "#"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println(properties.getHeaders().get("id") + " | "+ new String(body,"UTF-8")); boolean multiple = false; if(properties.getHeaders().get("id").equals(1)) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } boolean requeue = false; //重回队列 channel.basicNack(envelope.getDeliveryTag(), multiple, requeue); } else { channel.basicAck(envelope.getDeliveryTag(), multiple); } } }; boolean autoAck = false; //是否自动确认,设置为否 channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
启动消息发送者程序
/** * 生产者端只发送消息给Exchange还有指定路由key,并不再指定具体队列名称了 * 具体队列名称有消费者端创建,随便创建,只要其队列绑定到了本Exchange和路由即可 */ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import com.everjiankang.dependency.ConnectionUtil; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String EXCHANGE_NAME = "test_dlx_exchange"; private final static String ROUTING_KEY_NAME = "dlx.save"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); //发送五条消息,属性参数properties for (int i = 0; i < 1; i++) { //【设置属性参数】:AMQP.BasicProperties() start Map<String,Object> headers = new HashMap<>(); headers.put("id", i); headers.put("name", "xiaochao"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .contentEncoding("UTF-8") .deliveryMode(2) .expiration("10000") .headers(headers) .build(); String message = "hello DLX message" + i; channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY_NAME,true,properties,message.getBytes()); } channel.close(); connection.close(); System.out.println("game over"); } }
界面端现象:
1.启动消费者端程序,然后暂停消费者端程序,再启动生产者发生消息,消息首先发送到了test_dlx_queue
2. 由于消费端程序被关闭了,且消息又设置了过期时间,所以10s后RabbitMQ将消息转至死信队列