zoukankan      html  css  js  c++  java
  • rabbitmq的死信队列(四)

    死信队列


    死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

     

    消息成为死信的三种情况:

      1. 队列消息长度到达限制;

      2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

      3. 原队列存在消息过期设置,消息到达超时时间未被消费;

    队列绑定死信交换机:

    给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key和x-message-ttl和x-max-length

          x-dead-letter-exchange:绑定的死信交换机名称

       x-dead-letter-routing-key:绑定正常队列和死信交换机的路由

          x-dead-letter-routing-key:ttl过期时间

       x-max-length:设置正常队列长度限制

     rabbitmq-high-producer项目

     application.properties文件

    server.port=8081
    # ip
    spring.rabbitmq.host=127.0.0.1
    #默认5672
    spring.rabbitmq.port=5672
    #用户名
    spring.rabbitmq.username=guest
    #密码
    spring.rabbitmq.password=guest
    #连接到代理时用的虚拟主机
    spring.rabbitmq.virtual-host=/
    #是否启用【发布确认】,默认false
    #spring.rabbitmq.publisher-confirm-type=correlated替换spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-confirm-type=correlated
    #是否启用【发布返回】,默认false
    spring.rabbitmq.publisher-returns=true
    #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #rabbitmq限流,必须在ack确认才能使用
    #消费者最小数量
    spring.rabbitmq.listener.simple.concurrency=1
    #最大的消费者数量
    spring.rabbitmq.listener.simple.max-concurrency=10
    #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
    spring.rabbitmq.listener.simple.prefetch=2
            
    DlxQueueRabbitConfig类
    package com.qingfeng.rabbitmqhighproducer.dlx.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 死信队列
     */
    @Configuration
    public class DlxQueueRabbitConfig {
    
        //正常队列名称
        public static final String NORMAL_DLX_QUEUE = "normal_dlx_queue";
        //正常交换机名称
        public static final String NORMAL_DLX_Exchange = "normal_dlx_exchange";
    
        //ttl过期时间毫秒
        private static final int NORMAL_DLX_EXPIRATION = 10000;
    
        //设置正常队列长度限制
        private static final int NORMAL_DLX_LENGTH = 10;
    
        //死信队列名称
        public static final String DLX_QUEUE = "dlx_queue";
        //死信交换机名称
        public static final String DLX_Exchange = "dlx_exchange";
    
    
        //声明正常交换机
        @Bean("normalDlxExchange")
        public TopicExchange normalDlxExchange(){
            return new TopicExchange(NORMAL_DLX_Exchange);
        }
    
        //声明正常队列绑定死信队列的交换机
        @Bean("normalDlxQueue")
        public Queue normalDlxQueue(){
            return QueueBuilder.durable(NORMAL_DLX_QUEUE)
                    .withArgument("x-dead-letter-exchange", DLX_Exchange)
                    .withArgument("x-dead-letter-routing-key", "dlx.wq")
                    .withArgument("x-message-ttl", NORMAL_DLX_EXPIRATION)
                    .withArgument("x-max-length",NORMAL_DLX_LENGTH)
                    .build();
        }
    
        //声明正常队列和正常交换机的绑定
        @Bean
        public Binding normalDlxBinding(){
            return BindingBuilder.bind(normalDlxQueue()).to(normalDlxExchange()).with("test.dlx.#");
        }
    
    //=========================================================================
    
        //声明死信队列
        @Bean
        public Queue dlxQueue(){
            return new Queue(DLX_QUEUE);
        }
        //声明死信交换机
        @Bean
        public TopicExchange dlxExchange(){
            return new TopicExchange(DLX_Exchange);
        }
        //声明死信队列和死信交换机的绑定
        @Bean
        public Binding dlxBinding(){
            return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.#");
        }
    
    
    }
    DlxController类
    package com.qingfeng.rabbitmqhighproducer.dlx;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.UUID;
    
    @RestController
    @RequestMapping("dlx")
    public class DlxController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //http://127.0.0.1:8081/dlx/testTimeDLX
        //测试时间过期
        @GetMapping("/testTimeDLX")
        public String testTimeDLX() {
            String messageId = String.valueOf(UUID.randomUUID());
            //normal_dlx_exchange正常交换机  test.dlx.wq:正常交换机与正常绑定的队列的路由
            rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"变成死信队列消息");
            return "ok";
        }
    
    
    }

    启动rabbitmq-high-producer项目

    1.测试原队列存在消息过期设置,消息到达超时时间未被消费

    http://127.0.0.1:8081/dlx/testTimeDLX

     我们在设置的ttl过期时间10000毫秒过后,也就是10秒后,正常队列的消息会转到死信队列里面去

     2.测试队列消息长度到达限制

    DlxController类
    package com.qingfeng.rabbitmqhighproducer.dlx;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.UUID;
    
    @RestController
    @RequestMapping("dlx")
    public class DlxController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        //http://127.0.0.1:8081/dlx/veroLengthDLX
        //2.测试队列超出长度
        @GetMapping("/veroLengthDLX")
        public String veroLengthDLX() {
            for (int i=0;i<20;i++){
                String messageId = String.valueOf(UUID.randomUUID());
                rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"变成死信队列消息");
            }
            return "ok";
        }
    
    
    
    }

    启动rabbitmq-high-producer项目

    访问:http://127.0.0.1:8081/dlx/veroLengthDLX
    设置正常队列长度限制为10,我们生产者发送了20个消息,正常队列只能保存10个

      我们在设置的ttl过期时间10000毫秒过后,也就是10秒后,正常队列的消息会全部转到死信队列里面去

    3消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

    在rabbitmq-high-producer项目的DlxController类添加

    package com.qingfeng.rabbitmqhighproducer.dlx;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.UUID;
    
    @RestController
    @RequestMapping("dlx")
    public class DlxController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //3.测试消息被消费者拒收

    //http://127.0.0.1:8081/dlx/rejectionDLX
        @GetMapping("/rejectionDLX")
        public String rejectionDLX() {
            String messageId = String.valueOf(UUID.randomUUID());
            rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"变成死信队列消息");
            return "ok";
        }
    
    }

    在rabbitmq-high-consumer项目
    DxlListener类  开启int i = 1/0;//出现错误
    package com.qingfeng.rabbitmqhighconsumer.dxl;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Consumer ACK机制:
     *  1. 设置手动签收。 spring.rabbitmq.listener.simple.acknowledge-mode=manual
     *  2. 让监听器类实现ChannelAwareMessageListener接口
     *  3. 如果消息成功处理,则调用channel的 basicAck()签收
     *  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
     */
    
    @Component
    public class DxlListener {
    
        //手动签收
        @RabbitHandler
        @RabbitListener(queues = "normal_dlx_queue")
        public void onMessage(Message message, Channel channel) throws Exception {
            //Thread.sleep(1000);
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
                //1.接收转换消息
                System.out.println("接受到的消息为"+new String(message.getBody()));
    
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                int i = 1/0;//出现错误
                //3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                /**
                 * 4.有异常就拒绝签收
                 * basicNack(long deliveryTag, boolean multiple, boolean requeue)
                 * 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费
                 * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
                 * alse:将消息丢弃
                 */
                System.out.println("有异常就拒绝签收");
                //拒绝签收,不重回队列,requeue为false,这样才能到死信队列里面去
                channel.basicNack(deliveryTag,true,false);
            }
        }
    }

    启动rabbitmq-high-producer和rabbitmq-high-consumer项目


    测试:http://127.0.0.1:8081/dlx/rejectionDLX

    在rabbitmq-high-consumer项目consumer拒绝接收消息,直接转到死信队列去了

    小结:

      1. 死信交换机和死信队列和普通的没有区别

      2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

      3. 消息成为死信的三种情况:
        1. 队列消息长度到达限制;
        2. 消费者拒接消费消息,并且不重回队列;
        3. 原队列存在消息过期设置,消息到达超时时间未被消费;




  • 相关阅读:
    Oracle分页之一:最普通的分页方式
    MSSQL存储过程学习笔记一:关于存储过程
    MSSQL自动备份数据库
    小试JQuery的AutoComplete插件
    利用面向对象的方式来使用JS
    Oracle分页之三:利用PagerView来实现无刷新GridView
    由于 ASP.NET 进程标识对全局程序集缓存没有读权限,因此未能执行请求。错误: 0x80131902
    终端服务器超出最大允许连接数
    从苹果的Siri说起:云搜索与人工智能
    [转]为什么我们程序员难晋升
  • 原文地址:https://www.cnblogs.com/Amywangqing/p/14696143.html
Copyright © 2011-2022 走看看