zoukankan      html  css  js  c++  java
  • 2.6.4 RabbitMQ WorkQueue工作队列

    1、工作队列是什么?

    工作队列(Work Queues),又称任务队列(Task Queues)概念是将耗时的任务分发给多个消费者(工作者)。主要解决这样的问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。

    这个概念在Web应用程序中特别有用,因为在Web应用程序中,不可能在较短的HTTP请求窗口内处理复杂的任务。

    发送端(Sender):

    using System;
    using System.Text;
    using RabbitMQ.Client;
    
    namespace Sender
    {
        class Program
        {
            static void Main(string[] args)
            {
                //队列名称
                var QUEUE_NAME = "task_queue1";
                //队列是否需要持久化
                var DURABLE = false;
                //需要发送的消息列表
                var msgs = new[] { "task 1", "task 2", "task 3", "task 4", "task 5", "task 6" };
                // 1.connection & channel
                var factory = new ConnectionFactory() { HostName = "test.guanjieerp.cn" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        // 2.queue
                        channel.QueueDeclare(QUEUE_NAME, DURABLE, false, false, null);
                        // 3.publish msg
                        for (int i = 0; i < msgs.Length; i++)
                        {
                            var message = msgs[i];
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish("", QUEUE_NAME, null, body);
                            Console.WriteLine("** new task ****:" + message);
                        }
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
    

    接收端(Receiver):

    using System;
    using System.Text;
    using System.Threading;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace Receiver
    {
        class Program
        {
            static void Main(string[] args)
            {
                //队列名称
                var QUEUE_NAME = "task_queue1";
                //队列是否需要持久化
                var DURABLE = false;
                //自动ACK
                var autoAck = true;
                var factory = new ConnectionFactory
                {
                    HostName = "test.guanjieerp.cn"
                };
                Console.WriteLine("*** Work ***");
                //1.connection & channel
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //2.queue
                        channel.QueueDeclare(QUEUE_NAME, DURABLE, false, false, null);
                        //3. consumer instance
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body.ToArray();
                            var message = Encoding.UTF8.GetString(body);
                            //deal task
                            doWork(message);
                        };
                        //4.do consumer
                        channel.BasicConsume(QUEUE_NAME, autoAck, consumer);
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
    
    
            private static void doWork(String msg)
            {
                Console.WriteLine("**** deal task begin :" + msg);
                //假装task比较耗时,通过sleep()来模拟需要消耗的时间
                if ("sleep".Equals(msg))
                {
                    Thread.Sleep(1000 * 60);
                }
                else
                {
                    Thread.Sleep(1000);
                }
                Console.WriteLine("**** deal task finish :" + msg);
            }
        }
    }

    运行接收端,再运行发送端,可以看到任务被自动消费,如下面结果:

    2、循环分发

     新建一个项目,跟接收端(Receiver)一模一样,启动2个接受端,然后启动发送端。可以看到运行结果:

    我们发现,发送端共发送了6条消息,接收端1和接受端2分别收到了3个消息,而且是循环轮流分发到的,这种分发的方式就是循环分发。

    3、手动确认消息

    假如我们在发送的消息里面添加“sleep"

    //需要发送的消息列表
    var msgs = new[] {"sleep","task 1", "task 2", "task 3", "task 4", "task 5", "task 6" };

    我们看看接收端的代码逻辑,这个sleep要耗时1分钟,万一在这1分钟之内,工作进程崩溃了或者被kill了,会发生什么情况呢?根据上面的代码:

     //自动ACK
    var autoAck = true;
    //4.do consumer
    channel.BasicConsume(QUEUE_NAME, autoAck, consumer);

     

    自动确认消息为true,每次RabbitMQ向消费者发送消息之后,会自动发确认消息(我工作你放心,不会有问题),这个时候消息会立即从内存中删除。如果工作者挂了,那将会丢失它正在处理和未处理的所有工作,而且这些工作还不能再交由其他工作者处理,这种丢失属于客户端丢失。

    为了应对这种情况,RabbitMQ支持消息确认。消费者处理完消息之后,会发送一个确认消息告诉RabbitMQ,消息处理完了,你可以删掉它了。

    代码修改(Receiver和Receiver2两个接收端都同步修改),修改步骤:

    3.1.将自动确认改为false

     //自动ACK
     var autoAck = false;
     //4.do consumer
     channel.BasicConsume(QUEUE_NAME, autoAck, consumer);

    3.2.消息处理之后再通过channel.basicAck进行消息确认

    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

    继续运行两个接收端和发送端

     可以看到Receive在等待sleep,Receive2已经执行完了,关掉Receive不等待,这时候去RabbitMQ后台可以看到未消费的消息有4条,而且Receive2将Receive未处理完和未来得及处理的消息都给处理了。

    4、公平分发

     按照以上循环分发的方式,每个接收端会分到相同数量的任务。这时候有个问题,假如有些任务比较耗时,前面的任务没来得及完成,后面又来了那么多任务,来不及处理,那怎么办?这时候需要公平分发任务,代码实现:

     //4.do consumer
     channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    

    告诉RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样RabbitMQ就不会轮流分发了,而是寻找空闲的工作者进行分发。

     5、消息持久化

    当接收端消息确认的时候,假如RabbitMQ服务停止了,这时消息都会清空,这种丢失叫做服务端丢失,需要把消息持久化来应对这种情况。

    5.1队列持久化,发送端、接收端的DURABLE修改为True

    //队列是否需要持久化
    var DURABLE = false;
    //2.queue
    channel.QueueDeclare(QUEUE_NAME, DURABLE, false, false, null);
    

    5.2消息持久化,发送端,发送消息时设置持久化

    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    //发布消息
    channel.BasicPublish("", QUEUE_NAME, properties, body);
  • 相关阅读:
    java抽象类
    java不支持多继承
    logback颜色
    @ConfigurationProperties、@Value、@PropertySource
    redis命令
    mac下安装rabbitmq
    mac下安装jmeter
    python TypeError: 'int' object is not callable 问题解决
    白炽灯串联发光问题_高中知识(原创)
    python 离散序列 样本数伸缩(原创)
  • 原文地址:https://www.cnblogs.com/duyao/p/14329202.html
Copyright © 2011-2022 走看看