zoukankan      html  css  js  c++  java
  • RabbitMQ的轮询模式和公平分发(二)

    一、常用的消息模式

    我们在工作的使用中,经常会遇到多个消费者监听同一个队列的情况,模型如下图所示:

    当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢;
    主要有两种模式:
    1、轮询模式的分发:一个消费者一条,按均分配;
    2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

    二、轮询模式(Round-Robin)

    该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成;

    2.1 生产者发消息到队列

        public static void SendRoundRobinMessage()
            {
                try
                {
                    var conn = GetConnection();
                    var channel = conn.CreateModel();
                    channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
                    for(var i = 0; i < 50; i++)
                    {
                        var body = Encoding.UTF8.GetBytes(i.ToString());
                        channel.BasicPublish("", QUEUE_NAME, null, body);
                    }
                    Console.WriteLine("消息发送完成!");
                    channel.Close();
                    conn.Close();
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    

    2.2 消费者1代码

    消费者1每处理完一次消息,线程休息1秒;

    		/// <summary>
    		/// 轮询分发消费者1
    		/// </summary>
    		static void SimpleConsumer1()
    		{
    			//new rabbitMqTest.RabbitMQ.MQUtils().GetMessage();
    			//创建连接工厂
    			ConnectionFactory factory = new ConnectionFactory
    			{
    				UserName = "admin",//用户名
    				Password = "admin",//密码
    				HostName = "127.0.0.1"//rabbitmq ip
    			};
    			//创建连接
    			var connection = factory.CreateConnection();
    			//创建通道
    			var channel = connection.CreateModel();
    			//事件基本消费者
    			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    			//接收到消息事件
    			consumer.Received += (ch, ea) =>
    			{
    				var message = Encoding.UTF8.GetString(ea.Body);
    				Console.WriteLine($"Simple Consumer1 收到消息: {message},时间{DateTime.Now}");
    				Thread.Sleep(1000);
    				//确认该消息已被消费
    				//channel.BasicAck(ea.DeliveryTag, false);
    			};
    			//启动消费者 设置为手动应答消息
    			channel.BasicConsume("queue_test", true, consumer);
    			Console.WriteLine("Simple Consumer1 消费者已启动");
    			Console.ReadKey();
    			channel.Dispose();
    			connection.Close();
    		}
    

    消费者接收消息如图:

    2.3 消费者2代码

    消费者2每处理完一次消息,线程休息3秒;

    		/// <summary>
    		/// 轮询分发消费者2
    		/// </summary>
    		static void SimpleConsumer2()
    		{
    			//创建连接工厂
    			ConnectionFactory factory = new ConnectionFactory
    			{
    				UserName = "guest",//用户名
    				Password = "guest",//密码
    				HostName = "127.0.0.1"//rabbitmq ip
    			};
    			//创建连接
    			var connection = factory.CreateConnection();
    			//创建通道
    			var channel = connection.CreateModel();
    			//事件基本消费者
    			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    			//接收到消息事件
    			consumer.Received += (ch, ea) =>
    			{
    				var message = Encoding.UTF8.GetString(ea.Body);
    				Console.WriteLine($"Simple Consumer2 收到消息: {message},时间{DateTime.Now}");
    				Thread.Sleep(3000);
    				//确认该消息已被消费
    				//channel.BasicAck(ea.DeliveryTag, false);
    			};
    			//启动消费者 设置为手动应答消息
    			channel.BasicConsume("queue_test", true, consumer);
    			Console.WriteLine("Simple 2 消费者已启动");
    			Console.ReadKey();
    			channel.Dispose();
    			connection.Close();
    		}
    

    消费者接收消息如图:

    2.4 轮询分发小结

    消费者1和2的消息处理能力不同,但是最后处理的消息条数相同,是“按均分配”。

    三、公平分发(Fair Dispatch)

    由于消息接收者处理消息的能力不同,存在处理快慢的问题,我们就需要能者多劳,处理快的多处理,处理慢的少处理;

    3.1 生产者发消息到队列

    代码如下:

      public static void SendQosMessage()
            {
                try
                {
                    var conn = GetConnection();
                    var channel = conn.CreateModel();
                    channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
                    channel.BasicQos(0,1,false);
                    for (var i = 0; i < 50; i++)
                    {
                        var body = Encoding.UTF8.GetBytes(i.ToString());
                        channel.BasicPublish("", QUEUE_NAME, null, body);
                    }
                    Console.WriteLine("消息发送完成!");
                    channel.Close();
                    conn.Close();
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
    

    3.2 消费者1代码如下

    为了模拟处理消息的时长,每处理完一条消息让线程休息1s

    		static void SimpleConsumer1()
    		{
    			//new rabbitMqTest.RabbitMQ.MQUtils().GetMessage();
    			//创建连接工厂
    			ConnectionFactory factory = new ConnectionFactory
    			{
    				UserName = "admin",//用户名
    				Password = "admin",//密码
    				HostName = "127.0.0.1"//rabbitmq ip
    			};
    
    			//创建连接
    			var connection = factory.CreateConnection();
    			//创建通道
    			var channel = connection.CreateModel();
    			channel.BasicQos(0, 1, false);
    			//事件基本消费者
    			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
    			//接收到消息事件
    			consumer.Received += (ch, ea) =>
    			{
    				var message = Encoding.UTF8.GetString(ea.Body);
    				Console.WriteLine($"Simple Consumer1 收到消息: {message},时间{DateTime.Now}");
    				Thread.Sleep(1000);
    				//确认该消息已被消费
    				channel.BasicAck(ea.DeliveryTag, false);
    			};
    			//启动消费者 设置为手动应答消息
    			channel.BasicConsume("queue_test", false, consumer);
    			Console.WriteLine("Simple 1 消费者已启动");
    			Console.ReadKey();
    			channel.Dispose();
    			connection.Close();
    		}
    

    处理的消息结果如图:

    3.3 消费者2处理消息较消费者1慢,代码如下

    为了模拟处理消息的时长,每处理完一条消息让线程休息3s

    static void SimpleConsumer2()
    		{
    			//new rabbitMqTest.RabbitMQ.MQUtils().GetMessage();
    			//创建连接工厂
    			ConnectionFactory factory = new ConnectionFactory
    			{
    				UserName = "admin",//用户名
    				Password = "admin",//密码
    				HostName = "127.0.0.1"//rabbitmq ip
    			};
    
    			//创建连接
    			var connection = factory.CreateConnection();
    			//创建通道
    			var channel = connection.CreateModel();
    			channel.BasicQos(0, 1, false);
    			//事件基本消费者
    			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
    			//接收到消息事件
    			consumer.Received += (ch, ea) =>
    			{
    				var message = Encoding.UTF8.GetString(ea.Body);
    				Console.WriteLine($"Simple Consumer2 收到消息: {message},时间{DateTime.Now}");
    				Thread.Sleep(3000);
    				//确认该消息已被消费
    				channel.BasicAck(ea.DeliveryTag, false);
    			};
    			//启动消费者 设置为手动应答消息
    			channel.BasicConsume("queue_test", false, consumer);
    			Console.WriteLine("Simple 2 消费者已启动");
    			Console.ReadKey();
    			channel.Dispose();
    			connection.Close();
    		}
    

    处理消息的结果如图:

    3.4 处理消息的结果

    从结果可以看到,消费者1在相同时间内,处理了更多的消息;以上代码我们实现了公平分发模式;

    3.5 注意点

    (1)消费者一次接收一条消息,代码channel.BasicQos(0, 1, false);
    (2) 公平分发需要消费者开启手动应答,关闭自动应答
    关闭自动应答代码channel.BasicConsume("queue_test", false, consumer);
    消费者开启手动应答代码:channel.BasicAck(ea.DeliveryTag, false);

    四、小结

    (1)当队列里消息较多时,我们通常会开启多个消费者处理消息;公平分发和轮询分发都是我们经常使用的模式。
    (2)轮询分发的主要思想是“按均分配”,不考虑消费者的处理能力,所有消费者均分;这种情况下,处理能力弱的服务器,一直都在处理消息,而处理能力强的服务器,在处理完消息后,处于空闲状态;
    (3) 公平分发的主要思想是"能者多劳",按需分配,能力强的干的多。
    参考文档: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

  • 相关阅读:
    mysql cpu 占用高
    使用cron命令配置定时任务(cron jobs)
    python 获取时间
    linux 免密码登陆
    linux 查看登录日志
    shizhong
    正则(?is)
    python shell
    linux 时间设置
    java获取当前时间前一周、前一月、前一年的时间
  • 原文地址:https://www.cnblogs.com/zqllove/p/12654907.html
Copyright © 2011-2022 走看看