一、RabbitMQ的的死信队列+ TTL
1、什么是TTL
- time to live 消息存活时间
- 如果消息在存活时间内未被消费,则会被清除
- RabbitMQ支持两种ttl设置
- 单独消息进行配置ttl
- 整个队列进行配置ttl(居多)
2、什么是rabbitmq的死信队列
- 没有被及时消费的消息存放的队列
3、什么是rabbitmq的死信交换机
- Dead Letter Exchange(死信交换机,缩写: DLX)当消息成为死信后,会被重新发送到另⼀个交换机,这个交换机就是DLX死信交换机。
4、消息有哪几种情况成为死信
- 消费者拒收消息(basic.reject/ basic.nack) ,并且没有重新入队 requeue=false
- 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
- 队列的消息长度达到极限
- 结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
二、RabbitMQ管控台消息TTL测试
队列过期时间使用参数,对整个队列消息统⼀过期
- x-message-ttl
- 单位ms(毫秒)
消息过期时间使用参数(如果队列头部消息未过期,队列中级消息已经过期,消息还在队列里面)
- expiration
- 单位ms(毫秒)
两者都配置的话,时间短的先触发
1、RabbitMQ Web控制台测试
新建死信交换机(和普通没区别)
新建死信队列 (和普通没区别)
死信交换机和队列绑定
新建普通队列,设置过期时间、指定死信交换机
测试:直接web控制台往product_qeueu发送消息即可,会看到消息先是在product_qeueu队列停留10秒(因为没有消费者消费),然后该消息从product_qeueu移入到dead_queue。
三、RabbitMQ的延迟队列和应用场景
1、什么是延迟队列
⼀种带有延迟功能的消息队列, Producer 将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某⼀个时间投递到Consumer进行消费,该消息即定时消息。
2、使用场景
- 通过消息触发⼀些定时任务,比如在某⼀固定时间点向用户发送提醒消息
- 用户登录之后5分钟给用户做分类推送、用户多少天未登录给用户做召回推送;
- 消息生产和消费有时间窗⼝要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送⼀条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略
四、实战
1、背景
JD、淘系、天猫、拼多多电商平台,规定新注册的商家,审核通过后需要在【规定时间】内上架商品,否则冻结账号。
2、代码开发
死信交换机和死信队列开发,topic交换机和队列开发,绑定死信交换机
package net.xdclass.xdclasssp.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { //死信队列 public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue"; //死信交换机 public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange"; //进入死信队列的路由key public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key"; //创建死信交换机 @Bean public Exchange lockMerchantDeadExchange() { return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, true, false); } //创建死信队列 @Bean public Queue lockMerchantDeadQueue() { return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build(); } //绑定死信交换机和死信队列 @Bean public Binding lockMerchantBinding() { return new Binding(LOCK_MERCHANT_DEAD_QUEUE, Binding.DestinationType.QUEUE, LOCK_MERCHANT_DEAD_EXCHANGE, LOCK_MERCHANT_ROUTING_KEY, null); } //普通队列,绑定的个死信交换机 public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue"; //普通的topic交换机 public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange"; //路由key public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key"; //创建普通交换机 @Bean public Exchange newMerchantExchange() { return new TopicExchange(NEW_MERCHANT_EXCHANGE, true, false); } //创建普通队列 @Bean public Queue newMerchantQueue() { Map<String, Object> args = new HashMap<>(3); //消息过期后,进入到死信交换机 args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE); //消息过期后,进入到死信交换机的路由key args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY); //过期时间,单位毫秒 args.put("x-message-ttl", 10000); return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build(); } //绑定交换机和队列 @Bean public Binding newMerchantBinding() { return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE, NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTIING_KEY, null); } }
消息生产和消费
- 消息生产
- 投递到普通的topic交换机
- 消息过期,进入死信交换机
- 消息消费
- 消费者监听死信交换机的队列
MerchantAccountController 模拟请求
@RestController @RequestMapping("/api/admin/merchant") public class MerchantAccountController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("check") public Object check(){ //修改数据库的商家账号状态 TODO rabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家账号通过审核"); Map<String,Object> map = new HashMap<>(); map.put("code",0); map.put("msg","账号审核通过,请10秒内上传1个商品"); return map; } }
MerchantMQListener消费
package net.xdclass.xdclasssp.mq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component @RabbitListener(queues = "lock_merchant_dead_queue") public class MerchantMQListener { @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("body="+body); //做复杂业务逻辑 TODO //告诉broker,消息已经被确认 channel.basicAck(msgTag,false); } }