zoukankan      html  css  js  c++  java
  • 要做重试机制,就只能选择 DelayQueue ?其实 RabbitMQ 它上它也行!

    原文链接:要做重试机制,就只能选择 DelayQueue ?其实 RabbitMQ 它上它也行!

    一、场景

    最近研发一个新功能,后台天气预警:后台启动一条线程,定时调用天气预警 API,查询现有城市的相关天气预警信息,如果发现有预警或取消预警的信息,给指定预警部门配置的相关人员发送短信;而如果第一次发送失败,我们需要隔几分钟再重新发送,最多可以重发5次。

    二、技术选型

    1、JDK 原生 DelayQueue:

    重试机制最简单的就是直接利用 JDK 提供的 DelayQyeye,而 DelayQueue 里面存放的任务需要是实现 Delay 接口的实现类,需要重写 getDelay 方法和 compareTo 方法。getDelay 方法主要用做判断任务是否到期要出队列,而 compareTo 方法主要用做入队时任务的判断,过期时间短的任务应放在队列的前面,通过这个方法,我们可以知道,DelayQueue 的底层是利用 PriorityQueue 实现的。

    下面上一个 DelayQueue 的简单的使用例子:

    /**
     * @author Howinfun
     * @desc
     * @date 2020/8/1
     */
    public class TestDelayQueue {
    
        public static void main(String[] args) throws InterruptedException {
            DelayQueue<UserMsg> delayQueue = new DelayQueue();
            UserMsg userMsg1 = new UserMsg(1,"15627272727","你好,下单成功1",5, TimeUnit.SECONDS);
            UserMsg userMsg2 = new UserMsg(2,"15627272727","你好,下单成功2",3, TimeUnit.SECONDS);
            UserMsg userMsg3 = new UserMsg(3,"15627272727","你好,下单成功3",4, TimeUnit.SECONDS);
            UserMsg userMsg4 = new UserMsg(4,"15627272727","你好,下单成功4",6, TimeUnit.SECONDS);
            UserMsg userMsg5 = new UserMsg(5,"15627272727","你好,下单成功5",2, TimeUnit.SECONDS);
            delayQueue.add(userMsg1);
            delayQueue.put(userMsg2);
            delayQueue.put(userMsg3);
            delayQueue.put(userMsg4);
            delayQueue.put(userMsg5);
    
            for (int i=0;i<5;i++){
                // take方法会一直阻塞,直到有任务
                UserMsg userMsg = delayQueue.take();
                System.out.println(userMsg.toString());
            }
        }
    }
    
    @Data
    @ToString
    public class UserMsg implements Delayed {
    
        private int id;
        private String phone;
        private String msg;
        private int failCount;
        // 过期时间
        private long time;
    
        public UserMsg(int id,String phone,String msg,long time,TimeUnit unit){
            this.id = id;
            this.phone = phone;
            this.msg = msg;
            this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
            this.failCount = 0;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            // 和当前时间比较,判断是否到期
            return this.time - System.currentTimeMillis();
        }
    
        @Override
        public int compareTo(Delayed o) {
            // 入队时需要判断任务放到队列的哪个位置,过期时间短的放在前面
            UserMsg item = (UserMsg) o;
            long diff = this.time - item.time;
            if (diff <= 0) {
                return -1;
            }else {
                return 1;
            }
        }
    }
    

    扩展点:PriorityQueue的优化点

    讲到 PriorityQueue,有一个优化点我是觉得挺有意思的:它的 take() 方法,是会一直阻塞直到有任务过期出队列。它里面主要是利用 for 死循环去读取队列的头节点,判断头节点是否为空,如果为空,则直接调用 Condition#await() 进入阻塞状态;而如果队列的头节点不为空,但是任务还未过期,则会判断之前是否有线程(leader)尝试获取过期任务了,如果有的话就调用 Condition#await() 方法,否则就继续在死循环里面继续尝试获取过期任务。这样的话,避免所有尝试获取过期任务的线程一直在死循环,这样能让多余的线程进入阻塞状态,从而释放系统资源。当然了,只要 leader 拿到过期任务了,那么就会判断队列是否还有任务,如果有则调用 Condition#signal() 唤醒等待状态的线程们。我们可以看看源码:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
    

    2、消息中间件 RabbitMQ :

    可能很多同学看到这个标题后会有点疑惑,消息中间件还能做重发机制?其实一开始我都没想到,关于 RabbitMQ 我也只是简单地学了他的六大使用模式,其他的业务场景还没深究。然后有一天和我的一个好同事聊了一下这个重试机制,他就说了 RabbitMQ 的死信队列可以做到,然后我自己就去研究一下。

    他的主要原理是利用消息的 ttl + 死信队列。当短信发送失败时,封装一个消息往指定的业务 queue 发送,并且指定消息的 ttl,当然了,还需要为业务 queue 指定死信队列。当消息过期后,会从业务 queue 转到死信队列中,所以说,我们只需要监听死信队列,拉取其中的消息进行消费,这样就能做到重试了。

    这两个怎么选?

    使用原生的 DelayQueue 会更加方便点,因为只需要自定义类实现 Delay 接口,启动线程阻塞获取过期任务即可。可是对于想监控这个 DelayQueue 里面任务的情况,可能就要自己写接口来获取了,并且在微服务中,通常一个服务模块有多个实例,这样子的话,统一管理还有监控就更麻烦了。

    所以我们可以考虑使用 RabbitMQ 来完成。不但可以统一处理重试机制,并且 RabbitMQ 还提供了自己的后台管理系统,这样监控起来也很方便。

    三、RabbitMQ 如何利用消息 ttl 和死信队列做重试机制

    下面是基于 spring-boot-starter-amqp 做的。

    1、声明业务/死信队列相关组件: Exchange、Routing Key、Queue

    第一步还是比较简单的,主要是创建对应的交换器、队列和路由键,特别要注意的是,在创建业务队列时,需要为他设置死信队列的相关信息,代码如下:

    @Configuration
    public class RabbitMQConfig {
    
        /**
         * 业务 queue
         */
        public static final String BUSINESS_QUEUE_NAME = "business.queue";
        /**
         * 业务 exchange
         */
        public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";
        /**
         * 业务 routing key
         */
        public static final String BUSINESS_QUEUE_ROUTING_KEY = "business.routing.key";
        /**
         * 死信队列 exchange
         */
        public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
        /**
         * 死信队列 queue
         */
        public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
        /**
         * 死信队列 routing key
         */
        public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "dead.letter.routing.key";
    
        /**
         * 声明业务交换器
         * @return
         */
        @Bean("businessExchange")
        public DirectExchange businessExchange(){
            return new DirectExchange(BUSINESS_EXCHANGE_NAME);
        }
    
        /**
         * 声明业务队列
         * @return
         */
        @Bean("businessQueue")
        public Queue businessQueue(){
            Map<String, Object> args = new HashMap<>(3);
            // 这里声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
            // 这里声明当前队列的死信路由key
            args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY);
            return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build();
        }
    
        /**
         * 声明业务队列绑定业务交换器,绑定路由键
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                       @Qualifier("businessExchange") DirectExchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_QUEUE_ROUTING_KEY);
        }
    
        /**
         * 声明死信交换器
         * @return
         */
        @Bean("deadLetterExchange")
        public DirectExchange deadLetterExchange(){
            return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
        }
    
        /**
         * 声明死信队列
         */
        @Bean("deadLetterQueue")
        public Queue deadLetterQueue(){
            return QueueBuilder.durable(DEAD_LETTER_QUEUE_NAME).build();
        }
    
        /**
         * 声明死信队列绑定死信交换器,绑定路由键
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
                                       @Qualifier("deadLetterExchange") DirectExchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_ROUTING_KEY);
        }
    }
    

    2、模拟业务处理失败,发送需要重试的短信:

    代码如下:

    UserMsg userMsg1 = new UserMsg(1,"15627236666","你好,麻烦充值",1);
    UserMsg userMsg2 = new UserMsg(2,"15627236667","你好,麻烦支付",1);
    UserMsg userMsg3 = new UserMsg(3,"15627236668","你好,麻烦下单",1);
    String msgJson1 = JSON.toJSONString(userMsg1);
    String msgJson2 = JSON.toJSONString(userMsg2);
    String msgJson3 = JSON.toJSONString(userMsg3);
    
    userMsgMapper.insert(userMsg1);
    userMsgMapper.insert(userMsg2);
    userMsgMapper.insert(userMsg3);
    
    MessagePostProcessor messagePostProcessor = message -> {
        // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
        message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
        return message;
    };
    // 往业务 Queue 发送需要重试的短信
    rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME,RabbitMQConfig.BUSINESS_QUEUE_ROUTING_KEY,msgJson1,messagePostProcessor);
    rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME,RabbitMQConfig.BUSINESS_QUEUE_ROUTING_KEY,msgJson2,messagePostProcessor);
    rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME,RabbitMQConfig.BUSINESS_QUEUE_ROUTING_KEY,msgJson3,messagePostProcessor);
    

    我们可以看到,在发送消息时,会利用 MessagePostProcessor 来完成给消息添加 ttl。

    3、监听死信队列,消费消息:

    这是最重要的一步,我们需要监听死信队列,一旦有消息,证明有任务需要重试了,我们只需要拉取下来然后消费即可。

    这里需要注意的有一个点:为了避免出现消息丢失的情况,我们需要开启手动 ack,然后配合 fetch = 1,保证客户端每次只能拉取一个消息,当客户端消费完此消息后,需要手动调用 channel#basicAck() 方法去确认此消息已经被消费了。

    下面是相关的配置:

    # 开启手动 ack
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    # 设置 false,消息才能进入死信队列
    spring.rabbitmq.listener.simple.default-requeue-rejected=false
    # 消费者每次只读取一个消息
    spring.rabbitmq.listener.simple.prefetch=1
    

    接着我们看看如何监听死信队列,先上代码:

    //@RabbitListener(queues = {RabbitMQConfig.DEAD_LETTER_QUEUE_NAME})
    @Component
    public class DeadLetterQueueListener {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
        @Resource
        private UserMsgMapper userMsgMapper;
    
        @RabbitListener(queues = {RabbitMQConfig.DEAD_LETTER_QUEUE_NAME})
        @RabbitHandler
        public void processHandler(String msg, Channel channel, Message message) throws IOException {
    
            try {
                UserMsg userMsg = JSON.parseObject(new String(message.getBody()), UserMsg.class);
                // 模拟发送短信
                int num = new Random().nextInt(10);
                if (num >5){
                    // 发送成功
                    // 更新数据库记录
                    System.out.println("消息【" + userMsg.getId() + "】发送成功,失败次数:" + userMsg.getFailCount());
                    userMsgMapper.update(userMsg);
                }else {
                    // 重新发到业务队列中
                    int failCount = userMsg.getFailCount()+1;
                    if (failCount > 5){
                        System.out.println("消息【"+ userMsg.getId() +"】发送次数已到上线");
                        userMsgMapper.update(userMsg);
                    }else {
                        userMsg.setFailCount(failCount);
                        String msgJson = JSON.toJSONString(userMsg);
                        System.out.println("消息【"+ userMsg.getId() +"】发送失败,失败次数为:"+ userMsg.getFailCount());
                        userMsgMapper.update(userMsg);
                        MessagePostProcessor messagePostProcessor = message2 -> {
                            // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
                            message2.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
                            return message2;
                        };
                        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME,RabbitMQConfig.BUSINESS_QUEUE_ROUTING_KEY,msgJson,messagePostProcessor);
                    }
                }
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (Exception e) {
                System.err.println("消息即将再次返回队列处理...");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
    

    其实非常简单,首先在 @RabbitListener 注解中加上自己需要监听的死信队列,我们可以发现这个注解可加载类上,也可以加载处理消息的方法上;当然了,还需要在消费消息的方法上加上注解 @RabbitHandler。

    在消费消息的逻辑中,如果是业务处理成功了,也就是重试成功了,此时不需做其他操作;而如果重试失败了,需要重新发送一个消息到业务 Queue,表示又要重试一次。最后,我们需要调用 channel#basicAck() 表示消息消费成功~

    在给业务 Queue 发送消息之前,我们记得给消息设置一下过期时间,还是利用 MessagePostProcessor 来完成。

    四、最后

    到此基本就结束了。

    但是我们还有一个点要注意:就是如果我们使用 RabbitMQ 来做重试机制,我们一定要保证 RabbitMQ 的高可用,这时候我们一般推荐使用镜像集群模式,而不是普通集群模式。因为普通集群模式中,每个实例都只是保存其他实例中 queue 的元数据,只要一个实例宕机的,它所负责的 queue 都不能再被使用了。而镜像集群模式中,每个实例都会保存所有 queue ,这样能保证数据 100% 的不丢失!当然了,如果不追求高并发,使用主备模式也还是可以滴~

    大家如果对上面的例子还感兴趣,可到我的 github 看看:死信队列完成重试机制

  • 相关阅读:
    【MFC初学】
    【递归】【3月周赛1】【Problem B】
    不管ACM是不是屠龙之技
    【贪心+背包】【HDU2546】【饭卡】
    【精度问题】【HDU2899】Strange fuction
    【泛化物品】【HDU1712】【ACboy needs your help】
    【递推】【HDU2585】【hotel】
    【二进制拆分多重背包】【HDU1059】【Dividing】
    【水:最长公共子序列】【HDU1159】【Common Subsequence】
    【递推+矩阵快速幂】【HDU2604】【Queuing】
  • 原文地址:https://www.cnblogs.com/Howinfun/p/13418086.html
Copyright © 2011-2022 走看看