zoukankan      html  css  js  c++  java
  • springboot rabbitmq 死信队列应用场景和完整demo

    何为死信队列?

    死信队列实际上就是,当我们的业务队列处理失败(比如抛异常并且达到了retry的上限),就会将消息重新投递到另一个Exchange(Dead Letter Exchanges),该Exchange再根据routingKey重定向另一个队列,在这个队列重新处理该消息。

    来自一个队列的消息可以被当做‘死信’,即被重新发布到另外一个“exchange”去,这样的情况有:
    • 消息被拒绝 (basic.reject or basic.nack) 且带 requeue=false不重新入队参数或达到的retry重新入队的上限次数
    • 消息的TTL(Time To Live)-存活时间已经过期
    • 队列长度限制被超越(队列满,queue的"x-max-length"参数
     
    Dead letter exchanges (DLXs) are normal exchanges.
     
    For any given queue, a DLX can be defined by clients using the queue's arguments, or in the server using policies.
     
    经过上面的认知,可知应用场景:重要的业务队列如果失败,就需要重新将消息用另一种业务逻辑处理;如果是正常的业务逻辑故意让消息中不合法的值失败,就不需要死信;具体场景具体分析

    SpringBoot配置文件

    设置重试次数、间隔和投递到死信队列

    spring.application.name=spring-boot-rabbitmq
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=nut
    spring.rabbitmq.password=nut
    
    # 允许消息消费失败的重试
    spring.rabbitmq.listener.simple.retry.enabled=true
    # 消息最多消费次数3次
    spring.rabbitmq.listener.simple.retry.max-attempts=3
    # 消息多次消费的间隔1秒
    spring.rabbitmq.listener.simple.retry.initial-interval=1000
    #  设置为false,会丢弃消息或者重新发布到死信队列
    spring.rabbitmq.listener.simple.default-requeue-rejected=false
    
    
    server.port=5678

    初始化和绑定重定向队列配置类

    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 死信队列的配置
     */
    @Configuration
    public class RabbitDeadLetterConfig {
    
        public static final String DEAD_LETTER_EXCHANGE = "TDL_EXCHANGE";
        public static final String DEAD_LETTER_TEST_ROUTING_KEY = "TDL_KEY";
        public static final String DEAD_LETTER_REDIRECT_ROUTING_KEY = "TKEY_R";
        public static final String DEAD_LETTER_QUEUE = "TDL_QUEUE";
        public static final String REDIRECT_QUEUE = "TREDIRECT_QUEUE";
    
        /**
         * 死信队列跟交换机类型没有关系 不一定为directExchange  不影响该类型交换机的特性.
         */
        @Bean("deadLetterExchange")
        public Exchange deadLetterExchange() {
            return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE).durable(true).build();
        }
    
        @Bean("deadLetterQueue")
        public Queue deadLetterQueue() {
            Map<String, Object> args = new HashMap<>(2);
    //       x-dead-letter-exchange    声明  死信队列Exchange
            args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    //       x-dead-letter-routing-key    声明 死信队列抛出异常重定向队列的routingKey(TKEY_R)
            args.put("x-dead-letter-routing-key", DEAD_LETTER_REDIRECT_ROUTING_KEY);
            return QueueBuilder.durable(DEAD_LETTER_QUEUE).withArguments(args).build();
        }
    
        @Bean("redirectQueue")
        public Queue redirectQueue() {
            return QueueBuilder.durable(REDIRECT_QUEUE).build();
        }
    
        /**
         * 死信队列绑定到死信交换器上.
         *
         * @return the binding
         */
        @Bean
        public Binding deadLetterBinding() {
            return new Binding(DEAD_LETTER_QUEUE, Binding.DestinationType.QUEUE, DEAD_LETTER_EXCHANGE, DEAD_LETTER_TEST_ROUTING_KEY, null);
    
        }
    
        /**
         * 将重定向队列通过routingKey(TKEY_R)绑定到死信队列的Exchange上
         *
         * @return the binding
         */
        @Bean
        public Binding redirectBinding() {
            return new Binding(REDIRECT_QUEUE, Binding.DestinationType.QUEUE, DEAD_LETTER_EXCHANGE, DEAD_LETTER_REDIRECT_ROUTING_KEY, null);
        }
    }

    生产者向业务队列发送消息

    这里为了方便测试没有往业务队列发送消息,直接往死信Exchange里投递消息。

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import xy.study.rabbitmq.conf.RabbitDeadLetterConfig;
    
    @Slf4j
    @Component
    public class DeadLetterSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(int number) {
            log.warn("DeadLetterSender : {}", number);
            // 这里的Exchange可以是业务的Exchange,为了方便测试这里直接往死信Exchange里投递消息
            rabbitTemplate.convertAndSend(
                    RabbitDeadLetterConfig.DEAD_LETTER_EXCHANGE,
                    RabbitDeadLetterConfig.DEAD_LETTER_TEST_ROUTING_KEY,
                    number);
        }
    }

    死信队列消费者

    这里会抛异常

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    import xy.study.rabbitmq.conf.RabbitDeadLetterConfig;
    
    @Slf4j
    @Component
    @RabbitListener(queues = RabbitDeadLetterConfig.DEAD_LETTER_QUEUE)
    public class DeadLetterConsumer {
    
        /*@RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = RabbitDeadLetterConfig.DEAD_LETTER_QUEUE, durable = "true"),
                exchange = @Exchange(value = RabbitDeadLetterConfig.DEAD_LETTER_EXCHANGE, type = ExchangeTypes.DIRECT),
                key = RabbitDeadLetterConfig.DEAD_LETTER_TEST_ROUTING_KEY)
        )*/
        @RabbitHandler
        public void testDeadLetterQueueAndThrowsException(@Payload Integer number){
            log.warn("DeadLetterConsumer :{}/0 ", number);
            int i = number / 0;
        }
    }

    重定向队列

    队列"死信"后,会将消息投递到Dead Letter Exchanges,然后该Exchange会将消息投递到重定向队列

    此时,在重定向队列中,做对应的业务操作。

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import xy.study.rabbitmq.conf.RabbitDeadLetterConfig;
    
    @RabbitListener(queues = RabbitDeadLetterConfig.REDIRECT_QUEUE)
    @Component
    @Slf4j
    public class RedirectQueueConsumer {
    
        /**
         * 重定向队列和死信队列形参一致Integer number
         * @param number
         */
        @RabbitHandler
        public void fromDeadLetter(Integer number){
            log.warn("RedirectQueueConsumer : {}", number);
            // 对应的操作
            int i = number / 1;
        }
    }

    测试

    先启动项目

    然后利用测试类发送一条信息

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import xxx.DeadLetterSender;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqApplicationTests {
    
    
        @Autowired
        private DeadLetterSender deadLetterSender;
    
    
        @Test
        public void testSendDeadLetterQueue(){
            deadLetterSender.send(15);
        }
    
    }

    再看RabbitmqApplication控制台日志

    重试3次后,消息不再入队,投递到DL Exchange,路由到重定向队列。

    SpringBoot RabbitMQ 延迟队列代码实现 

    参考:

    http://www.cnblogs.com/wei-feng/p/6599419.html

    https://my.oschina.net/10000000000/blog/1626278

  • 相关阅读:
    【STM32F429】第6章 ThreadX操作系统移植(IAR)
    【STM32F429】第5章 ThreadX操作系统移植(MDK AC6)
    【硬核】超强八位半开源万用表
    【STM32H7】第4章 ThreadX操作系统移植(MDK AC5)
    【STM32F429】第4章 ThreadX操作系统移植(MDK AC5)
    【STM32H7】第3章 ThreadX操作系统介绍
    【STM32F429】第3章 ThreadX操作系统介绍
    ST发布M33内核新品STM32U5,首款40nm工艺超低功耗系列,160MHz全速运行19uA/MHz
    CAN总线35周年特别篇 -- CAN总线的前世今生
    【STM32H7】第2章 初学ThreadX准备工作
  • 原文地址:https://www.cnblogs.com/theRhyme/p/10874409.html
Copyright © 2011-2022 走看看