zoukankan      html  css  js  c++  java
  • RabbitMQ入门-高效的Work模式

    扛不住的Hello World模式

    上篇《RabbitMQ入门-从HelloWorld开始》介绍了RabbitMQ中最基本的Hello World模型。正如其名,Hello World模型组成简单,也很好理解,我们也看到了一条消息时如何从一个生产者最终流向队列并最终被消费者消费的过程。

    但是,过于简单、单调的模型设计也存在一些缺陷。假使现在队列Queue中挤压了很多的消息没有被消费,Hello World模型中只有一个消费者,在消费消息时会显得力不从心。如果遇上网络状况异常等情况,则消费速率就更加不同乐观,从而影响了消息的处理效率,影响网站应用的性能。
    很直观的思路,我们能想到的是,一个人不行,那就多来几个人,这时候就有了我们的Work模型。

    多管齐下的Work模式

    该模型具有以下特征

    • 一个消息生产者P,一个消息存储队列Q,多个消息消费者C

    • Work模型能够较好的解决资源密集型场景的问题,不需要像Hello World那样孤注一掷的等唯一的消费者消费完

    • 多个消费者,多管齐下,更加高效的并行处理消息

    实例

    如何构造一个资源密集型的场景
    相较于Hello World,Work模式主要是在资源密集型的场景更能发挥威力,那么没有工作环境或者很难遇到这样的情况,我们怎么办?
    其实,这个场景的本质是为了体现一个消费者处理要很长时间的时候,这个模式是如何发挥作用的。那么,我们可以让每个消费者处理的时间长点不就行了,要让Consumer处理的时间长很简单,只要调用Thread.sleep()即可。
    发送端

    对于发送端相对Hello World类型来说,没有什么不同。这是我们队这里发送的消息采用指定的格式比如“hello......”,在后面的发送端接收消息后,当遇到"."则停顿1秒或者2秒,所以程序如下

    package com.ximalaya.openapi.rabbitmq.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    /**
     * Created by jackie on 17/8/4.
     */
    public class NewTask {
    
        private static final String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.3.161");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    
            String message = getMessage(argv);
    
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    
        private static String getMessage(String[] strings) {
            if (strings.length < 1)
                return "Hello World!";
            return joinStrings(strings, " ");
        }
    
        private static String joinStrings(String[] strings, String delimiter) {
            int length = strings.length;
            if (length == 0) return "";
            StringBuilder words = new StringBuilder(strings[0]);
            for (int i = 1; i < length; i++) {
                words.append(delimiter).append(strings[i]);
            }
            return words.toString();
        }
    }
    
    

    注意:这里getMessage方法,如果在运行的配置参数中添加了输入参数,则使用输入参数,如果没有填写,则使用默认值"Hello World"。
    填写输入参数的方法是在如下图位置写上输入参数

    我们执行发送端代码,向队列"task_queue"中塞入4条消息

    从这个动态图片可以发现,通过发送端一次性发送了4条消息。

    接收端

    package com.ximalaya.openapi.rabbitmq.work;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    /**
     * Created by jackie on 17/8/4.
     */
    public class Worker {
    
        private static final String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.3.161");
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            channel.basicQos(1);
    
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
    
                    System.out.println(" [x] Received '" + message + "'");
                    try {
                        doWork(message);
                    } finally {
                        System.out.println(" [x] Done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
        }
    
        private static void doWork(String task) {
            for (char ch : task.toCharArray()) {
                if (ch == '.') {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException _ignored) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    

    注意:这里的doWork方法,该方法当遇到"."是就会睡眠2秒钟,所以像"hello..."这样的消息就会睡眠6秒。
    下面分两种情况来看接收端的处理信息的情况
    一个消费者
    如果此时只运行一个接收端的代码,说明只启动了一个Consumer,我们看看消息的消费过程

    • 图中Ready的消息依次从4->3->2->1->0,表示消息依次被派出消费

    • Uncknowledged表示没有确认的,这里始终是1,因为消息时一个个发送的,等一个个发完了,最终变为0

    • Total表示总共剩余的消息个数,最终消费完变为0

    两个消费者
    如果这时候启动两个客户端,我们看下消息是如何被消费的

    • 图中的Ready从4->2->0,这是因为有两个消费者,消息分别分发到两个消费者上,一次派发两个,分两次派发完

    • Unacknowledged从0->2->0,过程为在一次发送两条消息时,说明有两条消息等待确认是否被消费掉

    • Total则与Ready变化趋势一致

    对比“一个消费者”和“两个消费者”的消费情况,我们确实发现Work的消费处理效率要比Hello World高。

    细心的你可能发现了,为什么在“两个消费者”的情况下能够做到如此公平的每个消费者分配两个,有关这块,限于篇幅,将在下篇详细介绍。

    如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!如果您想持续关注我的文章,请扫描二维码,关注JackieZheng的微信公众号,我会将我的文章推送给您,并和您一起分享我日常阅读过的优质文章。

  • 相关阅读:
    python3 进程间的通信(管道)Pipe
    python3 进程间的通信(队列)Queue
    python3 队列的简单用法Queue
    python3 进程锁Lock(模拟抢票)
    python3 守护进程daemon
    python3 僵尸进程
    python3 process中的name和pid
    python3 Process中的terminate和is_alive
    python3 通过多进程来实现一下同时和多个客户端进行连接通信
    python3 进程之间数据是隔离的
  • 原文地址:https://www.cnblogs.com/bigdataZJ/p/rabbitmq3.html
Copyright © 2011-2022 走看看