一、场景
查询支付结果。由于支付系统的复杂性,客户支付后钱款可能无法实时到账。此时就需要延时任务轮询查询支付结果。
类似此类无法直接实时获取结果的场景下,都可以使用延时任务完成结果状态的查询。
二、方案
普通交换器+死信交换器。根据延时需求设置消息过期时间,消息过期进入死信队列,消费者监听死信队列,实现延时任务。
三、概念
要理解普通交换器+私信交换器实现延时任务的原理,首先要了解RabbitMQ的消息存活时间TTL、死信交换机的概念。
1. 消息存活时间Time to Live(TTL)
RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连接的保留时间,也可以对每一个单独的消息做单独的设置。如果队列和消息都设置了TTL,那么会取较小的时间。
当队列中的消息存留时间超过了配置的生存时间(TTL),则称该消息已死亡。注意,同一个消息被路由到不同的队列将拥有不同的过期时间,又或者永远不会过期。这取决于消息所存在的队列。一个队列中的死亡消息不会影响到其他队列中与之相同消息的生命周期。
所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。
可以通过消息的expiration过期时间字段设置消息的TTL
1 rabbitTemplate.convertAndSend(delayExchangeName, delayRoutingKeyName, reqDTO, message -> { 2 // 设置过期时间为期望的延时时间,到期后进入死信队列,实现延时任务 3 message.getMessageProperties().setExpiration("10000"); 4 return message; 5 });
上面这个消息设置了过期时间为10000ms,所以10秒后如果没有被消费,则该消息已死亡,即“死信”。单靠死信还无法完成延时任务,还需要死信交换机Dead Letter Exchanges.
2. 死信交换机Dead Letter Exchanges
队列中的消息可能会变成死信消息(dead-lettered),进而当以下几个事件任意一个发生时,消息将会被重新发送到一个交换机:
- 消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged)
- 消息由于消息有效期(per-message TTL)过期
- 消息由于队列超过其长度限制而被丢弃
注意,队列的有效期并不会导致其中的消息过期。
死信交换机(DLXs)就是普通的交换机,可以是任何一种类型,也可以用普通常用的方式进行声明。
给队列设置死信交换机时,可以在声明队列时使用可选参数"x-dead-letter-exchange"进行声明配置。该参数值必须是与队列在同一个虚拟主机的交换机名称。注意,并不要求在声明队列时死信交换机必须已经被声明,但是当消息需要死信路由时,该交换机必须存在,否则,消息将会被丢弃。
也可以指定一个路由关键字在死信路由时使用,如果没有设置,那么就会使用消息自身原来的路由关键字。
1 /** 2 * 延时队列 3 * @return 4 */ 5 @Bean 6 public Queue delayQueue() { 7 Map<String, Object> params = new HashMap<>(); 8 params.put("x-dead-letter-exchange", exchangeName); 9 params.put("x-dead-letter-routing-key", routingKeyName); 10 return new Queue(delayQueueName, true, false, false, params); 11 }
上述代码通过两个可选参数给队列delayQueueName设置了死信交换机和死信路由键,也就是说,当队列delayQueueName发生死信时,将会根据配置的死信交换机和死信路由键把死信转发到目标队列中。消费者监听这个死信队列,即可实现延时任务。
四、代码实现
1. 这段示例代码实现的是延时唤醒操作。生产者发送唤醒消息,消费者监听死信队列中的唤醒消息,如果唤醒次数小于6,就再次发送唤醒消息,等待下次唤醒。当唤醒次数为4时,消费者被唤醒,不再发送消息。
2. 代码结构:
- SendWakeUpMsgReqDTO:发送消息的参数
- RabbitMqConfig:MQ配置
- RabbitMqProviderImpl:消息生产者
- RabbitMqConsumerImpl:消息消费者
- AsyncTaskServiceImpl:异步调用 发送消息
SendWakeUpMsgReqDTO:
1 import lombok.Data; 2 3 import java.io.Serializable; 4 5 /** 6 * @Author Nemo Wang 7 * @Date 2021/6/17 19:46 8 * @Description 发送消息的参数 9 */ 10 @Data 11 public class SendWakeUpMsgReqDTO implements Serializable { 12 private static final long serialVersionUID = 6298050708365621926L; 13 14 /** 15 * 来源 16 */ 17 private String sourceName; 18 19 /** 20 * 发送次数 21 */ 22 private int checkTimes; 23 24 /** 25 * 延时时间 26 */ 27 private String delayTime; 28 29 }
RabbitMqConfig:
1 /** 2 * @Author Nemo Wang 3 * @Date 2021/6/17 17:40 4 * @Description RabbitMQ配置类 5 */ 6 @Configuration 7 public class RabbitMqConfig { 8 9 @Value("${mq.delayQueueName}") 10 private String delayQueueName; 11 @Value("${mq.delayExchangeName}") 12 private String delayExchangeName; 13 @Value("${mq.delayRoutingKeyName}") 14 private String delayRoutingKeyName; 15 16 @Value("${mq.queueName}") 17 private String queueName; 18 @Value("${mq.exchangeName}") 19 private String exchangeName; 20 @Value("${mq.routingKeyName}") 21 private String routingKeyName; 22 23 /** 24 * 延时队列 25 * @return 26 */ 27 @Bean 28 public Queue delayQueue() { 29 Map<String, Object> params = new HashMap<>(); 30 params.put("x-dead-letter-exchange", exchangeName); 31 params.put("x-dead-letter-routing-key", routingKeyName); 32 return new Queue(delayQueueName, true, false, false, params); 33 } 34 35 /** 36 * 普通执行队列 37 * @return 38 */ 39 @Bean 40 public Queue processQueue() { 41 return new Queue(queueName, true); 42 } 43 44 /** 45 * 延时交换机 46 * @return 47 */ 48 @Bean 49 public DirectExchange delayEchange() { 50 return new DirectExchange(delayExchangeName, true, false); 51 } 52 /** 53 * 普通执行交换机 54 * @return 55 */ 56 @Bean 57 public DirectExchange processEchange() { 58 return new DirectExchange(exchangeName, true, false); 59 } 60 61 /** 62 * 绑定 将延时队列和延时交换机绑定, 并设置用于匹配键:delayRoutingKeyName 63 * @return 64 */ 65 @Bean 66 public Binding delayBinding() { 67 return BindingBuilder.bind(delayQueue()) 68 .to(delayEchange()) 69 .with(delayRoutingKeyName); 70 } 71 72 /** 73 * 普通队列和普通交换机绑定 并设置用于匹配键routingKeyName 74 * @return 75 */ 76 @Bean 77 public Binding processBinding() { 78 return BindingBuilder.bind(processQueue()) 79 .to(processEchange()) 80 .with(routingKeyName); 81 } 82 }
RabbitMqProviderImpl:
1 /** 2 * @Author Nemo Wang 3 * @Date 2021/6/17 19:52 4 * @Description 消息生产者 5 */ 6 @Slf4j 7 @Component 8 public class RabbitMqProviderImpl implements RabbitMqProvider { 9 10 @Value("${mq.delayExchangeName}") 11 private String delayExchangeName; 12 @Value("${mq.delayRoutingKeyName}") 13 private String delayRoutingKeyName; 14 15 @Autowired 16 private RabbitTemplate rabbitTemplate; 17 18 /** 19 * 发送唤醒消息 20 * @param reqDTO 21 */ 22 @Override 23 public void sendWakeUpMsg(SendWakeUpMsgReqDTO reqDTO) { 24 log.info("Enter RabbmitMqProviderImpl.sendWakeUpMsg reqDTO={}", reqDTO); 25 rabbitTemplate.convertAndSend(delayExchangeName, delayRoutingKeyName, reqDTO, message -> { 26 // 设置过期时间为期望的延时时间,到期后进入死信队列,实现延时任务 27 message.getMessageProperties().setExpiration(reqDTO.getDelayTime()); 28 return message; 29 }); 30 } 31 }
RabbitMqConsumerImpl:
1 /** 2 * @Author Nemo Wang 3 * @Date 2021/6/17 22:17 4 * @Description 消息消费者 5 */ 6 @Slf4j 7 @Component 8 public class RabbitMqConsumerImpl implements RabbitMqConsumer { 9 10 @Autowired 11 private AsyncTaskService asyncTaskService; 12 13 /** 14 * 监听死信队列中的消息 15 * 延时任务队列中的消息到期后进入死信队列。此处监听死信队列消息,实现延时任务 16 * 每次监听到消息时,调起该方法。判断cnt是否为4,如果==4,唤醒返回;如果!=4,继续进入mq队列,等待下次唤醒 17 * @param reqDTO 18 */ 19 @RabbitListener(queues = "${mq.queueName}") 20 @Override 21 public void waitingForWakeUp(SendWakeUpMsgReqDTO reqDTO) { 22 log.info("Enter RabbitMqConsumerImpl.waitingForWakeUp reqDTO={}", reqDTO); 23 24 log.info("Consumer 正在睡眠:[{}]. 等待被唤醒", reqDTO.getCheckTimes()); 25 26 if (4 == reqDTO.getCheckTimes()) { 27 log.info("Consumer 已被唤醒."); 28 return; 29 } 30 31 if (reqDTO.getCheckTimes() < 6) { 32 // 再次进入mq队列 最多唤醒6次 33 sendMsg(reqDTO); 34 } 35 } 36 37 private void sendMsg(SendWakeUpMsgReqDTO reqDTO) { 38 // 进入mq队列次数 39 reqDTO.setCheckTimes(reqDTO.getCheckTimes() + 1); 40 // 设置延时 5秒 41 reqDTO.setDelayTime("5000"); 42 log.info("RabbitMqConsumerImpl.waitingForWakeUp.sendMsg 再次进入mq队列 reqDTO.getCheckTimes()={}", reqDTO.getCheckTimes()); 43 asyncTaskService.sendAsyncMqWakeup(reqDTO); 44 } 45 }
AsyncTaskServiceImpl:
1 /** 2 * @Author Nemo Wang 3 * @Date 2021/6/19 19:22 4 * @Description 异步调用 发送消息 5 */ 6 @Slf4j 7 @Component 8 public class AsyncTaskServiceImpl implements AsyncTaskService { 9 10 @Autowired 11 private RabbitMqProvider rabbitMqProvider; 12 13 @Async("asyncThreadPoolTaskExecutor") 14 @Override 15 public void sendAsyncMqWakeup(SendWakeUpMsgReqDTO reqDTO) { 16 log.info("AsyncTaskServiceImpl.sendAsyncMqWakeup reqDTO={}", reqDTO); 17 rabbitMqProvider.sendWakeUpMsg(reqDTO); 18 } 19 }