zoukankan      html  css  js  c++  java
  • 八、RabbitMq死信队列与延迟队列

    1 死信队列

    1.1 死信的概念

    先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

    应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

    1.2 死信的来源

    1. 消息 TTL 过期,表示消息在队列中等待的最大时间。如果不设置TTL,表示消息永远不会过期
    2. 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
    3. 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

    1.3 死信实战

    代码架构图

    死信架构

    1.3.1 消息 TTL 过期

    生产者代码

    public class Producer {
        private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] argv) throws Exception {
            try (Channel channel = RabbitMqUtils.getChannel()) {
                channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
                //设置消息的 TTL 时间
                AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
                //该信息是用作演示队列个数限制
                for (int i = 1; i < 11; i++) {
                    String message = "info" + i;
                    channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
                    System.out.println("生产者发送消息:" + message);
                }
            }
        }
    }
    

    消费者 C1 代码(启动之后关闭该消费者 模拟其接收不到消息)

    public class Consumer01 {
        //普通交换机名称
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机名称
        private static final String DEAD_EXCHANGE = "dead_exchange";
    
        public static void main(String[] argv) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明死信和普通交换机 类型为 direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            //声明死信队列
            String deadQueue = "dead-queue";
            channel.queueDeclare(deadQueue, false, false, false, null);
            //死信队列绑定死信交换机与 routingkey
            channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
            //正常队列绑定死信队列信息
            Map<String, Object> params = new HashMap<>();
            //正常队列设置死信交换机 参数 key 是固定值
            params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            //正常队列设置死信 routing-key 参数 key 是固定值
            params.put("x-dead-letter-routing-key", "lisi");
    
            String normalQueue = "normal-queue";
            channel.queueDeclare(normalQueue, false, false, false, params);
            channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
            System.out.println("等待接收消息.....");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Consumer01 接收到消息" + message);
            };
            channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {
            });
        }
    }
    

    死信队列效果0

    消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

    public class Consumer02 {
        private static final String DEAD_EXCHANGE = "dead_exchange";
    
        public static void main(String[] argv) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            String deadQueue = "dead-queue";
            channel.queueDeclare(deadQueue, false, false, false, null);
            channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
            System.out.println("等待接收死信队列消息.....");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Consumer02 接收死信队列的消息" + message);
            };
            channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
            });
        }
    }
    

    死信队列效果1.png

    1.3.2 队列达到最大长度

    1. 消息生产者代码去掉 TTL 属性
    public class Producer {
        private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] argv) throws Exception {
            try (Channel channel = RabbitMqUtils.getChannel()) {
                channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
                //该信息是用作演示队列个数限制
                for (int i = 1; i < 11; i++) {
                    String message = "info" + i;
                    channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
                    System.out.println("生产者发送消息:" + message);
                }
            }
        }
    }
    
    1. C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)

    params.put("x-max-length", 6);

    //正常队列绑定死信队列信息
    Map<String, Object> params = new HashMap<>();
    //正常队列设置死信交换机 参数 key 是固定值
    params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    //正常队列设置死信 routing-key 参数 key 是固定值
    params.put("x-dead-letter-routing-key", "lisi");
    // 设置正常队列长度的限制
    params.put("x-max-length", 6);
    

    注意此时需要把原先队列删除 因为参数改变了

    1. C2 消费者代码不变(启动 C2 消费者)

    死信队列效果2

    1.3.3 消息被拒

    1. 消息生产者代码同上生产者一致

    2. C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息)

    public class Consumer01 {
        //普通交换机名称
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机名称
        private static final String DEAD_EXCHANGE = "dead_exchange";
    
        public static void main(String[] argv) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明死信和普通交换机 类型为 direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            //声明死信队列
            String deadQueue = "dead-queue";
            channel.queueDeclare(deadQueue, false, false, false, null);
            //死信队列绑定死信交换机与 routingkey
            channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
            //正常队列绑定死信队列信息
            Map<String, Object> params = new HashMap<>();
            //正常队列设置死信交换机 参数 key 是固定值
            params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            //正常队列设置死信 routing-key 参数 key 是固定值
            params.put("x-dead-letter-routing-key", "lisi");
            String normalQueue = "normal-queue";
            channel.queueDeclare(normalQueue, false, false, false, params);
            channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
            System.out.println("等待接收消息.....");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                if (message.equals("info5")) {
                    System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                    //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                    channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
                } else {
                    System.out.println("Consumer01 接收到消息" + message);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            boolean autoAck = false;
            channel.basicConsume(normalQueue, autoAck, deliverCallback, consumerTag -> {
            });
        }
    }
    

    死信队列效果3.png

    1. C2 消费者代码不变

    启动消费者 1 然后再启动消费者 2

    死信队列效果4.png

    2 延迟队列

    2.1 延迟队列概念

    延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

    2.2 延迟队列使用场景

    1. 订单在十分钟之内未支付则自动取消

    2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

    3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。

    4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。

    5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

    这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下

    延迟队列流程

    2.3 RabbitMQ 中的 TTL

    TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

    2.3.1 消息设置 TTL

    这一种方式便是针对每条消息设置 TTL

    //设置消息的 TTL 时间
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    

    2.3.2 队列设置 TTL

    第一种是在创建队列的时候设置队列的“x-message-ttl”属性

    params.put("x-message-ttl", 10000);

    //正常队列绑定死信队列信息
    Map<String, Object> params = new HashMap<>();
    //正常队列设置死信交换机 参数 key 是固定值
    params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
    //正常队列设置死信 routing-key 参数 key 是固定值
    params.put("x-dead-letter-routing-key", "lisi");
    // 设置正常队列长度的限制
    params.put("x-max-length", 6);
    // 设置队列超时时间为10秒
    params.put("x-message-ttl", 10000);
    
    String normalQueue = "normal-queue";
    channel.queueDeclare(normalQueue, false, false, false, params);
    

    2.3.3 两者的区别

    如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

    3 总结

    延迟队列,其实就是应用了TTL+死信队列,来达到延迟队列小效果,想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

  • 相关阅读:
    Verilog非阻塞赋值的仿真/综合问题 (Nonblocking Assignments in Verilog Synthesis)上
    异步FIFO结构及FPGA设计 跨时钟域设计
    FPGA管脚分配需要考虑的因素
    An Introduction to Delta Sigma Converters (DeltaSigma转换器 上篇)
    An Introduction to Delta Sigma Converters (DeltaSigma转换器 下篇)
    中国通信简史 (下)
    谈谈德国大学的电子专业
    中国通信简史 (上)
    Verilog学习笔记
    Verilog非阻塞赋值的仿真/综合问题(Nonblocking Assignments in Verilog Synthesis) 下
  • 原文地址:https://www.cnblogs.com/linhp/p/15242736.html
Copyright © 2011-2022 走看看