zoukankan      html  css  js  c++  java
  • RabbitMQ 之 Work queues

    一、概述

    Work queues 该模式下有一个生产者(Producer)、两个消费者(Consumer01、Consumer02),多个消费者一起处理生产者发送过来的消息,消息分发给消费者一般有两种方式,分别是轮询分发(公平分发)、预期值分发(不公平分发)

    轮询分发是默认的分发方式,对应的是 prefetchCount = 0,它采用的是公平分发的方式,消息分发与消费者的消费速度没有任何关系,如果 prefetchCount != 0,那么它就采用不公平的分发方式,即先按照预期值分发,分发完成之后再按照消费者的消费速度进行消息分发

    二、轮询分发(默认值)

    默认情况下,工作队列(Work queues)采用的是轮询分发的策略,与具体的消费者处理速度快慢没有任何关系

    1、工具类

    public class RabbitmqUtils {
        private static final String HOST_ADDRESS = "192.168.59.130";
        private static final String USER_NAME = "admin";
        private static final String PASSWORD = "admin123";
    
        public static Channel getChannel() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(HOST_ADDRESS);
            factory.setUsername(USER_NAME);
            factory.setPassword(PASSWORD);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            return channel;
        }
    }
    

    2、Producer

    public class Producer {
        private static final String QUEUE_NAME = "WorkQueuesDemo";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitmqUtils.getChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            String message = "有意思的消息--->";
            for (int i = 1; i < 11; i++) {
                channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("Producer send message successfully");
        }
    }
    

    3、Consumer01

    public class Consumer01 {
        private static final String QUEUE_NAME = "WorkQueuesDemo";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitmqUtils.getChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody());
    
                // 休眠 10 s
                try {
                    Thread.sleep(10 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 参数一、deliveryTag:消息应答标记
                // 参数二、multiple:(false、只应答接收到的那个消息 true、应答所有传递过来的消息)
                // 处理完逻辑之后应答 ack
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println(message);
            };
            // prefetchCount 的默认值为 0
            // 不设置 basicQos 与 channel.basicQos(0) 的效果是等价的,即采用轮询分发的策略
            channel.basicQos(0);
            // 设置手动应答
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
                System.out.println(consumerTag);
            });
        }
    }
    

    4、Consumer02

    public class Consumer02 {
        private static final String QUEUE_NAME = "WorkQueuesDemo";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitmqUtils.getChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody());
                // 参数一、deliveryTag:消息应答标记
                // 参数二、multiple:(false、只应答接收到的那个消息 true、应答所有传递过来的消息)
                // 处理完逻辑之后应答 ack
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println(message);
            };
    
            // 设置 prefetchCount
            channel.basicQos(0);
    
            // 设置手动应答
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
                System.out.println(consumerTag);
            });
        }
    }
    

    5、测试过程及结果

    首先启动 Cousumer01、Consumer02,然后设置 prefetchCount = 0,接着启动 Producer 发送消息,切换到 RabbitMQ 控制台

    6、Consumer01、Consumer02 的消费情况

    三、不公平分发(预期值分发1)

    代码和上面的基本相同,只是将 Consumer01、Consumer02 的 prefetchCount 设置为 1 即可,消息的分发会取决于消费者的处理速度,性能好的消费者将分发到更多的消息

    1、原理图

    2、RabbitMQ 控制台

    3、Consumer01、Consumer02 的消费情况

    Consumer01、Consumer02 先按照 prefetchCount = 1(预期值)进行分发,然后剩下的消息按照消费者的性能进行分配

    四、不公平分发(预期值分发2)

    代码和上面的基本相同,只是将 Consumer01 的 prefetchCount 设置为 2、Consumer02 的 prefetchCount 设置为 4 即可

    1、原理图

    2、RabbitMQ 控制台

    3、Consumer01、Consumer02 的消费情况 

    Consumer01 设置了 prefetchCount = 2,Consumer02 设置了 prefetchCount = 4,所以 10 个消息分发两个给 Consumer01,分发 4 个给 Consumer02,剩下的 4 个消息的分发取决于消费者的处理速度(目前测试)

     

  • 相关阅读:
    Webservice详解
    Spring IOC/DI和AOP原理
    MySQL 使用JOIN优化子查询
    MySQL 更新语句技巧
    MySQL插入语句解析
    MySQL用户无法登陆问题
    MySQL基础学习(二) 常用SQL命令
    Servlet/JSP-08 EL表达式
    插值和空间分析(一)_探索性数据分析(R语言)
    爱重启的windows,伤不起
  • 原文地址:https://www.cnblogs.com/xiaomaomao/p/15530171.html
Copyright © 2011-2022 走看看