zoukankan      html  css  js  c++  java
  • RabbitMQ 死信队列

    一、死信队列

    死信,顾名思义就是无法被消费的消息,一般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进行消费,但是某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列

    死信队列有其特殊的应用场景,例如用户在商城下单成功并点击去支付的时候,如果在指定的时间内未支付,那么就可以将该下单消息投递到死信队列中,至于后续怎么处理死信队列需要结合具体的应用场景

    二、死信的来源

    通常死信的来源有下面几种方式

    1、消息 TTL (Time To Live) 过期

    2、队列达到了最大长度,无法再添加消息到 MQ 中了

    3、消息被拒,并且没有重新入队(basic.reject || basic.Nack) && (requeue = false)

    三、消息 TTL 过期

    1、Consumer01

    public class Consumer01 {
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        private static final String NORMAL_QUEUE = "normal_queue";
        private static final String NORMAL_ROUTING_KEY = "normal";
        private static final String DEAD_EXCHANGE = "dead_exchange";
        private static final String DEAD_ROUTING_KEY = "dead";
    
        public static void main(String[] args) throws Exception {
            // 自定义工具类获取信道
            Channel channel = RabbitmqUtils.getChannel();
    
            // 声明正常消息的交换机(类型为 direct)
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 正常队列关联死信交换机(正常队列出现了故障之后,消息就会通过死信交换机传递到死信队列中)
            HashMap<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
            channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
            // 正常消息交换机绑定正常消息队列
            channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);
    
            // 消息成功之后的回调
            DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                String msg = new String(message.getBody());
                System.out.println(msg);
            };
            // 取消消费者的回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("取消消费者时的回调接口");
            };
            // 消费者消费消息
            channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);
    
            System.out.println("Consumer01 开始消费消息");
        }
    }
    

    2、Consumer02

    public class Consumer02 {
        private static final String DEAD_EXCHANGE = "dead_exchange";
        private static final String DEAD_QUEUE = "dead_queue";
        private static final String DEAD_ROUTING_KEY = "dead";
    
        public static void main(String[] args) throws Exception {
            // 自定义工具类获取信道对象
            Channel channel = RabbitmqUtils.getChannel();
    
            // 声明死信交换机(topic 类型)
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 声明死信队列
            channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
            // 死信交换机绑定死信队列
            channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);
    
            // 消息成功之后的回调
            DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                String msg = new String(message.getBody());
                System.out.println(msg);
            };
            // 取消消费者的回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("取消消费者时的回调接口");
            };
            // 消费者消费消息
            channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
            System.out.println("Consumer02 开始消费消息");
        }
    }

    3、Producer

    public class Producer {
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        private static final String NORMAL_ROUTING_KEY = "normal";
    
        public static void main(String[] args) throws Exception {
            // 自定义工具类获取信道
            Channel channel = RabbitmqUtils.getChannel();
    
            // 声明一个 direct 类型的交换机
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            // 消息发送 10 s 之后,如果没有消费者进行消费,那么该消息就称为死信,它就会进入死信队列中
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
    
            // 待发送的消息
            String message = "我是一只机智的小毛毛,很可爱,很机智";
            for (int i = 1; i < 11; i++) {
                channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, properties, (message + i).getBytes(StandardCharsets.UTF_8));
            }
    
            System.out.println("Producer send message successfully...");
        }
    }
    

    4、测试过程及结果

    启动 Consumer01 将普通交换机、普通队列注册到 RabbitMQ 上,启动 Consumer02 将死信交换机、死信队列注册到 RabbitMQ 上

    然后为了演示消息超时之后可以进入死信队列,我们关闭 Consumer01,模拟其接收不到消息,为了不让死信消息被消费者消费掉,我们关闭 Consumer02,然后启动生产者 Producer

    10 s 之后普通队列里的消息进入死信队列中

    接着启动消费者 Consumer02 消费掉死信队列中的消息

     

    四、队列达到最大长度

    1、Consumer01

    public class Consumer01 {
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        private static final String NORMAL_QUEUE = "normal_queue";
        private static final String NORMAL_ROUTING_KEY = "normal";
        private static final String DEAD_EXCHANGE = "dead_exchange";
        private static final String DEAD_ROUTING_KEY = "dead";
    
        public static void main(String[] args) throws Exception {
            // 自定义工具类获取信道
            Channel channel = RabbitmqUtils.getChannel();
    
            // 声明正常消息的交换机(类型为 direct)
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 正常队列关联死信交换机(正常队列出现了故障之后,消息就会通过死信交换机传递到死信队列中)
            HashMap<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
            // 设置正常队列的最大长度
            arguments.put("x-max-length",6);
            channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
            // 正常消息交换机绑定正常消息队列
            channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);
    
            // 消息成功之后的回调
            DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                String msg = new String(message.getBody());
                System.out.println(msg);
            };
            // 取消消费者的回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("取消消费者时的回调接口");
            };
            // 消费者消费消息
            channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);
    
            System.out.println("Consumer01 开始消费消息");
        }
    }
    

    2、Consumer02 代码不变

    3、Producer 

    public class Producer {
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        private static final String NORMAL_ROUTING_KEY = "normal";
    
        public static void main(String[] args) throws Exception {
            // 自定义工具类获取信道
            Channel channel = RabbitmqUtils.getChannel();
    
            // 声明一个 direct 类型的交换机
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            // 待发送的消息
            String message = "我是一只机智的小毛毛,很可爱,很机智";
            for (int i = 1; i < 11; i++) {
                channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8));
            }
    
            System.out.println("Producer send message successfully...");
        }
    }
    

    4、测试过程及结果

    删除掉原先的正常交换机、正常队列、死信交换机、死信队列,然后按照上面的方式启动 Consumer01、Consumer02 重新注册正常交换机、正常队列、死信交换机、死信队列,接着关闭 Consumer01、Consumer02,最后启动 Producer 发送消息(如果 Consumer01 是一直打开的情况下,正常队列的消息就不会堆积到 6 条)

    启动 Consumer01、Consumer02,发现 Consumer01 消费了 6 条消息,Consumer02 消费了四条消息

     

    五、消息被拒

    1、Consumer01

    public class Consumer01 {
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        private static final String NORMAL_QUEUE = "normal_queue";
        private static final String NORMAL_ROUTING_KEY = "normal";
        private static final String DEAD_EXCHANGE = "dead_exchange";
        private static final String DEAD_ROUTING_KEY = "dead";
    
        public static void main(String[] args) throws Exception {
            // 自定义工具类获取信道
            Channel channel = RabbitmqUtils.getChannel();
    
            // 声明正常消息的交换机(类型为 direct)
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 正常队列关联死信交换机(正常队列出现了故障之后,消息就会通过死信交换机传递到死信队列中)
            HashMap<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
            channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
            // 正常消息交换机绑定正常消息队列
            channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);
    
            // 消息成功之后的回调
            DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                String msg = new String(message.getBody());
                if (msg.contains("很机智4")) {
                    System.out.println("Consumer01 接收到消息" + msg + "并拒绝签收该消息");
                    //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                    channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
                } else {
                    System.out.println("Consumer01 接收到消息" + msg);
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            };
            // 取消消费者的回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("取消消费者时的回调接口");
            };
            // 消费者消费消息(一定要开启手动应答,如果你开启了自动应答,根本不存在拒绝消息的情况)
            channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
    
            System.out.println("Consumer01 开始消费消息");
        }
    }
    

    2、Consumer02

    public class Consumer02 {
        private static final String DEAD_EXCHANGE = "dead_exchange";
        private static final String DEAD_QUEUE = "dead_queue";
        private static final String DEAD_ROUTING_KEY = "dead";
    
        public static void main(String[] args) throws Exception {
            // 自定义工具类获取信道对象
            Channel channel = RabbitmqUtils.getChannel();
    
            // 声明死信交换机(topic 类型)
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            // 声明死信队列
            channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
            // 死信交换机绑定死信队列
            channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);
    
            // 消息成功之后的回调
            DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
                String msg = new String(message.getBody());
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                System.out.println(msg);
            };
            // 取消消费者的回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("取消消费者时的回调接口");
            };
            // 消费者消费消息
            channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);
            System.out.println("Consumer02 开始消费消息");
        }
    }
    

    3、Producer

    public class Producer {
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        private static final String NORMAL_ROUTING_KEY = "normal";
    
        public static void main(String[] args) throws Exception {
            // 自定义工具类获取信道
            Channel channel = RabbitmqUtils.getChannel();
    
            // 声明一个 direct 类型的交换机
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            // 待发送的消息
            String message = "我是一只机智的小毛毛,很可爱,很机智";
            for (int i = 1; i < 11; i++) {
                channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8));
            }
    
            System.out.println("Producer send message successfully...");
        }
    }
    

    4、测试过程及结果

    删除掉原先的正常交换机、正常队列、死信交换机、死信队列,然后重新启动 Consumer01、Consumer02 注册正常交换机、正常队列、死信交换机、死信队列,接着关闭 Consumer02,启动 Producer 发送消息

    这里有几点需要注意一下

    1、因为只有被拒绝的消息才能进入死信队列中,所以 Consumer01 不能关闭,为了能看到死信队列里的消息,不让它被消费掉,所以需要关闭 Consumer02

    2、Consumer01 一定要开启手动确认,因为自动确认的场景下根本不存在消息被拒绝的情况

    打开死信队列查看被拒绝的消息

    启动 Consumer02 消费死信消息

     

  • 相关阅读:
    rails enum用于存储数据
    single-table inheritance 单表继承
    imageable.touch
    jbuilder的set!方法重构接口
    Two Strings Are Anagrams
    java项目导入IntelliJ IDEA
    mac 下载安装 IntelliJ IDEA Tomcat
    Merge k Sorted Lists Leetcode Java
    ruby on rails validates uniqueness
    使用update!导致的更新时候的错误信息不显示 ruby on rails
  • 原文地址:https://www.cnblogs.com/xiaomaomao/p/15546803.html
Copyright © 2011-2022 走看看