zoukankan      html  css  js  c++  java
  • RabbitMQ的轮询模式和公平分发(二)

    一、常用的消息模式

    我们在工作的使用中,经常会遇到多个消费者监听同一个队列的情况,模型如下图所示:

    当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢;
    主要有两种模式:
    1、轮询模式的分发:一个消费者一条,按均分配;
    2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

    二、轮询模式(Round-Robin)

    该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成;

    2.1 生产者发消息到队列

        public static void SendRoundRobinMessage()
            {
                try
                {
                    var conn = GetConnection();
                    var channel = conn.CreateModel();
                    channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
                    for(var i = 0; i < 50; i++)
                    {
                        var body = Encoding.UTF8.GetBytes(i.ToString());
                        channel.BasicPublish("", QUEUE_NAME, null, body);
                    }
                    Console.WriteLine("消息发送完成!");
                    channel.Close();
                    conn.Close();
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    

    2.2 消费者1代码

    消费者1每处理完一次消息,线程休息1秒;

    		/// <summary>
    		/// 轮询分发消费者1
    		/// </summary>
    		static void SimpleConsumer1()
    		{
    			//new rabbitMqTest.RabbitMQ.MQUtils().GetMessage();
    			//创建连接工厂
    			ConnectionFactory factory = new ConnectionFactory
    			{
    				UserName = "admin",//用户名
    				Password = "admin",//密码
    				HostName = "127.0.0.1"//rabbitmq ip
    			};
    			//创建连接
    			var connection = factory.CreateConnection();
    			//创建通道
    			var channel = connection.CreateModel();
    			//事件基本消费者
    			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    			//接收到消息事件
    			consumer.Received += (ch, ea) =>
    			{
    				var message = Encoding.UTF8.GetString(ea.Body);
    				Console.WriteLine($"Simple Consumer1 收到消息: {message},时间{DateTime.Now}");
    				Thread.Sleep(1000);
    				//确认该消息已被消费
    				//channel.BasicAck(ea.DeliveryTag, false);
    			};
    			//启动消费者 设置为手动应答消息
    			channel.BasicConsume("queue_test", true, consumer);
    			Console.WriteLine("Simple Consumer1 消费者已启动");
    			Console.ReadKey();
    			channel.Dispose();
    			connection.Close();
    		}
    

    消费者接收消息如图:

    2.3 消费者2代码

    消费者2每处理完一次消息,线程休息3秒;

    		/// <summary>
    		/// 轮询分发消费者2
    		/// </summary>
    		static void SimpleConsumer2()
    		{
    			//创建连接工厂
    			ConnectionFactory factory = new ConnectionFactory
    			{
    				UserName = "guest",//用户名
    				Password = "guest",//密码
    				HostName = "127.0.0.1"//rabbitmq ip
    			};
    			//创建连接
    			var connection = factory.CreateConnection();
    			//创建通道
    			var channel = connection.CreateModel();
    			//事件基本消费者
    			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    			//接收到消息事件
    			consumer.Received += (ch, ea) =>
    			{
    				var message = Encoding.UTF8.GetString(ea.Body);
    				Console.WriteLine($"Simple Consumer2 收到消息: {message},时间{DateTime.Now}");
    				Thread.Sleep(3000);
    				//确认该消息已被消费
    				//channel.BasicAck(ea.DeliveryTag, false);
    			};
    			//启动消费者 设置为手动应答消息
    			channel.BasicConsume("queue_test", true, consumer);
    			Console.WriteLine("Simple 2 消费者已启动");
    			Console.ReadKey();
    			channel.Dispose();
    			connection.Close();
    		}
    

    消费者接收消息如图:

    2.4 轮询分发小结

    消费者1和2的消息处理能力不同,但是最后处理的消息条数相同,是“按均分配”。

    三、公平分发(Fair Dispatch)

    由于消息接收者处理消息的能力不同,存在处理快慢的问题,我们就需要能者多劳,处理快的多处理,处理慢的少处理;

    3.1 生产者发消息到队列

    代码如下:

      public static void SendQosMessage()
            {
                try
                {
                    var conn = GetConnection();
                    var channel = conn.CreateModel();
                    channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
                    channel.BasicQos(0,1,false);
                    for (var i = 0; i < 50; i++)
                    {
                        var body = Encoding.UTF8.GetBytes(i.ToString());
                        channel.BasicPublish("", QUEUE_NAME, null, body);
                    }
                    Console.WriteLine("消息发送完成!");
                    channel.Close();
                    conn.Close();
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    

    3.2 消费者1代码如下

    为了模拟处理消息的时长,每处理完一条消息让线程休息1s

    		static void SimpleConsumer1()
    		{
    			//new rabbitMqTest.RabbitMQ.MQUtils().GetMessage();
    			//创建连接工厂
    			ConnectionFactory factory = new ConnectionFactory
    			{
    				UserName = "admin",//用户名
    				Password = "admin",//密码
    				HostName = "127.0.0.1"//rabbitmq ip
    			};
    
    			//创建连接
    			var connection = factory.CreateConnection();
    			//创建通道
    			var channel = connection.CreateModel();
    			channel.BasicQos(0, 1, false);
    			//事件基本消费者
    			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
    			//接收到消息事件
    			consumer.Received += (ch, ea) =>
    			{
    				var message = Encoding.UTF8.GetString(ea.Body);
    				Console.WriteLine($"Simple Consumer1 收到消息: {message},时间{DateTime.Now}");
    				Thread.Sleep(1000);
    				//确认该消息已被消费
    				channel.BasicAck(ea.DeliveryTag, false);
    			};
    			//启动消费者 设置为手动应答消息
    			channel.BasicConsume("queue_test", false, consumer);
    			Console.WriteLine("Simple 1 消费者已启动");
    			Console.ReadKey();
    			channel.Dispose();
    			connection.Close();
    		}
    

    处理的消息结果如图:

    3.3 消费者2处理消息较消费者1慢,代码如下

    为了模拟处理消息的时长,每处理完一条消息让线程休息3s

    static void SimpleConsumer2()
    		{
    			//new rabbitMqTest.RabbitMQ.MQUtils().GetMessage();
    			//创建连接工厂
    			ConnectionFactory factory = new ConnectionFactory
    			{
    				UserName = "admin",//用户名
    				Password = "admin",//密码
    				HostName = "127.0.0.1"//rabbitmq ip
    			};
    
    			//创建连接
    			var connection = factory.CreateConnection();
    			//创建通道
    			var channel = connection.CreateModel();
    			channel.BasicQos(0, 1, false);
    			//事件基本消费者
    			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
    			//接收到消息事件
    			consumer.Received += (ch, ea) =>
    			{
    				var message = Encoding.UTF8.GetString(ea.Body);
    				Console.WriteLine($"Simple Consumer2 收到消息: {message},时间{DateTime.Now}");
    				Thread.Sleep(3000);
    				//确认该消息已被消费
    				channel.BasicAck(ea.DeliveryTag, false);
    			};
    			//启动消费者 设置为手动应答消息
    			channel.BasicConsume("queue_test", false, consumer);
    			Console.WriteLine("Simple 2 消费者已启动");
    			Console.ReadKey();
    			channel.Dispose();
    			connection.Close();
    		}
    

    处理消息的结果如图:

    3.4 处理消息的结果

    从结果可以看到,消费者1在相同时间内,处理了更多的消息;以上代码我们实现了公平分发模式;

    3.5 注意点

    (1)消费者一次接收一条消息,代码channel.BasicQos(0, 1, false);
    (2) 公平分发需要消费者开启手动应答,关闭自动应答
    关闭自动应答代码channel.BasicConsume("queue_test", false, consumer);
    消费者开启手动应答代码:channel.BasicAck(ea.DeliveryTag, false);

    四、小结

    (1)当队列里消息较多时,我们通常会开启多个消费者处理消息;公平分发和轮询分发都是我们经常使用的模式。
    (2)轮询分发的主要思想是“按均分配”,不考虑消费者的处理能力,所有消费者均分;这种情况下,处理能力弱的服务器,一直都在处理消息,而处理能力强的服务器,在处理完消息后,处于空闲状态;
    (3) 公平分发的主要思想是"能者多劳",按需分配,能力强的干的多。
    参考文档: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

  • 相关阅读:
    jQuery 源码解析(二十四) DOM操作模块 包裹元素 详解
    jQuery 源码解析(二十三) DOM操作模块 替换元素 详解
    jQuery 源码解析(二十二) DOM操作模块 复制元素 详解
    jQuery 源码分析(二十一) DOM操作模块 删除元素 详解
    jQuery 源码分析(二十) DOM操作模块 插入元素 详解
    jQuery 源码分析(十九) DOM遍历模块详解
    python 简单工厂模式
    python 爬虫-协程 采集博客园
    vue 自定义image组件
    微信小程序 image组件坑
  • 原文地址:https://www.cnblogs.com/zqllove/p/12654907.html
Copyright © 2011-2022 走看看