zoukankan      html  css  js  c++  java
  • Rabbitmq的死信队列和延时队列

    一、死信队列

    死信队列其实和普通的队列没啥大的区别,都需要创建自己的QueueExchange,然后通过RoutingKey绑定到Exchange上去,只不过死信队列的RoutingKeyExchange要作为参数,绑定到正常的队列上去,一种应用场景是正常队列里面的消息被basicNack或者reject时,消息就会被路由到正常队列绑定的死信队列中,还有一种还有常用的场景就是开启了自动签收,然后消费者消费消息时出现异常,超过了重试次数,那么这条消息也会进入死信队列,如果配置了话,当然还有其他的应用场景,这里不一一讨论。

    1.1、死信队列和交换器配置

    这里有两个队列,正常的业务队列emailQueue和与之绑定的死信队列,这里只演示,手动签收,消费者捕获异常Nack

    1.1.2、yml配置

    spring:
     rabbitmq:
        host: 192.168.99.12
        port: 5672
        username: guest
        password: guest
        # 发送确认
        publisher-confirms: true
        # 路由失败回调
        publisher-returns: true
        template:
            # 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
            mandatory: true
        listener:
          simple:
            # 每次从RabbitMQ获取的消息数量
            prefetch: 1
            default-requeue-rejected: false
            # 每个队列启动的消费者数量
            concurrency: 1
            # 每个队列最大的消费者数量
            max-concurrency: 1
            # 签收模式为手动签收-那么需要在代码中手动ACK
            acknowledge-mode: manual
    #邮件队列
    email:
      queue:
        name: demo.email
    	
    #邮件交换器名称
    exchange:
      name: demoTopicExchange
    
    #死信队列
    dead:
      letter:
        queue:
          name: demo.dead.letter
        exchange:
          name: demoDeadLetterTopicExchange
    

    1.1.3、死信队列配置

    /**
     * rabbitmq 配置
     *
     * @author DUCHONG
     * @since 2020-08-23 14:05
     **/
    @Configuration
    @Slf4j
    public class RabbitmqConfig {
    
    
        @Value("${email.queue.name}")
        private String emailQueue;
        @Value("${exchange.name}")
        private String topicExchange;
        @Value("${dead.letter.queue.name}")
        private String deadLetterQueue;
        @Value("${dead.letter.exchange.name}")
        private String deadLetterExchange;
    
        @Bean
        public Queue emailQueue() {
    
            Map<String, Object> arguments = new HashMap<>(2);
            // 绑定死信交换机
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            // 绑定死信的路由key
            arguments.put("x-dead-letter-routing-key", deadLetterQueue+".#");
    
            return new Queue(emailQueue,true,false,false,arguments);
        }
    
    	
        @Bean
        TopicExchange emailExchange() {
            return new TopicExchange(topicExchange);
        }
    
    
        @Bean
        Binding bindingEmailQueue() {
            return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
        }
    
        
        //私信队列和交换器
        @Bean
        public Queue deadLetterQueue() {
            return new Queue(deadLetterQueue);
        }
    
        @Bean
        TopicExchange deadLetterExchange() {
            return new TopicExchange(deadLetterExchange);
        }
    
        @Bean
        Binding bindingDeadLetterQueue() {
            return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");
        }
    
    }
    

    1.2、消息发送方

    @Configuration
    @EnableScheduling
    @Slf4j
    public class ScheduleController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Value("${exchange.name}")
        private String topicExchange;
    
        @Scheduled(cron = "0 0/2 * * * ?")
        public void sendEmailMessage() {
    
            String msg = RandomStringUtils.randomAlphanumeric(8);
            JSONObject email=new JSONObject();
            email.put("content",msg);
            email.put("to","duchong@qq.com");
            CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);
            log.info("---发送 email 消息---{}---messageId---{}",email,correlationData.getId());
        }
    
    
    }
    

    1.3、消息消费方

    @Component
    @Slf4j
    public class MessageHandler {
    
        
       /**
         * 邮件消费者
         * @param message
         * @param channel
         * @param headers
         * @throws IOException
         */
        @RabbitListener(queues ="demo.email")
        @RabbitHandler
        public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
    
            try {
    
                String msg=new String(message.getBody(), CharEncoding.UTF_8);
                JSONObject jsonObject = JSON.parseObject(msg);
                jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
                log.info("---接受到消息---{}",jsonObject);
    			//主动异常
    			int m=1/0;
                //手动签收
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
            catch (Exception e) {
                log.info("handleEmailMessage捕获到异常,拒绝重新入队---消息ID---{}",headers.get("spring_returned_message_correlation"));
                //异常,ture 重新入队,或者false,进入死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    
            }
        }
    
        /**
         * 死信消费者,自动签收开启状态下,超过重试次数,或者手动签收,reject或者Nack
         * @param message
         */
        @RabbitListener(queues = "demo.dead.letter")
        public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {
    
            //可以考虑数据库记录,每次进来查数量,达到一定的数量,进行预警,人工介入处理
            log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
    		//回复ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    
     
    }
    
    

    1.4、结果

    image-20200823185649078

    二、延时队列

    延时队列顾名思义,不是及时的队列,也就是发送者发给的消息要延时一段时间,消费者才能接受的到,这里有个典型的应用场景就是订单30分钟内未支付就关闭订单,当然死信队列也是可以实现的,这里只演示消息的延时消费逻辑,订单逻辑就一个判断,这里不做讨论。

    2.1、延时队列和交换器配置

    使用延时队列之前,需要先安装延时队列插件,安装方法,前面已经介绍过了,这里放个链接

    延时队列插件安装

    2.1.1、yml配置

    spring:
        rabbitmq:
            host: 192.168.99.12
            port: 5672
            username: guest
            password: guest
            # 发送确认
            publisher-confirms: true
            # 路由失败回调
            publisher-returns: true
            template:
                # 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
                mandatory: true
            #消费端
            listener:
                simple:
                    # 每次从RabbitMQ获取的消息数量
                    prefetch: 1
                    default-requeue-rejected: false
                    # 每个队列启动的消费者数量
                    concurrency: 1
                    # 每个队列最大的消费者数量
                    max-concurrency: 1
                    # 签收模式为手动签收-那么需要在代码中手动ACK
                    acknowledge-mode: manual
    #邮件队列
    email:
        queue:
            name: demo.email
    
    #邮件交换器名称
    exchange:
        name: demoTopicExchange
    
    #死信队列
    dead:
        letter:
            queue:
                name: demo.dead.letter
            exchange:
                name: demoDeadLetterTopicExchange
    
    #延时队列
    delay:
        queue:
            name: demo.delay
        exchange:
            name: demoDelayTopicExchange
    

    2.1.2、延时队列配置

    /**
     * rabbitmq 配置
     *
     * @author DUCHONG
     * @since 2020-08-23 14:05
     **/
    @Configuration
    @Slf4j
    public class RabbitmqConfig {
    
    
        @Value("${email.queue.name}")
        private String emailQueue;
        @Value("${exchange.name}")
        private String topicExchange;
        @Value("${dead.letter.queue.name}")
        private String deadLetterQueue;
        @Value("${dead.letter.exchange.name}")
        private String deadLetterExchange;
        @Value("${delay.queue.name}")
        private String delayQueue;
        @Value("${delay.exchange.name}")
        private String delayExchange;
    
        @Bean
        public Queue emailQueue() {
    
            Map<String, Object> arguments = new HashMap<>(2);
            // 绑定死信交换机
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            // 绑定死信的路由key
            arguments.put("x-dead-letter-routing-key", deadLetterQueue+".#");
    
            return new Queue(emailQueue,true,false,false,arguments);
        }
    
    
        @Bean
        TopicExchange emailExchange() {
            return new TopicExchange(topicExchange);
        }
    
    
        @Bean
        Binding bindingEmailQueue() {
            return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
        }
    
    
        //私信队列和交换器
        @Bean
        public Queue deadLetterQueue() {
            return new Queue(deadLetterQueue);
        }
    
        @Bean
        TopicExchange deadLetterExchange() {
            return new TopicExchange(deadLetterExchange);
        }
    
        @Bean
        Binding bindingDeadLetterQueue() {
            return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");
        }
        //延时队列
        @Bean
        public Queue delayQueue() {
            return new Queue(delayQueue);
        }
    
        @Bean
        CustomExchange delayExchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "topic");
            //参数二为类型:必须是x-delayed-message
            return new CustomExchange(delayExchange, "x-delayed-message", true, false, args);
    
        }
    
        @Bean
        Binding bindingDelayQueue() {
            return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueue+".#").noargs();
        }
    }
    

    2.2、消息发送方

    30分钟时间太久了,这里延时2分钟来看效果

    @Configuration
    @EnableScheduling
    @Slf4j
    public class ScheduleController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Value("${exchange.name}")
        private String topicExchange;
    
        @Value("${delay.exchange.name}")
        private String delayTopicExchange;
    
        @Scheduled(cron = "0 0/1 * * * ?")
        public void sendEmailMessage() {
    
            String msg = RandomStringUtils.randomAlphanumeric(8);
            JSONObject email=new JSONObject();
            email.put("content",msg);
            email.put("to","duchong@qq.com");
            CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);
            log.info("---发送 email 消息---{}---messageId---{}",email,correlationData.getId());
        }
    
    
        @Scheduled(cron = "0 0/1 * * * ?")
        public void sendDelayOrderMessage() throws Exception{
    
            //订单号 id实际是保存订单后返回的,这里用uuid代替
            String orderId = UUID.randomUUID().toString();
            // 模拟订单信息
            JSONObject order=new JSONObject();
            order.put("orderId",orderId);
            order.put("goodsName","vip充值");
            order.put("orderAmount","99.00");
            CorrelationData correlationData=new CorrelationData(orderId);
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setMessageId(orderId);
            //30分钟时间太长,这里延时120s消费
            messageProperties.setHeader("x-delay", 120000);
            Message message = new Message(order.toJSONString().getBytes(CharEncoding.UTF_8), messageProperties);
    
            rabbitTemplate.convertAndSend(delayTopicExchange,"demo.delay.x",message,correlationData);
    
            log.info("---发送 order 消息---{}---orderId---{}",order,correlationData.getId());
            //睡一会,为了看延迟效果
            TimeUnit.MINUTES.sleep(10);
        }
    }
    

    2.3、消息消费方

    @Component
    @Slf4j
    public class MessageHandler {
    
    
        /**
         * 邮件发送
         * @param message
         * @param channel
         * @param headers
         * @throws IOException
         */
        @RabbitListener(queues ="demo.email")
        @RabbitHandler
        public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
    
            try {
    
                String msg=new String(message.getBody(), CharEncoding.UTF_8);
                JSONObject jsonObject = JSON.parseObject(msg);
                jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
                log.info("---接受到消息---{}",jsonObject);
    			//主动异常
    			int m=1/0;
                //手动签收
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
            catch (Exception e) {
                log.info("handleEmailMessage捕获到异常,拒绝重新入队---消息ID---{}", headers.get("spring_returned_message_correlation"));
                //异常,ture 重新入队,或者false,进入死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    
            }
        }
    
        /**
         * 死信消费者,自动签收开启状态下,超过重试次数,或者手动签收,reject或者Nack
         * @param message
         */
        @RabbitListener(queues = "demo.dead.letter")
        public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {
    
            //可以考虑数据库记录,每次进来查数量,达到一定的数量,进行预警,人工介入处理
            log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
    		//回复ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    
        /**
         * 延时队列消费
         * @param message
         * @param channel
         * @param headers
         * @throws IOException
         */
        @RabbitListener(queues ="demo.delay")
        @RabbitHandler
        public void handleOrderDelayMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
    
            try {
    
                String msg=new String(message.getBody(), CharEncoding.UTF_8);
                JSONObject jsonObject = JSON.parseObject(msg);
                log.info("---接受到订单消息---orderId---{}",message.getMessageProperties().getMessageId());
                log.info("---订单信息---order---{}",jsonObject);
                //业务逻辑,根据订单id获取订单信息,如果还未支付,设置关闭状态,如果已支付,不做任何处理
                //手动签收
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
            catch (Exception e) {
                log.info("handleOrderDelayMessage捕获到异常,重新入队---orderId---{}", headers.get("spring_returned_message_correlation"));
                //异常,ture 重新入队,或者false,进入死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
    
            }
        }
    
    }
    

    2.4、结果

    运行结果显示,同一个订单号的消息,发送过后2分钟,消费者才接受到,符合预期。

  • 相关阅读:
    mongodb的安装和sql操作
    查看apache和nginx的负载和连接数情况
    ansible中playbook使用
    mysql导入导出命令详解
    生产环境下yum的配置
    firewalld的防火墙
    SOCK5代理服务器
    Linux系统基础优化总结
    服务器内存和缓存的优化
    activemq概念介绍
  • 原文地址:https://www.cnblogs.com/geekdc/p/13550620.html
Copyright © 2011-2022 走看看