zoukankan      html  css  js  c++  java
  • Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列

    本文主要摘录自:详细介绍Spring Boot + RabbitMQ实现延迟队列

    并增加了自己的一些理解,记录下来,以便日后查阅。

    项目源码:

    背景

    何为延迟队列?

    顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
    延迟队列能做什么?延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:

    • 延迟消费。比如:用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单;用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
    • 延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。

    如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。

    实现思路

    在介绍具体的实现思路之前,我们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。

    Time-To-Live Extensions

    RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档

    Dead Letter Exchange

    刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

    • 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
    • 消息因为设置了TTL而过期。
    • 消息进入了一条已经达到最大长度的队列。

    如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。更多资料请查阅官方文档

    流程图

    聪明的你肯定已经想到了,如何将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。

    针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:

    延迟消费

    延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。

    延迟重试

    延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。

    如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。

    代码实现

    配置队列

    从上述的流程图中我们可以看到,一个延迟队列的实现,需要一个缓冲队列以及一个实际的消费队列。又由于在RabbitMQ中,我们拥有两种消息过期的配置方式,所以在代码中,我们一共配置了三条队列:

    • delay_queue_per_message_ttl:TTL配置在消息上的缓冲队列。
    • delay_queue_per_queue_ttl:TTL配置在队列上的缓冲队列。
    • delay_process_queue:实际消费队列。

    我们通过Java Config的方式将上述的队列配置为Bean。由于我们添加了spring-boot-starter-amqp扩展,Spring Boot在启动时会根据我们的配置自动创建这些队列。为了方便接下来的测试,我们将delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置为同一个,且过期的消息都会通过DLX转发到delay_process_queue。

    delay_queue_per_message_ttl

    首先介绍delay_queue_per_message_ttl的配置代码:

    @Bean
    Queue delayQueuePerMessageTTL() {
        return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
                           .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter发送到的exchange
                           .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
                           .build();
    }
    

    其中,x-dead-letter-exchange声明了队列里的死信转发到的DLX名称,x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称。

    delay_queue_per_queue_ttl

    类似地,delay_queue_per_queue_ttl的配置代码:

    @Bean
    Queue delayQueuePerQueueTTL() {
        return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)
                           .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX
                           .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
                           .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 设置队列的过期时间
                           .build();
    }
    

    delay_queue_per_queue_ttl队列的配置比delay_queue_per_message_ttl队列的配置多了一个x-message-ttl,该配置用来设置队列的过期时间。

    delay_process_queue

    delay_process_queue的配置最为简单:

    @Bean
    Queue delayProcessQueue() {
        return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)
                           .build();
    }
    

    配置Exchange

    配置DLX

    首先,我们需要配置DLX,代码如下:

    @Bean
    DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }
    

    然后再将该DLX绑定到实际消费队列即delay_process_queue上。这样所有的死信都会通过DLX被转发到delay_process_queue:

    @Bean
    Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
        return BindingBuilder.bind(delayProcessQueue)
                             .to(delayExchange)
                             .with(DELAY_PROCESS_QUEUE_NAME);
    }
    

    配置延迟重试所需的Exchange

    从延迟重试的流程图中我们可以看到,消息处理失败之后,我们需要将消息转发到缓冲队列,所以缓冲队列也需要绑定一个Exchange。在本例中,我们将delay_process_per_queue_ttl作为延迟重试里的缓冲队列。

    定义消费者

    我们创建一个最简单的消费者ProcessReceiver,这个消费者监听delay_process_queue队列,对于接受到的消息,他会:

    • 如果消息里的消息体不等于FAIL_MESSAGE,那么他会输出消息体。
    • 如果消息里的消息体恰好是FAIL_MESSAGE,那么他会模拟抛出异常,然后将该消息重定向到缓冲队列(对应延迟重试场景)。

    另外,我们还需要新建一个监听容器用于存放消费者,代码如下:

    @Bean
    SimpleMessageListenerContainer processContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 监听delay_process_queue
        container.setMessageListener(new MessageListenerAdapter(processReceiver));
        return container;
    }
    

    至此,我们前置的配置代码已经全部编写完成,接下来我们需要编写测试用例来测试我们的延迟队列。

    编写测试用例

    延迟消费场景

    首先我们编写用于测试TTL设置在消息上的测试代码。

    我们借助spring-rabbit包下提供的RabbitTemplate类来发送消息。由于我们添加了spring-boot-starter-amqp扩展,Spring Boot会在初始化时自动地将RabbitTemplate当成bean加载到容器中。

    解决了消息的发送问题,那么又该如何为每个消息设置TTL呢?这里我们需要借助MessagePostProcessor。MessagePostProcessor通常用来设置消息的Header以及消息的属性。我们新建一个ExpirationMessagePostProcessor类来负责设置消息的TTL属性:

    /**
     * 设置消息的失效时间
     */
    public class ExpirationMessagePostProcessor implements MessagePostProcessor {
        private final Long ttl; // 毫秒
    
        public ExpirationMessagePostProcessor(Long ttl) {
            this.ttl = ttl;
        }
    
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties()
                   .setExpiration(ttl.toString()); // 设置per-message的失效时间
            return message;
        }
    }
    

    然后在调用RabbitTemplate的convertAndSend方法时,传入ExpirationMessagePostPorcessor即可。我们向缓冲队列中发送3条消息,过期时间依次为1秒,2秒和3秒。具体的代码如下所示:

    @Test
    public void testDelayQueuePerMessageTTL() throws InterruptedException {
        ProcessReceiver.latch = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            long expiration = i * 1000;
            rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,
                    (Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration));
        }
        ProcessReceiver.latch.await();
    }
    

    细心的朋友一定会问,为什么要在代码中加一个CountDownLatch呢?这是因为如果没有latch阻塞住测试方法的话,测试用例会直接结束,程序退出,我们就看不到消息被延迟消费的表现了。

    那么类似地,测试TTL设置在队列上的代码如下:

    @Test
    public void testDelayQueuePerQueueTTL() throws InterruptedException {
        ProcessReceiver.latch = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,
                    "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION);
        }
        ProcessReceiver.latch.await();
    }
    

    我们向缓冲队列中发送3条消息。理论上这3条消息会在4秒后同时过期。

    延迟重试场景

    我们同样还需测试延迟重试场景。

    @Test
    public void testFailMessage() throws InterruptedException {
        ProcessReceiver.latch = new CountDownLatch(6);
        for (int i = 1; i <= 3; i++) {
            rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);
        }
        ProcessReceiver.latch.await();
    }
    

    我们向delay_process_queue发送3条会触发FAIL的消息,理论上这3条消息会在4秒后自动重试。

    我的理解

    延迟消费过程(每个消息可以单独设置失效时间):

    • 1. 声明 delay_queue_per_message_ttl 队列:死信队列,设置 DLX 参数,包含 x-dead-letter-exchange 表示失效后进入的 exchange(值为 delay_exchange,即实际消费交换机)、x-dead-letter-routing-key 表示失效后的路由键(值为 delay_process_queue,即实际消费队列)。
    • 2. 声明 delay_process_queue 队列:实际消费队列。
    • 3. 声明 delay_exchange 交换机:实际消费交换机,类型为 Direct(一一对应)。
    • 4. 声明 dlx_binding 绑定:将实际消费队列和实际消费交换机绑定(路由键规则值为 delay_process_queue)。
    • 5. 发布一个消息,路由键为 delay_queue_per_message_ttl(发送到死信队列),并通过 header 单独设置每个消息的过期时间:当过期时间生效后,消息会转到实际消费队列。
    • 6. 声明一个消费者,监听 delay_process_queue 队列(即实际消费队列):消息正常被消费掉,达到延迟消费的目的。

    延迟消费过程(所有消息统一设置失效时间):

    • 1. 声明 delay_queue_per_queue_ttl 队列:死信队列,设置 DLX 参数,包含 x-dead-letter-exchange 表示失效后进入的 exchange(值为 delay_exchange,即实际消费交换机)、x-dead-letter-routing-key 表示失效后的路由键(值为 delay_process_queue,即实际消费队列)、x-message-ttl 表示队列消息过期时间。
    • 2. 声明 delay_process_queue 队列:实际消费队列。
    • 3. 声明 delay_exchange 交换机:实际消费交换机,类型为 Direct(一一对应)。
    • 4. 声明 dlx_binding 绑定:将实际消费队列和实际消费交换机绑定(路由键规则值为 delay_process_queue)。
    • 5. 发布一个消息,路由键为 delay_queue_per_queue_ttl(发送到死信队列):当过期时间生效后,消息会转到实际消费队列。
    • 6. 声明一个消费者,监听 delay_process_queue队列(即实际消费队列):消息正常被消费掉,达到延迟消费的目的。

    延迟重试过程

    • 1. 声明 delay_process_queue 队列:实际消费队列。
    • 2. 声明 delay_queue_per_queue_ttl 队列:死信队列,设置 DLX 参数,包含 x-dead-letter-exchange 表示失效后进入的 exchange(值为 delay_exchange,即实际消费交换机)、x-dead-letter-routing-key 表示失效后的路由键(值为 delay_process_queue,即实际消费队列)、x-message-ttl 表示队列消息过期时间。
    • 3. 声明 delay_exchange 交换机:实际消费交换机,类型为 Direct(一一对应)。
    • 4. 声明 per_queue_ttl_exchange 交换机:死信交换机,类型为 Direct(一一对应)。
    • 5. 声明 dlx_binding 绑定:将实际消费队列和实际消费交换机绑定(路由键规则值为 delay_process_queue)。
    • 6. 声明 queue_ttl_binding 绑定:将死信队列和死信交换机绑定(路由键规则值为 delay_queue_per_queue_ttl)。
    • 7. 发布一个消息,路由键为 delay_process_queue(发送到实际消费队列)。
    • 8. 声明一个消费者,监听 delay_process_queue 队列(即实际消费队列):消费者监听到消息,当处理过程中发生异常,消息重新发送到私信队列,然后等待过期时间生效后,消息再转到实际消费队列,重新消费,以达到延迟重试的目的。

    需要注意:在延迟消费的过程中,我们是没有创建死信交换机的,那为什么还可以发布消息呢?原因是 RabbitMQ 会使用默认的 Exchange,并且创建一个默认的 Binding(类型为 Direct),通过rabbitmqadmin list bindings命令,可以看到结果。

    Spring Cloud Stream RabbitMQ DLX 的实现:_rabbitmq_consumer_properties

  • 相关阅读:
    (BFS 二叉树) leetcode 515. Find Largest Value in Each Tree Row
    (二叉树 BFS) leetcode513. Find Bottom Left Tree Value
    (二叉树 BFS DFS) leetcode 104. Maximum Depth of Binary Tree
    (二叉树 BFS DFS) leetcode 111. Minimum Depth of Binary Tree
    (BFS) leetcode 690. Employee Importance
    (BFS/DFS) leetcode 200. Number of Islands
    (最长回文子串 线性DP) 51nod 1088 最长回文子串
    (链表 importance) leetcode 2. Add Two Numbers
    (链表 set) leetcode 817. Linked List Components
    (链表 双指针) leetcode 142. Linked List Cycle II
  • 原文地址:https://www.cnblogs.com/xishuai/p/spring-boot-rabbitmq-delay-queue.html
Copyright © 2011-2022 走看看