zoukankan      html  css  js  c++  java
  • RabbitMq的死信队列和延迟队列

    死信队列

    DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。

    消息变成死信,可能是由于以下的原因:

    • 消息被拒绝
    • 消息过期
    • 队列达到最大长度

    DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。

    要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。

    定义交换机和队列

    /**
     * @author WGR
     * @create 2020/9/2 -- 16:26
     */
    @Configuration
    public class RabbitMQDLXConfig {
    
        @Bean("my_dlx_queue")
        public Queue myDlxQueue(){
            return QueueBuilder.durable("my_dlx_queue").build();
        }
    
        @Bean("my_dlx_exchange")
        public Exchange myDlxExchange(){
            return ExchangeBuilder.directExchange("my_dlx_exchange").durable(true).build();
        }
    
        //绑定队列和交换机
        @Bean
        public Binding myTtlDlx1(@Qualifier("my_dlx_queue") Queue queue,
                                           @Qualifier("my_dlx_exchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange)
                    .with("my_ttl_dlx").noargs();
        }
    
        //绑定队列和交换机
        @Bean
        public Binding myTtlDlx2(@Qualifier("my_dlx_queue") Queue queue,
                                 @Qualifier("my_dlx_exchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange)
                    .with("my_max_dlx").noargs();
        }
    
        //声明队列
        @Bean("my_ttl_dlx_queue")
        public Queue myTtlDlxQueue(){
            Map<String,Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange","my_dlx_exchange");
            arguments.put("x-dead-letter-routing-key","my_ttl_dlx");
            arguments.put("x-message-ttl",60000);
            return QueueBuilder.durable("my_ttl_dlx_queue").withArguments(arguments).build();
        }
    
        //声明队列
        @Bean("my_max_dlx_queue")
        public Queue myMaxDlxQueue(){
            Map<String,Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange","my_dlx_exchange");
            arguments.put("x-dead-letter-routing-key","my_max_dlx");
            arguments.put("x-max-length",2);
            return QueueBuilder.durable("my_max_dlx_queue").withArguments(arguments).build();
        }
    
        @Bean("my_normal_exchange")
        public Exchange myNormalExchange(){
            return ExchangeBuilder.directExchange("my_normal_exchange").durable(true).build();
        }
    
    
        //绑定队列和交换机
        @Bean
        public Binding myTtlDlx3(@Qualifier("my_ttl_dlx_queue") Queue queue,
                                 @Qualifier("my_normal_exchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange)
                    .with("my_ttl_dlx").noargs();
        }
    
        //绑定队列和交换机
        @Bean
        public Binding myTtlDlx4(@Qualifier("my_max_dlx_queue") Queue queue,
                                 @Qualifier("my_normal_exchange") Exchange exchange){
            return BindingBuilder.bind(queue).to(exchange)
                    .with("my_max_dlx").noargs();
        }
    
    
    }
    

    image-20200902170512538

    进行测试:

       /**
         * 过期消息投递到死信队列
         * 投递到一个正常的队列,但是该队列有设置过期时间,到过期时间之后消息会被投递到死信交换机(队列)
         */
        @Test
        public void dlxTTLMessageTest(){
            rabbitTemplate.convertAndSend(
                    "my_normal_exchange",
                    "my_ttl_dlx",
                    "测试过期消息;6秒过期后会被投递到死信交换机2222");
        }
    
    
        /**
         * 消息长度超过2,会投递到死信队列中
         */
        @Test
        public void dlxMaxMessageTest(){
            rabbitTemplate.convertAndSend(
                    "my_normal_exchange",
                    "my_max_dlx",
                    "发送消息4:消息长度超过2,会被投递到死信队列中!");
    
            rabbitTemplate.convertAndSend(
                    "my_normal_exchange",
                    "my_max_dlx",
                    "发送消息5:消息长度超过2,会被投递到死信队列中!");
    
            rabbitTemplate.convertAndSend(
                    "my_normal_exchange",
                    "my_max_dlx",
                    "发送消息6:消息长度超过2,会被投递到死信队列中!");
    
        }
    

    image-20200902170227667

    3)流程

    具体因为队列消息过期而被投递到死信队列的流程:

    image-20200902170652969

    延迟队列

    延迟队列存储的对象是对应的延迟消息;所谓“延迟消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

    在RabbitMQ中延迟队列可以通过 过期时间 + 死信队列 来实现;具体如下流程图所示:

    1565520000397

    在上图中;分别设置了两个5秒、10秒的过期队列,然后等到时间到了则会自动将这些消息转移投递到对应的死信队列中,然后消费者再从这些死信队列接收消息就可以实现消息的延迟接收。

    延迟队列的应用场景;如:

    • 在电商项目中的支付场景;如果在用户下单之后的几十分钟内没有支付成功;那么这个支付的订单算是支付失败,要进行支付失败的异常处理(将库存加回去),这时候可以通过使用延迟队列来处理
    • 在系统中如有需要在指定的某个时间之后执行的任务都可以通过延迟队列处理
  • 相关阅读:
    django模型层(二)多表操作
    django模型层(一)单表操作
    模板系统
    HTTP协议web开发知识点
    视图函数
    URL路由系统
    web框架的本质
    MySQL多表查询
    MySQL单标查询
    列表渲染
  • 原文地址:https://www.cnblogs.com/dalianpai/p/13602770.html
Copyright © 2011-2022 走看看