zoukankan      html  css  js  c++  java
  • RabbitMq死信队列

    死信队列的作用

    死信交换机有什么用呢? 在创建队列的时候 可以给这个队列附带一个交换机, 那么这个队列作废的消息就会被重新发到附带的交换机,然后让这个交换机重新路由这条消息。

    死信消息产生的来源

    • 消息被拒绝(basic.reject或basic.nack)并且requeue=false
    • 消息TTL过期
    • 队列达到最大长度(队列满了,无法再添加数据到mq中)

    死信队列处理的方式

    • 丢弃,如果不是很重要,可以选择丢弃
    • 记录死信入库,然后做后续的业务分析或处理
    • 通过死信队列,由负责监听死信的应用程序进行处理
      在这里插入图片描述

    消息超时进入死信队列

    通俗的说,就是消息产生之后,因为设置了超时时间,在这段时间内消息没有被消费就会被扔到死信队列里面。

     // 交换机名称
        private static final String DESTINATION_NAME = "rabbitMq_topic";
        //消息队列
        private static final String queueName = "topic_queue";
        //routingKey
        private static final String routingKey = "topic.#";
    
        //配置死信队列
        private static final String dlxExchangeName = "dlx.exchange";
        private static final String dlxQueueName = "dlx.queue";
        private static final String dlxRoutingKey = "#";
    
        @Test
        public void producer() throws IOException, TimeoutException {
            //获取连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            Map<String, Object> arguments = new HashMap<String, Object>(16);
            // 为队列设置队列交换器
            arguments.put("x-dead-letter-exchange", dlxExchangeName);
            // 设置队列中的消息 60s 钟后过期
            arguments.put("x-message-ttl", 60000);
            //正常生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
            channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
            //消费声明队列
            channel.queueDeclare(queueName, true, false, false, arguments);
            //消费者队列绑定交换机 绑定路由件 路由键
            channel.queueBind(queueName, DESTINATION_NAME, routingKey);
    
            String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 测试消息超时,传递到死信队列";
    
            // 创建死信交换器和队列
            channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
            channel.queueDeclare(dlxQueueName, true, false, false, null);
            channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
    
            //生产者发送消息者
            channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.err.println("消息发送完成......");
    
        }
    

    只监听了死信队列的消息,正常消息无需监听接收

        /**
         * 监听死信队列
         *
         * @throws IOException
         * @throws TimeoutException
         * @throws InterruptedException
         */
        @Test
        public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            System.out.println("死信消费者启动 ..........");
            Thread.sleep(65000);
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.err.println("死信队列接收到消息:" + new String(body));
                    System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(dlxQueueName, consumer);
        }
    

    消息被退回

    这个我在之前的整合SpringBoot的时候有实验过。

    channel.basicNack(envelope.getDeliveryTag(),false,false);
    

    队列达到最大长度

    这个和消息超时差不多,只不过是设置了队列的最大容量而已。
    只需要把上面的代码修改一下就可以了。

        @Test
        public void producer() throws IOException, TimeoutException {
            //获取连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
    
            Map<String, Object> arguments = new HashMap<String, Object>(16);
            // 为队列设置队列交换器
            arguments.put("x-dead-letter-exchange", dlxExchangeName);
            //设置队列长度为3
            arguments.put("x-max-length", 3);
            //正常生产者绑定交换机 参数1 交换机名称 参数2 交换机类型
            channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
            //消费声明队列
            channel.queueDeclare(queueName, true, false, false, arguments);
            //消费者队列绑定交换机 绑定路由件 路由键
            channel.queueBind(queueName, DESTINATION_NAME, routingKey);
    
            String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 测试消息超时,传递到死信队列";
    
            // 创建死信交换器和队列
            channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
            channel.queueDeclare(dlxQueueName, true, false, false, null);
            channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
    
            //生产者发送消息者
            for (int i = 0; i < 5; i++) {
                channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());
            }
            System.out.println("消息发送完成......");
        }
    

    在这里插入图片描述

     @Test
        public void consumer() throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            //此处设置一次只消费1个,且必须是ASK之后的消息才能算
            channel.basicQos(1);
            System.out.println("消费者启动 ..........");
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("正常队列:" + new String(body));
                    System.out.println("deliveryTag:" + envelope.getDeliveryTag());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(queueName, consumer);
        }
    
        /**
         * 监听死信队列
         *
         * @throws IOException
         * @throws TimeoutException
         * @throws InterruptedException
         */
        @Test
        public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
            //获取连接
            Connection connection = MQConnectionUtils.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            System.out.println("死信消费者启动 ..........");
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.err.println("死信队列接收到消息:" + new String(body));
                    System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(dlxQueueName, consumer);
        }
    

    在这里插入图片描述
    在这里插入图片描述在这里插入图片描述

  • 相关阅读:
    ASP.NET服务器控件开发(4)复合控件
    C#特性对象集合初始化器
    C#特性匿名类型与隐式类型局部变量
    在Handler中使用Session
    使用 UDPClient 生成聊天客户端
    当下10大最热门的网站开发技术
    C#特性扩展方法
    50个非常有用的PHP工具
    c# 调用.bat文件
    c# 特性/属性(Attribute) 以及使用反射查看自定义特性
  • 原文地址:https://www.cnblogs.com/yangk1996/p/12674015.html
Copyright © 2011-2022 走看看