zoukankan      html  css  js  c++  java
  • RabbitMQ入门_05_多线程消费同一队列

    A. 多线程消费同一队列

    参考资料:https://www.rabbitmq.com/tutorials/tutorial-two-java.html

    消费一条消息往往比产生一条消息慢很多,为了防止消息积压,一般需要开启多个工作线程同时消费消息。在 RabbitMQ 中,我们可以创建多个 Consumer 消费同一队列。示意图如下:

    workqueue

    gordon.study.rabbitmq.workqueue.Sender.java

    public class Sender {
     
        private static final String QUEUE_NAME = "tasks";
     
        private String name;
     
        public Sender(String name) {
            this.name = name;
        }
     
        public void work() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     
            for (int i = 0; i < 10;) {
                String message = "NO. " + ++i;
                TimeUnit.MILLISECONDS.sleep(100);
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.printf("(%1$s)[===>%2$s    ] %3$s
    ", name, ":" + QUEUE_NAME, message);
            }
     
            channel.close();
            connection.close();
        }
    }
    

    gordon.study.rabbitmq.workqueue.Receiver.java

    public class Receiver {
     
        private static final String QUEUE_NAME = "tasks";
     
        private String name;
     
        private int sleepTime;
     
        public Receiver(String name, int sleepTime) {
            this.name = name;
            this.sleepTime = sleepTime;
        }
     
        public void work() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.printf(" [    %2$s<===](%1$s) %3$s
    ", name, QUEUE_NAME, message);
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepTime);
                    } catch (InterruptedException e) {
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    gordon.study.rabbitmq.workqueue.Test01.java

    public class Test01 {
     
        public static void main(String[] args) throws Exception {
            Receiver recv1 = new Receiver("A", 200);
            recv1.work();
            Receiver recv2 = new Receiver("B", 200);
            recv2.work();
            Sender sender = new Sender("S");
            sender.work();
        }
    }
    

    运行 Test01,发现 A、B 两个消费者轮流获取 S 发送的消息。
    RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即,轮询(round-robin)分发消息。

    轮询很好,可是如果两个消费者消费能力不一样呢?
    gordon.study.rabbitmq.workqueue.Test02SlowConsumer.java

    public class Test02SlowConsumer {
     
        public static void main(String[] args) throws Exception {
            Receiver recv1 = new Receiver("A", 200);
            recv1.work();
            Receiver recv2 = new Receiver("B", 800);
            recv2.work();
            Sender sender = new Sender("S");
            sender.work();
        }
    }
    

    将消费者B 的消费时间提高到800毫秒,问题就出现了:B 依然分到了一半消息,需要运行很久才能处理完。

    B. 公平分发(fair dispatch)

    怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。

    gordon.study.rabbitmq.workqueue.QosAcknowledgeReceiver.java

    public class QosAcknowledgeReceiver {
     
        private static final String QUEUE_NAME = "tasks";
     
        private String name;
     
        private int sleepTime;
     
        public QosAcknowledgeReceiver(String name, int sleepTime) {
            this.name = name;
            this.sleepTime = sleepTime;
        }
     
        public void work() throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     
            channel.basicQos(1);
     
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.printf(" [    %2$s<===](%1$s) %3$s
    ", name, QUEUE_NAME, message);
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepTime);
                    } catch (InterruptedException e) {
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
    

    代码第22行的 basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

    查看 basicQos 重载方法,发现几个有趣的特性(参考https://www.rabbitmq.com/consumer-prefetch.html):

    • basicQos 中 prefetchSize 参数通过消息的总字节数来限制队列推送消息的速度
    • prefetchSize 与 prefetchCount 可以同时设置,达到任何一个限制,则队列暂停推送消息
    • global 参数表示前两个参数的作用域,true 表示限制是针对信道的,false 表示限制是针对消费者的(我还没试过一个信道支持多个消费者的例子,样例代码见下方)
    • 可以对同一个信道同时设置 global 为 true 和 false 的 Qos,表示队列要考虑每个消费者的限制,同时还要考虑整个信道的限制
    • 看起来API注释是错的,因为 global 默认是 false,所以第22行代码应该是把当前信道上每个消费者(当然,上面的例子中只有一个)的 prefetchCount 设为 1
    Channel channel = ...;
    Consumer consumer1 = ...;
    Consumer consumer2 = ...;
    channel.basicQos(10, false); // Per consumer limit
    channel.basicQos(15, true);  // Per channel limit
    channel.basicConsume("my-queue1", false, consumer1);
    channel.basicConsume("my-queue2", false, consumer2);
    

    第37行代码将 autoAck 设为 false,向 Broker 发送 ack 响应的任务就交给开发人员了。

    第34行代码在任务真正完成后,调用 basicAck 方法主动通知队列消息已成功消费。当队列收到 ack 确认后,会把下一条消息推送过来,并将该消息从队列中删除。

    Qos 方案示意图如下:

    Qos workqueue

    gordon.study.rabbitmq.workqueue.Test03FairDispatch.java

    public class Test03FairDispatch {
     
        public static void main(String[] args) throws Exception {
            QosAcknowledgeReceiver recv1 = new QosAcknowledgeReceiver("A", 200);
            recv1.work();
            QosAcknowledgeReceiver recv2 = new QosAcknowledgeReceiver("B", 800);
            recv2.work();
            Sender sender = new Sender("S");
            sender.work();
        }
    }
    

    运行Test03,可以看到 RabbitMQ 按照消费者的实际能力分配消息。

  • 相关阅读:
    使用a标签制作tooltips
    使用editorconfig配置你的编辑器
    JointJS绘制流程图
    用highcharts展现你的数据
    css段落首字母下沉
    sklearn框架的系统学习
    numpy删除二维数据矩阵的行和列
    sklearn中机器学习算法评价指标
    sklearn调用逻辑回归算法
    sklearn调用多项式回归
  • 原文地址:https://www.cnblogs.com/gordonkong/p/6941721.html
Copyright © 2011-2022 走看看