zoukankan      html  css  js  c++  java
  • RabbitMQ-TTL-死信队列_DLX

    1. 简介

    死信队列,简称:DLXDead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另外一个交换机,这个交换机就是DLX

    (一般会将DLX和与其binding 的 Queue,一并称为死信队列或DLX,习惯而已,不必纠结)

    那么什么情况下会成为Dead message

    1. 队列的长度达到阈值。
    2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
    3. 原队列存在消息过期设置,消息到达超时时间未被消费。

    流程讲解,如图所示(以第三种情况为例):

    1. Producer发送一条消息到Exchange并路由到设有过期时间(假设30分钟)的Queue中。
    2. 当消息的存活时间超过了30分钟后,Queue会将消息转发给DLX
    3. DLX接收到Dead message后,将Dead message路由到与其绑定的Queue中。
    4. 此时消费者监听此死信队列并消费此消息。

    死信队列有什么用呢?

    1. 取消订单(比如下单30分钟后未付款,则取消订单,回滚库存),或者新用户注册,隔段时间进行短信问候等。
    2. 将消费者拒绝的消息发送到死信队列,然后将消息进行持久化,后续可以做业务分析或者处理。

    2. TTL

    因为要实现延迟消息,我们先得知道如何设置过期时间。这里指演示

    TTLTime To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除。

    RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

    • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

    • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。 例:现在有两条消息,第一条消息过期时间为30s,而第二条消息过期时间为15s,当过了15秒后,第二条消息不会立即过期,而是要等第一条消息被消费后,第二条消息被消费时,才会判断是否过期,所以当所有消息的过期时间一致时(比如30m后过期),最好给队列设置过期时间,而不是消息。但是有的情况确实每个消息的过期时间不一致,比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知,这就有问题了,不可能设置那么多的队列,这时就需要使用延迟队列来实现这个功能(下篇博文会讲到)。

    • 如果两者都进行了设置,以时间短的为准。

    2.1 队列设置TTL

    2.1.1 引入所需依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
    

    2.1.2 application.yaml

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        # rabbit 默认的虚拟主机
        virtual-host: /
        # rabbit 用户名密码
        username: admin
        password: admin123
    

    2.1.3 RabbitConfig

    1. 声明一个过期时间为30s的Queue
    2. 声明一个交换机(这里声明的是主题交换机,交换机类型无所谓,只要消息能路由到Queue即可)。
    3. 设置绑定关系。
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 设置过期队列
     *
     * @author ludangxin
     * @date 2021/9/15
     */
    @Configuration
    public class RabbitTtlConfig {
       public static final String EXCHANGE_NAME = "TTL_EXCHANGE";
       public static final String QUEUE_NAME = "TTL_QUEUE";
    
       @Bean(QUEUE_NAME)
       public Queue queue() {
          return QueueBuilder.durable(QUEUE_NAME).ttl(30000).build();
       }
    
       @Bean(EXCHANGE_NAME)
       public Exchange exchange() {
          return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
       }
    
       @Bean
       public Binding binding(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {
          return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();
       }
    }
    

    2.1.4 Producer

    import com.ldx.rabbitmq.config.RabbitTtlConfig;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * 具有过期时间的消息 生产者
     *
     * @author ludangxin
     * @date 2021/9/9
     */
    @Component
    public class TtlProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
       public void sendMsg() {
          rabbitTemplate.convertAndSend(RabbitTtlConfig.EXCHANGE_NAME, "ttl.user", "这是一条有生命周期的消息。");
       }
    }
    

    2.1.5 测试代码

    @Autowired
    private TtlProducer ttlProducer;
    
    @Test
    public void sendMsg() {
        ttlProducer.sendMsg();
    }
    

    2.1.6 启动测试

    运行测试代码后,到RabbitMQ 控制台中查看队列即消息情况。

    如图所示,消息存活30s未被消费后,消息被遗弃。

    2.2 消息设置TTL

    2.2.1 Producer

    我们将Producer代码稍加修改,给消息设置10s的过期时间,观察消息到底是存活30s还是10s。

    import com.ldx.rabbitmq.config.RabbitTtlConfig;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * 具有过期时间的消息 生产者
     *
     * @author ludangxin
     * @date 2021/9/9
     */
    @Component
    public class TtlProducer {
    
       @Autowired
       private RabbitTemplate rabbitTemplate;
    
       public void sendMsg() {
          MessageProperties mp = new MessageProperties();
          mp.setExpiration("10000");
          Message message = new Message("这是一条有生命周期的消息。".getBytes(), mp);
          rabbitTemplate.convertAndSend(RabbitTtlConfig.EXCHANGE_NAME, "ttl.user", message);
       }
    }
    

    2.2.2 启动测试

    如图所示,消息只存活了10s。

    我们将过期时间设置成40s后,但消息还是只存活了30s。说明当同时设置了过期时间时,是以时间短的为准

    3. TTL + DLX

    接下来我们通过设置过期时间和死信队列来实现延迟队列的功能。

    首先罗列下实现步骤:

    1. 声明一个ExchangeTTl Queue,并且绑定关系,实现生成死信的逻辑。
    2. 声明一个DLXQueue,此步骤的Queue是为了接收死信并让Consumer进行监听消费的。
    3. TTl QueueDLX进行绑定,使消息成为死信后能转发给DLX

    3.1 RabbitConfig

    其实DLX与普通的Exchange没有什么区别,只不过是“生产”死信的Queue指定了消息成为死信后转发到DLX。

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 死信队列配置
     *
     * @author ludangxin
     * @date 2021/9/15
     */
    @Configuration
    public class RabbitDeadLetterConfig {
    
       public static final String QUEUE_NAME_TTL = "QUEUE_NAME_TTL_1";
       public static final String EXCHANGE_NAME_TTL = "EXCHANGE_NAME_TTL_1";
       public static final String QUEUE_NAME_DEAD_LETTER = "QUEUE_NAME_DEAD_LETTER";
       public static final String EXCHANGE_NAME_DLX = "EXCHANGE_NAME_DLX";
       public static final String ROUTING_KEY_DLX = "EXPIRE.#";
       public static final String ROUTING_KEY_DEAD_LETTER = "EXPIRE.10";
       public static final String ROUTING_KEY_TTL = "EXPIRE_TTL_10";
    
       /**
        * 1. Queue 队列
        */
       @Bean(QUEUE_NAME_TTL)
       public Queue queue() {
          /*
           * 1. 设置队列的过期时间 30s
           * 2. 绑定DLX
           * 3. 设置routing key(注意:这里设置的是路由到死信Queue的路由,并不是设置binding关系的路由)
           */
          return QueueBuilder.durable(QUEUE_NAME_TTL).ttl(10000).deadLetterExchange(EXCHANGE_NAME_DLX).deadLetterRoutingKey(ROUTING_KEY_DEAD_LETTER).build();
       }
    
       /**
        * 2. exchange
        */
       @Bean(EXCHANGE_NAME_TTL)
       public Exchange exchange() {
          return ExchangeBuilder.directExchange(EXCHANGE_NAME_TTL).durable(true).build();
       }
    
       /**
        * 3. 队列和交互机绑定关系 Binding
        */
       @Bean
       public Binding bindExchange(@Qualifier(QUEUE_NAME_TTL) Queue queue, @Qualifier(EXCHANGE_NAME_TTL) Exchange exchange) {
          return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_TTL).noargs();
       }
    
       /**
        * 4. 死信队列
        */
       @Bean(QUEUE_NAME_DEAD_LETTER)
       public Queue deadLetterQueue() {
          return QueueBuilder.durable(QUEUE_NAME_DEAD_LETTER).build();
       }
    
       /**
        * 5. dlx
        */
       @Bean(EXCHANGE_NAME_DLX)
       public Exchange exchangeDlx() {
          return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build();
       }
    
       /**
        * 6. 队列和交互机绑定关系 Binding
        */
       @Bean
       public Binding bindDlxExchange(@Qualifier(QUEUE_NAME_DEAD_LETTER) Queue queue, @Qualifier(EXCHANGE_NAME_DLX) Exchange exchange) {
          return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_DLX).noargs();
       }
    
    }
    

    3.2 Producer

    import com.ldx.rabbitmq.config.RabbitDeadLetterConfig;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    /**
     * 延迟消息生产者
     *
     * @author ludangxin
     * @date 2021/9/9
     */
    @Component
    public class DelayProducer {
    
       @Autowired
       private RabbitTemplate rabbitTemplate;
    
       public void sendMsg() {
          String msg = "这是一条有生命周期的消息,发送时间为:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
          Message message = new Message(msg.getBytes());
          rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.EXCHANGE_NAME_TTL, RabbitDeadLetterConfig.ROUTING_KEY_TTL, message);
       }
    }
    

    3.3 Consumer

    import com.ldx.rabbitmq.config.RabbitDeadLetterConfig;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    /**
     * 延迟消息消费者
     *
     * @author ludangxin
     * @date 2021/9/9
     */
    @Slf4j
    @Component
    public class DelayConsumer {
    
        @RabbitListener(queues = {RabbitDeadLetterConfig.QUEUE_NAME_DEAD_LETTER})
        public void dlxQueue(Message message){
            log.info(new String(message.getBody()) + ",消息接收时间为:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        }
    }
    

    3.4 测试代码

    @Autowired
    private DelayProducer delayProducer;
    
    @Test
    @SneakyThrows
    public void sendDlxMsg() {
        delayProducer.sendMsg();
        // 使进程阻塞,方便Consumer监听输出Message
        System.in.read();
    }
    

    3.5 启动测试

    输出日志内容如下:

    2021-09-15 23:51:22.795  INFO 8122 --- [ntContainer#0-1] com.ldx.rabbitmq.consumer.DelayConsumer  : 这是一条有生命周期的消息,发送时间为:2021-09-15 23:51:12,消息接收时间为:2021-09-15 23:51:22
    
  • 相关阅读:
    if语句
    操作列表
    列表
    数据类型(不全)
    windows安装mysql
    hadr启动报错码
    db2主备hadr部署
    java--遍历字符个数
    java--装饰类
    java--继承&接口
  • 原文地址:https://www.cnblogs.com/ludangxin/p/15291518.html
Copyright © 2011-2022 走看看