zoukankan      html  css  js  c++  java
  • RabbitMQ 原文译02--工作队列

    工作队列:

    在上一篇文章中我们我们创建程序发送和接受命名队列中的消息,在这篇文章我会创建一个工作队列,用来把耗时的操作分配给多个执行者。

    工作队列(任务队列)的主要实现思想是避免马上执行资源密集型的任务然后不得不等待其完成,而应该是安排这些任务延后执行。我们把这些任务包装(压缩)成消息然后发送到消息队列当中,最终由运行在后端的工作进程获取消息然后处理它。多个工作进程可以共享这个任务。

    这种概念非常实用于web应用程式,因为在一个非常短的http web请求过程中想完成一个非常复杂耗时的任务基本上是不可能的。

    准备工作

    在上个文章中我们发送了一个包含"hello word" 的消息,现在我们将会发送一个代表复杂任务的字符串,我们没有真实世界的任务场景,想图片尺寸的修改,pdf文档的输出处理,所以让我们使用Thread.sleep 来假设我们非常的任务处理非常的多。让我们使用"." 的数量来表示任务的量级,一个"."就代表需要执行1秒钟的任务,如"Hello..." 表示这个任务需要执行3秒才能完成。

     我们稍微修改下之前的发送代码,让他能够从命令行发送任意的消息,这个程序会将相应的安排任务到我们的工作队列中,所以我们修改程序文件的名字为NewTask.cs.

    var message = GetMessage(args);
    var body = Encoding.UTF8.GetBytes(message);
    
    var properties = channel.CreateBasicProperties();
    properties.SetPersistent(true);
    
    channel.BasicPublish(exchange: "",routingKey: "task_queue",basicProperties: properties,body: body);

    然后为了帮助从命令行参数获取消息

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
    }

    然后我们的Receive.cs  也需要做一些改变,当从消息中解析到一个"."时,需要假装执行1秒钟的工作。它会处理从RabbitMQ发送过来的消息,所以我们叫它Worker.cs

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(" [x] Received {0}", message);
    
        int dots = message.Split('.').Length - 1;
        Thread.Sleep(dots * 1000);
    
        Console.WriteLine(" [x] Done");
    };
    channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);

    我们假的任务去模拟执行时间

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);

     注:请注意这里的队列名称"task_queue"是我新建的一个队列,为了和上一篇的代码进行区分.

    循环分配

    使用任务队列的一个优势就是可以并行执行任务,如果我们的产生了大量的等待去处理的任务,我们可以直接加入更多的工作者去处理任务,非常容易扩展。

    首先让我们同时运行两个Work.cs程序,它们将都可以从队列当中获取任务,那是如何工作的哪,接着看:

    你总共需要打开3个控制台程序(console),其中2个运行Worker.cs程式,是我们消息的消费者。

    第三个是我们的消息生产者。

    然后我们可以使用我们的生产者发送一些消息:

    然后观察我们的2个Worker.cs 程序的输出:

    默认情况下RabbitMQ将会有序的发送消息给下一个消费者,所以平均每个消费者将会得到一致数量的消息,这种发送消息的模式叫做"轮询".

    消息确认

    做一项工作任务可能会持续几秒钟,你也许会好奇如果一个执行长时间的任务在执行过程中突然停止工作(仅执行了部分内容)会发生什么,在我们当前的代码中一旦RabbitMQ把消息发送给消费者,它会马上把它从内存中删除,在这种情况一旦你终止了正在工作的程序,那么我们会丢失它正在处理到的消息,我们同样会丢失所有分配给这个执行者但是还没有完成的消息。

    但是我们不希望休息丢失,如果一个执行者终止,我们希望该消息能够传递给下一个执行者。

    为了保证消息永远不丢失,RabbitMQ支持消息确认(acknowledgments),当一个消息被确认接受执行完成后,一个消费者可以发送一个ack(nowledgement)告诉RabbitMQ消息已经完成,RabbitMQ可以把它删除了。

    如果一个消费者终止(会话关闭,连接关闭,或者tcp连接丢失)而还未发送ack,RabbitMQ就会知道,消息还没有完全被处理完成,它会重新把消息排入队列。如果同时有另外一个消费者在线,它会马上把他发送给另外的消费者.通过这种方式你可以确保没有消息丢失,当消费者意外终止的时候。

    这里没有消息超时,RabbitMQ将会重新推送消息当一个消费者终止时。当一个处理话费非常长的时间的时候也是没有任何问题的。

    消息自动确认默认情况下是打开的,在之前的案例中我们显示的关闭了这个选项,通过设置noack这个参数为true。现在是时候改变这个参数,让执行者发送acknowledgment 给RabbitMQ

    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("Thread:{0} [x] Received {1}", Thread.CurrentThread.GetHashCode(), message);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
    
                        Console.WriteLine("Thread:{0} [x] Done",Thread.CurrentThread.GetHashCode());
    
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    channel.BasicConsume(queue: "task_queue",noAck: false,consumer: consumer);

    使用这个方式我们可以确保及时我们关闭了正在执行任务的消费者时,也可以保证没有消息对视,所有unacknowledged 状态的消息会RabbitMQ重新进行分发。

    忘记发送确认:忘记执行BasicAck是个非常见常见的问题,但是其造成的影响却是非常严重的,当客户端退出的时候消息将会重新的分派(看起来像是一个随机的分配),但是由于RabbitMQ无法释放unacked 的消息,RabbitMQ将会持续吃掉越来越多的内存。

    消息持久化

    我们已经学习了怎么样保证消息不回丢失当消费者终止的时候,但我们的消息依然会丢失当我们的RabbitMQ服务器停止的时候。

    当RabbitMQ服务器退出或崩溃的时候,它会丢失它所保存的消息和队列,除非你指定它不要这么做。为了保证消息不丢失我们需要做两件事情,标记队列和消息持久化(durable)。

    首先为了保证队列不丢失,我们需要声明持久化的队列。

    channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

    虽然这个命令本身是没有问题的,但是当我们当前的环境中它是不会成功执行的,这是因为我们已经声明了一个名字为"hello"的非持久化的队列。让任何程序试图使用不同的参数声明一个已经存在的队列的时候,RabbitMQ将会返回一个错误的消息。

    所以我们可以声明一个不同名字的队列

    channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);

    消息的生产者和消费者都需要应用这QueueDeclare命令。

    现在我们确认即使RabbitMQ服务器意外终止了,我们也不对丢失我们的"task_queue"这个队列,现在我们需要设置我们的消息持久化--通过设置 IBasicProperties.SetPersistent=true来完成。

    var properties = channel.CreateBasicProperties();
    properties.SetPersistent(true);

    关于消息持久化:

    标记消息持久化并不能100%保证消息不会丢失,虽然它告诉RabbitMQ把消息进行持久化,但是仍然存在那么一瞬间--RabbitMQ已经接受了消息但是还没保存它,同样RabbitMQ不会对每一个到来的消息都及时的做fsync(进行内存数据同步到存储设备),它可能仅仅把消息保存在内存中而不是磁盘上。持久化虽然保证不够健壮,但是对我们的简单程序来说足够了。如果您想要更高级别的消息持久化确认可以使用publisher confirms.

    公平调度

    你也许注意到了,这种分发模式依然不是非常符合我们想要实现的,例如在一个场景当中所有的奇数的消息都是重量级较大的(执行时间较长),所有的偶数任务都是执行时间较短的消息,那么其中一个消息处理者将会持续化的处于忙碌状态,而另外一个消息处理者基本行很难做较复杂的任务。

    发生这种情况的原因是RabbitMQ仅仅把消息队列当中的消息发送给消费者,而不去查看对应消费者的unacknowledged的数量,它只是盲目的把第n-th的消息发送给n-th的消费者。

    为了避免这个问题我们可以使用basicQos 方法,设置其参数prefetchCount = 1,这样会告诉RabbitMQ不要同时给一个消费者推送多于一个消息,或者换句话来说当一个处理者正在处理一个消息还没有发送acknowledged 给RabbitMQ来确认消息已经完成时不要发送另外一个消息给它。相应的,RabbitMQ会吧消息发送给下一个已经空闲的消费者。

    channel.BasicQos(0, 1, false);

    注意关于队列大小:

    当所有的工作者都处于忙碌状态时,队列的大小就会持续的增长,所以注意观察队列状况,来决定是否添加新的消费者或者采取其他的相应策略。

    完整代码:

    NewTask.cs:

    using System;
    using RabbitMQ.Client;
    using System.Text;
    
    class NewTask
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);
    
                var properties = channel.CreateBasicProperties();
                properties.SetPersistent(true);
    
                channel.BasicPublish(exchange: "",
                                     routingKey: "task_queue",
                                     basicProperties: properties,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
    
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    
        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
        }
    }
    View Code

    Worker.cs:

    using System;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    using System.Threading;
    
    class Worker
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                Console.WriteLine(" [*] Waiting for messages.");
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);
    
                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);
    
                    Console.WriteLine(" [x] Done");
    
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: "task_queue",
                                     noAck: false,
                                     consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
    View Code

    RabbitMQ客户API在线参考:RabbitMQ .NET client API reference online.

  • 相关阅读:
    数据持久化
    搜索样式键盘的Return按钮是否可点击
    活动指示器
    在本机上安装zabbix,来监控服务器 二
    在本机上安装zabbix,来监控服务器 一
    关于cacti的相关问题
    xp_cmdshell 的开启和关闭
    写代码创建数据库,设置位置后比对用可视化界面创建数据库的区别
    随机0~N之间的整数
    关于MD5密码破解
  • 原文地址:https://www.cnblogs.com/grayguo/p/5320019.html
Copyright © 2011-2022 走看看