zoukankan      html  css  js  c++  java
  • RabbitMq死信队列

    死信队列的作用

    死信交换机有什么用呢? 在创建队列的时候 可以给这个队列附带一个交换机, 那么这个队列作废的消息就会被重新发到附带的交换机,然后让这个交换机重新路由这条消息。

    死信消息产生的来源

    • 消息被拒绝(basic.reject或basic.nack)并且requeue=false
    • 消息TTL过期
    • 队列达到最大长度(队列满了,无法再添加数据到mq中)

    死信队列处理的方式

    • 丢弃,如果不是很重要,可以选择丢弃
    • 记录死信入库,然后做后续的业务分析或处理
    • 通过死信队列,由负责监听死信的应用程序进行处理
      在这里插入图片描述

    消息超时进入死信队列

    通俗的说,就是消息产生之后,因为设置了超时时间,在这段时间内消息没有被消费就会被扔到死信队列里面。

     // 交换机名称
        private static final String DESTINATION_NAME = "rabbitMq_topic";
        //消息队列
        private static final String queueName = "topic_queue";
        //routingKey
        private static final String routingKey = "topic.#";
    
        //配置死信队列
        private static final String dlxExchangeName = "dlx.exchange";
        private static final String dlxQueueName = "dlx.queue";
        private static final String dlxRoutingKey = "#";
    
        @Test
        public void producer() throws IOException, TimeoutException {
            //获取连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            Map<String, Object> arguments = new HashMap<String, Object>(16);
            // 为队列设置队列交换器
            arguments.put("x-dead-letter-exchange", dlxExchangeName);
            // 设置队列中的消息 60s 钟后过期
            arguments.put("x-message-ttl", 60000);
            //正常生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
            channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
            //消费声明队列
            channel.queueDeclare(queueName, true, false, false, arguments);
            //消费者队列绑定交换机 绑定路由件 路由键
            channel.queueBind(queueName, DESTINATION_NAME, routingKey);
    
            String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 测试消息超时,传递到死信队列";
    
            // 创建死信交换器和队列
            channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
            channel.queueDeclare(dlxQueueName, true, false, false, null);
            channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
    
            //生产者发送消息者
            channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.err.println("消息发送完成......");
    
        }
    

    只监听了死信队列的消息,正常消息无需监听接收

        /**
         * 监听死信队列
         *
         * @throws IOException
         * @throws TimeoutException
         * @throws InterruptedException
         */
        @Test
        public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            System.out.println("死信消费者启动 ..........");
            Thread.sleep(65000);
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.err.println("死信队列接收到消息:" + new String(body));
                    System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(dlxQueueName, consumer);
        }
    

    消息被退回

    这个我在之前的整合SpringBoot的时候有实验过。

    channel.basicNack(envelope.getDeliveryTag(),false,false);
    

    队列达到最大长度

    这个和消息超时差不多,只不过是设置了队列的最大容量而已。
    只需要把上面的代码修改一下就可以了。

        @Test
        public void producer() throws IOException, TimeoutException {
            //获取连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
    
            Map<String, Object> arguments = new HashMap<String, Object>(16);
            // 为队列设置队列交换器
            arguments.put("x-dead-letter-exchange", dlxExchangeName);
            //设置队列长度为3
            arguments.put("x-max-length", 3);
            //正常生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
            channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
            //消费声明队列
            channel.queueDeclare(queueName, true, false, false, arguments);
            //消费者队列绑定交换机 绑定路由件 路由键
            channel.queueBind(queueName, DESTINATION_NAME, routingKey);
    
            String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 测试消息超时,传递到死信队列";
    
            // 创建死信交换器和队列
            channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
            channel.queueDeclare(dlxQueueName, true, false, false, null);
            channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
    
            //生产者发送消息者
            for (int i = 0; i < 5; i++) {
                channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());
            }
            System.out.println("消息发送完成......");
        }
    

    在这里插入图片描述

     @Test
        public void consumer() throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            //此处设置一次只消费1个,且必须是ASK之后的消息才能算
            channel.basicQos(1);
            System.out.println("消费者启动 ..........");
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("正常队列:" + new String(body));
                    System.out.println("deliveryTag:" + envelope.getDeliveryTag());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(queueName, consumer);
        }
    
        /**
         * 监听死信队列
         *
         * @throws IOException
         * @throws TimeoutException
         * @throws InterruptedException
         */
        @Test
        public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            System.out.println("死信消费者启动 ..........");
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.err.println("死信队列接收到消息:" + new String(body));
                    System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(dlxQueueName, consumer);
        }
    

    在这里插入图片描述
    在这里插入图片描述在这里插入图片描述

  • 相关阅读:
    leetcode108 Convert Sorted Array to Binary Search Tree
    leetcode98 Validate Binary Search Tree
    leetcode103 Binary Tree Zigzag Level Order Traversal
    leetcode116 Populating Next Right Pointers in Each Node
    Python全栈之路Day15
    Python全栈之路Day11
    集群监控
    Python全栈之路Day10
    自动部署反向代理、web、nfs
    5.Scss的插值
  • 原文地址:https://www.cnblogs.com/yangk1996/p/12674015.html
Copyright © 2011-2022 走看看