RabbitMQ学习 二
1、RabbitMQ 如何保证消息的可靠性投递
RabbitMQ在进行消息的发送时是的流程如下
producer(消息生产者)---> RabbitMQ broker(消息中间件) ---> exchange (交换机)---> queue (消息队列) ---> consumer (消费者)
因此在producer到exchange 以及 exchange到 queue这两个阶段可以进行对消息投递可靠性进行确认
RabbitMQ 提供来两种方式来控制消息的投递可靠性模式
- confirm 确认模式
- return 回退模式
确认模式:
- 设置ConnectionFactory的publisher-confirm=true 开启确认模式
-
使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
yml中的配置
1 <!-- 定义rabbitmq connectionFactory --> 2 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" 3 port="${rabbitmq.port}" 4 username="${rabbitmq.username}" 5 password="${rabbitmq.password}" 6 virtual-host="${rabbitmq.virtual-host}" 7 publisher-confirms="true" 8 publisher-returns="true" 9 /> 10 <!--消息可靠性投递(生产端)--> 11 <rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue> 12 <rabbit:direct-exchange name="test_exchange_confirm"> 13 <rabbit:bindings> 14 <rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding> 15 </rabbit:bindings> 16 </rabbit:direct-exchange>
代码
1 @Autowired 2 private RabbitTemplate rabbitTemplate; 3 4 5 /** 6 * 确认模式: 7 * 步骤: 8 * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true" 9 * 2. 在rabbitTemplate定义ConfirmCallBack回调函数 10 */ 11 @Test 12 public void testConfirm() { 13 14 //2. 定义回调 15 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { 16 /** 17 * 18 * @param correlationData 相关配置信息 19 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败 20 * @param cause 失败原因 21 */ 22 @Override 23 public void confirm(CorrelationData correlationData, boolean ack, String cause) { 24 System.out.println("confirm方法被执行了...."); 25 26 if (ack) { 27 //接收成功 28 System.out.println("接收成功消息" + cause); 29 } else { 30 //接收失败 31 System.out.println("接收失败消息" + cause); 32 //做一些处理,让消息再次发送。 33 } 34 } 35 }); 36 37 //3. 发送消息 38 rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm...."); 39 }
回退模式:
- 设置ConnectionFactory的publisher-returns="true" 开启 退回模式。
-
使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
xml配置
1 <!-- 定义rabbitmq connectionFactory --> 2 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" 3 port="${rabbitmq.port}" 4 username="${rabbitmq.username}" 5 password="${rabbitmq.password}" 6 virtual-host="${rabbitmq.virtual-host}" 7 publisher-confirms="true" 8 publisher-returns="true" 9 />
代码
1 /** 2 * 回退模式: 当消息发送给Exchange后,Exchange路由到Queue失败是 才会执行 ReturnCallBack 3 * 步骤: 4 * 1. 开启回退模式:publisher-returns="true" 5 * 2. 设置ReturnCallBack 6 * 3. 设置Exchange处理消息的模式: 7 * 1. 如果消息没有路由到Queue,则丢弃消息(默认) 8 * 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack 9 */ 10 11 @Test 12 public void testReturn() { 13 14 //设置交换机处理失败消息的模式 15 rabbitTemplate.setMandatory(true); 16 17 //2.设置ReturnCallBack 18 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { 19 /** 20 * 21 * @param message 消息对象 22 * @param replyCode 错误码 23 * @param replyText 错误信息 24 * @param exchange 交换机 25 * @param routingKey 路由键 26 */ 27 @Override 28 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { 29 System.out.println("return 执行了...."); 30 31 System.out.println(message); 32 System.out.println(replyCode); 33 System.out.println(replyText); 34 System.out.println(exchange); 35 System.out.println(routingKey); 36 37 //处理 38 } 39 }); 40 41 42 //3. 发送消息 43 rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm...."); 44 }
2、Consumer Ack 消费端确认
Ack指的是Acknowledge确认,表示消费端收到消息后的确认方式。
三种确认方式:
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto"
RabbitMQ 默认是使用的自动确认的方式
1 <!--定义监听器容器--> 2 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" > 3 <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener> 4 <!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>--> 5 <!--定义监听器,监听正常队列--> 6 <!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>--> 7 8 <!--延迟队列效果实现: 一定要监听的是 死信队列!!!--> 9 <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener> 10 </rabbit:listener-container>
定义监听器
拒绝签收,如果设置requeue设置为true,则重回队列,继续让mq继续发送消息;如果为false则,将该消息丢弃,成为死信消息
1 /** 2 * Consumer ACK机制: 3 * 1. 设置手动签收。acknowledge="manual" 4 * 2. 让监听器类实现ChannelAwareMessageListener接口 5 * 3. 如果消息成功处理,则调用channel的 basicAck()签收 6 * 4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer 7 */ 8 9 @Component 10 public class AckListener implements ChannelAwareMessageListener { 11 12 @Override 13 public void onMessage(Message message, Channel channel) throws Exception { 14 long deliveryTag = message.getMessageProperties().getDeliveryTag(); 15 try { 16 //1.接收转换消息 17 System.out.println(new String(message.getBody())); 18 19 //2. 处理业务逻辑 20 System.out.println("处理业务逻辑..."); 21 int i = 3/0;//出现错误 22 //3. 手动签收 23 channel.basicAck(deliveryTag,true); 24 } catch (Exception e) { 25 //e.printStackTrace(); 26 //4.拒绝签收 27 /* 28 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端 29 */ 30 channel.basicNack(deliveryTag,true,true); 31 //channel.basicReject(deliveryTag,true); 32 } 33 } 34 }
- 在<rabbit:listener-container> 中配置 prefetch属性设置消费端一次拉取多少消息
-
消费端的确认模式一定为手动确认。acknowledge="manual"
xml配置
1 <!--定义监听器容器--> 2 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1" > 3 <rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener> 4 <!-- <rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>--> 5 <!--定义监听器,监听正常队列--> 6 <!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>--> 7 8 <!--延迟队列效果实现: 一定要监听的是 死信队列!!!--> 9 <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener> 10 </rabbit:listener-container>
代码
1 /** 2 * Consumer 限流机制 3 * 1. 确保ack机制为手动确认。 4 * 2. listener-container配置属性 5 * perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。 6 */ 7 8 @Component 9 public class QosListener implements ChannelAwareMessageListener { 10 11 @Override 12 public void onMessage(Message message, Channel channel) throws Exception { 13 14 Thread.sleep(1000); 15 //1.获取消息 16 System.out.println(new String(message.getBody())); 17 18 //2. 处理业务逻辑 19 20 //3. 签收 21 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); 22 23 } 24 }
4、TTL (存活时间、过期时间)
TTL全称Time To Live (存活时间/过期时间)
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间
一般业务中会使用队列过期时间,因为消息过期时间是指,当消息在被消费的时候进行判断是否过期,入过不被消费就一直在消息队列中; 如果两者都进行了设置,则以时间短的为准
注意:对列过期时间,是指,当消息进入队列,开始进行时间计时,达到过期时间,就清除
xml配置
1 <!--ttl--> 2 <rabbit:queue name="test_queue_ttl" id="test_queue_ttl"> 3 <!--设置queue的参数--> 4 <rabbit:queue-arguments> 5 <!--x-message-ttl指队列的过期时间--> 6 <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry> 7 </rabbit:queue-arguments> 8 9 </rabbit:queue> 10 11 <rabbit:topic-exchange name="test_exchange_ttl" > 12 <rabbit:bindings> 13 <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding> 14 </rabbit:bindings> 15 </rabbit:topic-exchange>
代码
1 /** 2 * TTL:过期时间 3 * 1. 队列统一过期 4 * <p> 5 * 2. 消息单独过期 6 * <p> 7 * <p> 8 * 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。 9 * 队列过期后,会将队列所有消息全部移除。 10 * 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉) 11 */ 12 @Test 13 public void testTtl() throws InterruptedException { 14 // 消息后处理对象,设置一些消息的参数信息 15 MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { 16 public Message postProcessMessage(Message message) throws AmqpException { 17 //1.设置message的信息 18 message.getMessageProperties().setExpiration("5000");//消息的过期时间 19 //2.返回该消息 20 return message; 21 } 22 }; 23 //消息单独过期 24 //rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....",messagePostProcessor); 25 for (int i = 0; i < 10; i++) { 26 if (i == 5) { 27 //消息单独过期 28 rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....", messagePostProcessor); 29 } else { 30 //不过期的消息 31 rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl...."); 32 } 33 Thread.sleep(2000); 34 } 35 }
5、死信队列
消息成为死信的三种情况
- 消费者拒绝接受消费信息,basicNack/basicReject,requeue设置为false,不将拒绝签收的消息重新放回原目标队列
-
原队列存在消息过期设置,消息到达超时时间未被消费;
-
队列消息长度到达限制;
死信交换机和死信队列与普通的没有什么区别,当消息成为死信消息后,如果该消息绑定来死信交换机,则会被死信交换机重新路由到死信队列
xml配置
1 <!-- 2 死信队列: 3 1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx) 4 2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx) 5 3. 正常队列绑定死信交换机 6 设置两个参数: 7 * x-dead-letter-exchange:死信交换机名称 8 * x-dead-letter-routing-key:发送给死信交换机的routingkey 9 --> 10 11 <!-- 12 1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx) 13 --> 14 15 <rabbit:queue name="test_queue_dlx" id="test_queue_dlx"> 16 <!--3. 正常队列绑定死信交换机--> 17 <rabbit:queue-arguments> 18 <!--3.1 x-dead-letter-exchange:死信交换机名称--> 19 <entry key="x-dead-letter-exchange" value="exchange_dlx" /> 20 21 <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey--> 22 <entry key="x-dead-letter-routing-key" value="dlx.hehe" /> 23 24 <!--4.1 设置队列的过期时间 ttl--> 25 <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" /> 26 <!--4.2 设置队列的长度限制 max-length --> 27 <entry key="x-max-length" value="10" value-type="java.lang.Integer" /> 28 </rabbit:queue-arguments> 29 </rabbit:queue> 30 <rabbit:topic-exchange name="test_exchange_dlx"> 31 <rabbit:bindings> 32 <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding> 33 </rabbit:bindings> 34 </rabbit:topic-exchange> 35 36 37 <!-- 38 2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx) 39 --> 40 41 <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue> 42 <rabbit:topic-exchange name="exchange_dlx"> 43 <rabbit:bindings> 44 <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding> 45 </rabbit:bindings> 46 </rabbit:topic-exchange>
代码
1 /** 2 * 发送测试死信消息: 3 * 1. 过期时间 4 * 2. 长度限制 5 * 3. 消息拒收 6 */ 7 @Test 8 public void testDlx(){ 9 //1. 测试过期时间,死信消息 10 //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?"); 11 12 //2. 测试长度限制后,消息死信 13 /* for (int i = 0; i < 20; i++) { 14 rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?"); 15 }*/ 16 17 //3. 测试消息拒收 18 rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?"); 19 20 }
消费端代码 ACK代码,只需要将 basicNack参数,requeue改为false 拒收,不重回消息队列
6、延迟队列
延迟队列,即消息进入队列后不会立即被消除,只有到达指定时间后,才会被消费
如下需求:
1.下单后30分钟未支付,取消订单,回滚库存
2.新用户注册成功后7天,发送短信问候
实现方式
1.定时器,对该信息进行轮询判断 (消耗内存)
2.延迟队列
RabbitMQ中并未提供延迟队列功能,但是可以使用TTL+死信队列组合实现延迟队列的效果
6、消息可靠性保障 -- 消息补偿机制
7、消息幂等性保障-- 乐观锁机制