zoukankan      html  css  js  c++  java
  • 使用RabbitMQ实现延时任务

    一、场景

    查询支付结果。由于支付系统的复杂性,客户支付后钱款可能无法实时到账。此时就需要延时任务轮询查询支付结果。

    类似此类无法直接实时获取结果的场景下,都可以使用延时任务完成结果状态的查询。

    二、方案

    普通交换器+死信交换器。根据延时需求设置消息过期时间,消息过期进入死信队列,消费者监听死信队列,实现延时任务。

    三、概念

    要理解普通交换器+私信交换器实现延时任务的原理,首先要了解RabbitMQ的消息存活时间TTL、死信交换机的概念。

    1. 消息存活时间Time to Live(TTL)

    RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连接的保留时间,也可以对每一个单独的消息做单独的设置。如果队列和消息都设置了TTL,那么会取较小的时间。

    当队列中的消息存留时间超过了配置的生存时间(TTL),则称该消息已死亡。注意,同一个消息被路由到不同的队列将拥有不同的过期时间,又或者永远不会过期。这取决于消息所存在的队列。一个队列中的死亡消息不会影响到其他队列中与之相同消息的生命周期。

    所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

    可以通过消息的expiration过期时间字段设置消息的TTL

    1 rabbitTemplate.convertAndSend(delayExchangeName, delayRoutingKeyName, reqDTO, message -> {
    2             // 设置过期时间为期望的延时时间,到期后进入死信队列,实现延时任务
    3             message.getMessageProperties().setExpiration("10000");
    4             return message;
    5         });

    上面这个消息设置了过期时间为10000ms,所以10秒后如果没有被消费,则该消息已死亡,即“死信”。单靠死信还无法完成延时任务,还需要死信交换机Dead Letter Exchanges.

    2. 死信交换机Dead Letter Exchanges

    队列中的消息可能会变成死信消息(dead-lettered),进而当以下几个事件任意一个发生时,消息将会被重新发送到一个交换机:

    • 消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged)
    • 消息由于消息有效期(per-message TTL)过期
    • 消息由于队列超过其长度限制而被丢弃

    注意,队列的有效期并不会导致其中的消息过期。

    死信交换机(DLXs)就是普通的交换机,可以是任何一种类型,也可以用普通常用的方式进行声明。

    给队列设置死信交换机时,可以在声明队列时使用可选参数"x-dead-letter-exchange"进行声明配置。该参数值必须是与队列在同一个虚拟主机的交换机名称。注意,并不要求在声明队列时死信交换机必须已经被声明,但是当消息需要死信路由时,该交换机必须存在,否则,消息将会被丢弃。

    也可以指定一个路由关键字在死信路由时使用,如果没有设置,那么就会使用消息自身原来的路由关键字。

     1 /**
     2      * 延时队列
     3      * @return
     4      */
     5     @Bean
     6     public Queue delayQueue() {
     7         Map<String, Object> params = new HashMap<>();
     8         params.put("x-dead-letter-exchange", exchangeName);
     9         params.put("x-dead-letter-routing-key", routingKeyName);
    10         return new Queue(delayQueueName, true, false, false, params);
    11     }

    上述代码通过两个可选参数给队列delayQueueName设置了死信交换机和死信路由键,也就是说,当队列delayQueueName发生死信时,将会根据配置的死信交换机和死信路由键把死信转发到目标队列中。消费者监听这个死信队列,即可实现延时任务。

    四、代码实现

    1. 这段示例代码实现的是延时唤醒操作。生产者发送唤醒消息,消费者监听死信队列中的唤醒消息,如果唤醒次数小于6,就再次发送唤醒消息,等待下次唤醒。当唤醒次数为4时,消费者被唤醒,不再发送消息。

    2. 代码结构:

    • SendWakeUpMsgReqDTO:发送消息的参数
    • RabbitMqConfig:MQ配置
    • RabbitMqProviderImpl:消息生产者
    • RabbitMqConsumerImpl:消息消费者
    • AsyncTaskServiceImpl:异步调用 发送消息

    SendWakeUpMsgReqDTO:

     1 import lombok.Data;
     2 
     3 import java.io.Serializable;
     4 
     5 /**
     6  * @Author Nemo Wang
     7  * @Date 2021/6/17 19:46
     8  * @Description 发送消息的参数
     9  */
    10 @Data
    11 public class SendWakeUpMsgReqDTO implements Serializable {
    12     private static final long serialVersionUID = 6298050708365621926L;
    13 
    14     /**
    15      * 来源
    16      */
    17     private String sourceName;
    18 
    19     /**
    20      * 发送次数
    21      */
    22     private int checkTimes;
    23 
    24     /**
    25      * 延时时间
    26      */
    27     private String delayTime;
    28 
    29 }
    发送消息的参数

    RabbitMqConfig:

     1 /**
     2  * @Author Nemo Wang
     3  * @Date 2021/6/17 17:40
     4  * @Description RabbitMQ配置类
     5  */
     6 @Configuration
     7 public class RabbitMqConfig {
     8 
     9     @Value("${mq.delayQueueName}")
    10     private String delayQueueName;
    11     @Value("${mq.delayExchangeName}")
    12     private String delayExchangeName;
    13     @Value("${mq.delayRoutingKeyName}")
    14     private String delayRoutingKeyName;
    15 
    16     @Value("${mq.queueName}")
    17     private String queueName;
    18     @Value("${mq.exchangeName}")
    19     private String exchangeName;
    20     @Value("${mq.routingKeyName}")
    21     private String routingKeyName;
    22 
    23     /**
    24      * 延时队列
    25      * @return
    26      */
    27     @Bean
    28     public Queue delayQueue() {
    29         Map<String, Object> params = new HashMap<>();
    30         params.put("x-dead-letter-exchange", exchangeName);
    31         params.put("x-dead-letter-routing-key", routingKeyName);
    32         return new Queue(delayQueueName, true, false, false, params);
    33     }
    34 
    35     /**
    36      * 普通执行队列
    37      * @return
    38      */
    39     @Bean
    40     public Queue processQueue() {
    41         return new Queue(queueName, true);
    42     }
    43 
    44     /**
    45      * 延时交换机
    46      * @return
    47      */
    48     @Bean
    49     public DirectExchange delayEchange() {
    50         return new DirectExchange(delayExchangeName, true, false);
    51     }
    52     /**
    53      * 普通执行交换机
    54      * @return
    55      */
    56     @Bean
    57     public DirectExchange processEchange() {
    58         return new DirectExchange(exchangeName, true, false);
    59     }
    60 
    61     /**
    62      * 绑定 将延时队列和延时交换机绑定, 并设置用于匹配键:delayRoutingKeyName
    63      * @return
    64      */
    65     @Bean
    66     public Binding delayBinding() {
    67         return BindingBuilder.bind(delayQueue())
    68                 .to(delayEchange())
    69                 .with(delayRoutingKeyName);
    70     }
    71 
    72     /**
    73      * 普通队列和普通交换机绑定 并设置用于匹配键routingKeyName
    74      * @return
    75      */
    76     @Bean
    77     public Binding processBinding() {
    78         return BindingBuilder.bind(processQueue())
    79                 .to(processEchange())
    80                 .with(routingKeyName);
    81     }
    82 }
    RabbitMQ配置类

    RabbitMqProviderImpl:

     1 /**
     2  * @Author Nemo Wang
     3  * @Date 2021/6/17 19:52
     4  * @Description 消息生产者
     5  */
     6 @Slf4j
     7 @Component
     8 public class RabbitMqProviderImpl implements RabbitMqProvider {
     9 
    10     @Value("${mq.delayExchangeName}")
    11     private String delayExchangeName;
    12     @Value("${mq.delayRoutingKeyName}")
    13     private String delayRoutingKeyName;
    14 
    15     @Autowired
    16     private RabbitTemplate rabbitTemplate;
    17 
    18     /**
    19      * 发送唤醒消息
    20      * @param reqDTO
    21      */
    22     @Override
    23     public void sendWakeUpMsg(SendWakeUpMsgReqDTO reqDTO) {
    24         log.info("Enter RabbmitMqProviderImpl.sendWakeUpMsg reqDTO={}", reqDTO);
    25         rabbitTemplate.convertAndSend(delayExchangeName, delayRoutingKeyName, reqDTO, message -> {
    26             // 设置过期时间为期望的延时时间,到期后进入死信队列,实现延时任务
    27             message.getMessageProperties().setExpiration(reqDTO.getDelayTime());
    28             return message;
    29         });
    30     }
    31 }
    消息生产者

    RabbitMqConsumerImpl:

     1 /**
     2  * @Author Nemo Wang
     3  * @Date 2021/6/17 22:17
     4  * @Description 消息消费者
     5  */
     6 @Slf4j
     7 @Component
     8 public class RabbitMqConsumerImpl implements RabbitMqConsumer {
     9 
    10     @Autowired
    11     private AsyncTaskService asyncTaskService;
    12 
    13     /**
    14      * 监听死信队列中的消息
    15      * 延时任务队列中的消息到期后进入死信队列。此处监听死信队列消息,实现延时任务
    16      * 每次监听到消息时,调起该方法。判断cnt是否为4,如果==4,唤醒返回;如果!=4,继续进入mq队列,等待下次唤醒
    17      * @param reqDTO
    18      */
    19     @RabbitListener(queues = "${mq.queueName}")
    20     @Override
    21     public void waitingForWakeUp(SendWakeUpMsgReqDTO reqDTO) {
    22         log.info("Enter RabbitMqConsumerImpl.waitingForWakeUp reqDTO={}", reqDTO);
    23 
    24         log.info("Consumer 正在睡眠:[{}]. 等待被唤醒", reqDTO.getCheckTimes());
    25 
    26         if (4 == reqDTO.getCheckTimes()) {
    27             log.info("Consumer 已被唤醒.");
    28             return;
    29         }
    30 
    31         if (reqDTO.getCheckTimes() < 6) {
    32             // 再次进入mq队列 最多唤醒6次
    33             sendMsg(reqDTO);
    34         }
    35     }
    36 
    37     private void sendMsg(SendWakeUpMsgReqDTO reqDTO) {
    38         // 进入mq队列次数
    39         reqDTO.setCheckTimes(reqDTO.getCheckTimes() + 1);
    40         // 设置延时 5秒
    41         reqDTO.setDelayTime("5000");
    42         log.info("RabbitMqConsumerImpl.waitingForWakeUp.sendMsg 再次进入mq队列 reqDTO.getCheckTimes()={}", reqDTO.getCheckTimes());
    43         asyncTaskService.sendAsyncMqWakeup(reqDTO);
    44     }
    45 }
    消息消费者

    AsyncTaskServiceImpl:

     1 /**
     2  * @Author Nemo Wang
     3  * @Date 2021/6/19 19:22
     4  * @Description 异步调用 发送消息
     5  */
     6 @Slf4j
     7 @Component
     8 public class AsyncTaskServiceImpl implements AsyncTaskService {
     9 
    10     @Autowired
    11     private RabbitMqProvider rabbitMqProvider;
    12 
    13     @Async("asyncThreadPoolTaskExecutor")
    14     @Override
    15     public void sendAsyncMqWakeup(SendWakeUpMsgReqDTO reqDTO) {
    16         log.info("AsyncTaskServiceImpl.sendAsyncMqWakeup reqDTO={}", reqDTO);
    17         rabbitMqProvider.sendWakeUpMsg(reqDTO);
    18     }
    19 }
    异步调用 发送消息
  • 相关阅读:
    创建HttpFilter与理解多个Filter代码的执行顺序
    Filter
    JSTL
    EL
    JavaBean
    HttpSession之表单的重复提交 & 验证码
    相对路径和绝对路径
    HttpSession之简易购物车
    HttpSession
    Cookie
  • 原文地址:https://www.cnblogs.com/nemowang1996/p/14923529.html
Copyright © 2011-2022 走看看