zoukankan      html  css  js  c++  java
  • RabbitMQ 优先级队列

    一、概述

    在实际应用场景中,我们推送消息,希望给消息设置优先级,比如说京东双 11 活动,它希望将消息优先推送给京东的 vip,而对于非 vip 用户消息推送的优先级就低一些,那么怎么实现呢?

    其实很简单,通过优先级队列就可以完美解决上述应用场景

    二、原理图

    三、编码

    1、applicaiton.yml

    spring:
      rabbitmq:
        host: 192.168.59.135
        port: 5672
        username: admin
        password: admin123
        publisher-confirm-type: correlated
        publisher-returns: true
        # 开启 ack
        listener:
          direct:
            acknowledge-mode: manual
          simple:
            acknowledge-mode: manual #采取手动应答
            #concurrency: 1 # 指定最小的消费者数量
            #max-concurrency: 1 #指定最大的消费者数量
            retry:
              enabled: true # 是否支持重试
    

    2、自定义配置类 PriorityConfig

    @Configuration
    public class PriorityConfig {
        private static final String PRIORITY_EXCHANGE = "priority_exchange";
        private static final String PRIORITY_QUEUE = "priority_queue";
        private static final String PRIORITY_KEY = "priority";
    
        // 声明优先级交换机(type = direct)
        @Bean(PRIORITY_EXCHANGE)
        public DirectExchange priorityExchange() {
            return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE).durable(true).build();
        }
    
        // 声明优先级队列
        @Bean(PRIORITY_QUEUE)
        public Queue priorityQueue() {
            /**
             * maxPriority(int maxPriority):设置队列支持的最大优先级数量,如果没有设置,则队列将不支持消息优先级
             * 官方支持的优先级范围是 0 ~ 255,超过 255 就会发生报错,但是一般企业使用的优先级是 0 ~ 10,如果 maxPriority 设置
             * 的太大,会浪费 cpu 和 内存,因为消息是要在队列中排队的,队列长度太大,排序的过程中会损耗性能
             */
            return QueueBuilder.durable(PRIORITY_QUEUE).maxPriority(10).build();
        }
    
        // 优先级队列绑定优先级交换机
        @Bean
        public Binding priorityQueueBindingPriorityExchange(@Qualifier(PRIORITY_QUEUE) Queue queue,
                                                            @Qualifier(PRIORITY_EXCHANGE) DirectExchange exchange) {
    
            return BindingBuilder.bind(queue).to(exchange).with(PRIORITY_KEY);
        }
    }
    

    3、发布确认自定义类 MyConfirmCallback

    @Slf4j
    @Component
    public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
        /**
         * 交换机确认回调方法
         * 1、Producer 发送的消息,交换机确认收到
         * correlationData:保存消息回调 ID 及其它相关的信息
         * ack:true
         * cause:null
         * <p>
         * 2、Producer 发送的消息,交换机没有收到
         * correlationData:保存消息回调 ID 及其它相关的信息
         * ack:false
         * cause:交换机没有收到消息的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData != null ? correlationData.getId() : "";
            if (ack) {
                log.info("交换机已经收到 id 为:{}的消息", id);
            } else {
                log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
            }
        }
    
        /**
         * 如果交换机没有将消息路由到队列,会触发该回调方法
         */
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            log.info("消息: {} 被服务器退回--->退回原因: {},交换机是: {},路由key是:{},退回编号是:{}",
                    new String(returned.getMessage().getBody()), returned.getReplyText(), returned.getExchange(),
                    returned.getRoutingKey(), returned.getReplyCode());
        }
    }
    

    4、Producer

    @Slf4j
    @RestController
    public class Producer {
        private static final String PRIORITY_EXCHANGE = "priority_exchange";
        private static final String PRIORITY_QUEUE = "priority_queue";
        private static final String PRIORITY_KEY = "priority";
    
        // 1、依赖注入 rabbitTemplate
        @Autowired
        private RabbitTemplate rabbitTemplate;
        // 2、依赖注入 myConfirmCallback
        @Autowired
        private MyConfirmCallback myConfirmCallback;
    
        // 3、完成了 1、2 的注入之后再设置 rabbitTemplate 的回调对象
        @PostConstruct
        public void init() {
            // 消息成功传递给交换机时会触发 MyConfirmCallback 中的回调方法 confirm()
            rabbitTemplate.setConfirmCallback(myConfirmCallback);
            // 消息回退时会触发 MyConfirmCallback 中的回调方法 returnedMessage()
            rabbitTemplate.setReturnsCallback(myConfirmCallback);
        }
    
        @GetMapping("/priority/sendMessage/{msg}")
        public void sendMessage(@PathVariable("msg") String msg) {
            // 设置唯一 ID
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId(UUID.randomUUID().toString());
    
            rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg, correlationData);
            log.info("发送一条未设置优先级的消息", msg);
    
            String msg1 = msg + 0;
            rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg1, (message -> {
                message.getMessageProperties().setPriority(0);
                return message;
            }), correlationData);
            log.info("发送一条优先级为 0 的消息", msg1);
    
            String msg2 = msg + 2;
            rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg2, (message -> {
                message.getMessageProperties().setPriority(2);
                return message;
            }), correlationData);
            log.info("发送一条优先级为 2 的消息", msg2);
    
            String msg3 = msg + 5;
            rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg3, (message -> {
                message.getMessageProperties().setPriority(5);
                return message;
            }), correlationData);
            log.info("发送一条优先级为 5 的消息", msg3);
    
            String msg4 = msg + 10;
            rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE, PRIORITY_KEY, msg4, (message -> {
                message.getMessageProperties().setPriority(10);
                return message;
            }), correlationData);
            log.info("发送一条优先级为 10 的消息", msg4);
        }
    }
    

    5、Consumer

    @Slf4j
    @Component
    public class Consumer {
        private static final String PRIORITY_QUEUE = "priority_queue";
    
        @RabbitListener(queues = {PRIORITY_QUEUE})
        public void receivedMessage(Message message, Channel channel, CorrelationData correlationData) throws IOException {
            try {
                String msg = new String(message.getBody());
                log.info("消费者成功接收到消息:{}", msg);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                log.info("消息消费错误");
                // 出现异常之后拒绝消息,并且消息重新入队
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            }
        }
    }
    

      

    四、测试

    要让队列实现优先级需要做的事情如下

    1、队列需要设置为优先级队列

    2、消息需要设置消息的优先级

    3、生产者必须先将消息发送到队列中,让队列对设置了优先级的消息进行排队

    4、1、2、3 完成之后再启动消费者进行消费即可

    要想实现上述功能,我们先将 Consumer 的 @RabbitListener 注解注释掉,然后启动 Springboot 项目

    浏览器发送请求: http://localhost:8080/priority/sendMessage/小毛毛是最可爱的

    消息发送完成之后,然后打开 Consumer 的 @RabbitListener 注解,再次启动 Springboot 项目

    从消费者的消费结果可以看出,优先级越高的消息越早被消费,如果未设置消息的优先级,那么该默认的优先级看起来是 1

  • 相关阅读:
    JavaScript | 闭包
    Photoshop | 快速抠头发(调整边缘/选择并遮住)
    JavaScript | 基础表单验证(纯Js)
    JavaScript | 事件
    JavaScript | 数组
    JavaScript | 对象与属性
    JavaScript | 基础(变量/引用/转换/函数)
    Altium Designer 10 | 常用库及部分元件名中英文对照表
    电路 | 基本概念
    读点什么 |《把时间当作朋友》李笑来
  • 原文地址:https://www.cnblogs.com/xiaomaomao/p/15572173.html
Copyright © 2011-2022 走看看