zoukankan      html  css  js  c++  java
  • RabbitMQ 官方NET教程(二)【工作队列】

    这篇中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务。

    工作队列的主要任务是:避免立刻执行资源密集型任务和避免必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被这些工作进程共享执行。
    这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。

    准备

    在本教程的前面部分,我们发送了一个包含Hello World!的消息。 现在我们将发送代替复杂任务的字符串。 我们没有一个现实世界的任务,比如图像被调整大小,或者是要渲染的pdf文件,所以假设我们很忙 - 通过使用Thread.sleep()函数来假冒它。 我们将把字符串中的点数作为其复杂度; 每个点都将占“work”的一秒钟。 例如,由Hello...描述的假任务将需要三秒钟。
    我们将稍后从之前的例子中修改Send程序,以允许从命令行发送任意消息。 这个程序会将任务安排到我们的工作队列中,所以让我们命名为NewTask

    dotnet new console --name NewTask
    mv NewTask/Program.cs NewTask/NewTask.cs
    dotnet new console --name Worker
    mv Worker/Program.cs Worker/Worker.cs
    cd NewTask
    dotnet add package RabbitMQ.Client
    dotnet restore
    cd ../Worker
    dotnet add package RabbitMQ.Client
    dotnet restore
    
    var message = GetMessage(args);
    var body = Encoding.UTF8.GetBytes(message);
    
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    
    channel.BasicPublish(exchange: "",
                         routingKey: "task_queue",
                         basicProperties: properties,
                         body: body);
    

    有些帮助从命令行参数获取消息:

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
    }
    

    我们的旧的Receive.cs脚本还需要一些更改:它需要为消息体中的每个点伪造一秒的工作时间。 它将处理RabbitMQ发送的消息并执行任务,因此我们将其复制到Worker项目并修改:

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(" [x] Received {0}", message);
    
        int dots = message.Split('.').Length - 1;
        Thread.Sleep(dots * 1000);
    
        Console.WriteLine(" [x] Done");
    };
    channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);
    

    我们假任务到模拟执行时间:

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);
    

    循环调度

    使用任务队列的优点之一是能够轻松地并行工作。 如果我们正在建立积压的工作,我们可以增加更多的工作者,这样可以轻松扩展。

    首先,我们同时尝试运行两个Worker实例。 他们都会从队列中获取消息,但是究竟如何? 让我们来看看。

    你需要三个控制台打开。 两个将运行Worker程序。 这些控制台将是我们两个消费者 - C1和C2。

    # shell 1
    cd Worker
    dotnet run
    # => [*] Waiting for messages. To exit press CTRL+C
    

    # shell 2
    cd Worker
    dotnet run
    # => [*] Waiting for messages. To exit press CTRL+C
    

    在第三个我们将发布新的任务。 一旦您已经开始使用消费者,您可以发布一些消息:

    # shell 3
    cd NewTask
    dotnet run "First message."
    dotnet run "Second message.."
    dotnet run "Third message..."
    dotnet run "Fourth message...."
    dotnet run "Fifth message....."
    

    让我们看看送给我们workers的内容:

    # shell 1
    # => [*] Waiting for messages. To exit press CTRL+C
    # => [x] Received 'First message.'
    # => [x] Received 'Third message...'
    # => [x] Received 'Fifth message.....'
    
    # shell 2
    # => [*] Waiting for messages. To exit press CTRL+C
    # => [x] Received 'Second message..'
    # => [x] Received 'Fourth message....'
    

    默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。 平均每个消费者将获得相同数量的消息。 这种分发消息的方式叫做循环(round-robin)。 与三名或更多的workers一起尝试。

    消息应答(message acknowledgments)

    执行一个任务需要花费几秒钟。你可能会担心当一个消费者在执行任务时发生中断。使用我们当前的代码,一旦RabbitMQ向客户发送消息,它立即将其从内存中删除。在这种情况下,如果杀死正在执行任务的某个工作者,我们会丢失它正在处理的信息。我们也会丢失已经转发给这个工作者且它还未执行的消息。

    但是我们不想失去任何任务。如果一个worker挂了,我们希望把这个任务交给另一个工作者。

    为了确保消息永远不会丢失,RabbitMQ支持消息确认。从消费者发送一个确认信息告诉RabbitMQ已经收到,处理了特定的消息,然后RabbitMQ可以自由删除它。

    如果消费者死机(其通道关闭,连接关闭或TCP连接丢失),而不发送确认信息,RabbitMQ将会明白消息未被完全处理并重新排队。如果同时有其他消费者在线,则会迅速将其重新提供给另一个消费者。这样就可以确保没有消息丢失,即使工作者偶然死亡。

    没有任何消息超时; RabbitMQ将在消费者挂了时重新发送消息。如果消费者处理一个信息需要耗费特别特别长的时间是允许的。

    消息确认默认情况下打开。 在前面的例子中,我们通过将noAck(“no manual acks”)参数设置为true来明确地将其关闭。 一旦完成任务,现在该删除这个标志并发送正确的确认。

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        Console.WriteLine(" [x] Received {0}", message);
    
        int dots = message.Split('.').Length - 1;
        Thread.Sleep(dots * 1000);
    
        Console.WriteLine(" [x] Done");
    
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    };
    channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
    

    使用这个代码,我们可以确定即使在处理消息时,使用CTRL + C杀死一个工作者,也不会丢失任何东西。工作者挂了之后不久,所有未确认的消息将被重新发送。

    忘记确认
    丢失BasicAck是一个常见的错误。 这是一个容易的错误,但后果是严重的。
    当您的客户端退出(可能看起来像随机重新传递)时,消息将被重新传递,但是RabbitMQ将会消耗越来越多的内存,因为它将无法释放任何未包含的消息。

    为了调试这种错误,您可以使用rabbitmqctl打印messages_unacknowledged字段:

    sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
    

    在Windows上:

    rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
    

    消息持久化(Message durability)

    我们已经学会了如何确保即使消费者死亡,任务也不会丢失。 但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。

    当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要丢失。需要两件事来确保消息不会丢失:我们需要将所有队列和消息标记为持久化。

    首先,我们需要确保RabbitMQ不会丢失我们的队列。 为了这样做,我们需要将其声明为持久的:

    channel.QueueDeclare(queue: "hello",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    

    虽然这个命令本身是正确的,但是在我们目前的设置中是不行的。 这是因为我们已经定义了一个非持久化的名为hello的队列。 RabbitMQ不允许您重新定义具有不同参数的现有队列,并会向尝试执行此操作的任何程序返回错误。 但是有一个快速的解决方法 - 让我们用不同的名称声明一个队列,例如task_queue

    channel.QueueDeclare(queue: "task_queue",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    

    这个queueDeclare更改需要应用于生产者和消费者代码。

    在这一点上,我们确信,即使RabbitMQ重新启动,task_queue队列也不会丢失。 现在我们需要将我们的消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true

    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    

    注意消息持久性

     将消息标记为持久性不能完全保证消息不会丢失。 虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且还没有保存时,仍然有一个很短的时间窗口。 此外,RabbitMQ不会对每个消息执行`fsync`(同步内存中所有已修改的文件数据到储存设备) - 它可能只是保存到缓存中,而不是真正写入磁盘。 持久性保证不强,但对我们的简单任务队列来说已经足够了。 如果您需要更强大的保证,那么您可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。
    

    公平转发(Fair dispatch)

    或许会发现,目前的消息转发机制(Round-robin)并非是我们想要的。例如,这样一种情况,对于两个消费者,有一系列的任务,奇数任务特别耗时,而偶数任务却很轻松,这样造成一个消费者一直繁忙,另一个消费者却很快执行完任务后等待。
    造成这样的原因是因为RabbitMQ仅仅是当消息到达队列进行转发消息。并不在乎有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发所有的奇数给一个消费者,偶数给另一个消费者。

    channel.BasicQos(0, 1, false);
    

    注意队列大小

    如果所有的工作者都处于繁忙状态,你的队列有可能被填充满。你可能会观察队列的使用情况,然后增加工作者,或者使用别的什么策略。
    

    完整的代码

    NewTask.cs 类:

    using System;
    using RabbitMQ.Client;
    using System.Text;
    
    class NewTask
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);
    
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
    
                channel.BasicPublish(exchange: "",
                                     routingKey: "task_queue",
                                     basicProperties: properties,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
    
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    
        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
        }
    }
    

    Worker.cs类:

    using System;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    using System.Threading;
    
    class Worker
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
    
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                Console.WriteLine(" [*] Waiting for messages.");
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] Received {0}", message);
    
                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);
    
                    Console.WriteLine(" [x] Done");
    
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: "task_queue",
                                     noAck: false,
                                     consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
    
  • 相关阅读:
    阿里消息队列中间件 RocketMQ 源码分析 —— Message 拉取与消费(上)
    数据库中间件 ShardingJDBC 源码分析 —— SQL 解析(三)之查询SQL
    数据库分库分表中间件 ShardingJDBC 源码分析 —— SQL 解析(六)之删除SQL
    数据库分库分表中间件 ShardingJDBC 源码分析 —— SQL 解析(五)之更新SQL
    消息队列中间件 RocketMQ 源码分析 —— Message 存储
    源码圈 300 胖友的书单整理
    数据库分库分表中间件 ShardingJDBC 源码分析 —— SQL 路由(一)分库分表配置
    数据库分库分表中间件 ShardingJDBC 源码分析 —— SQL 解析(四)之插入SQL
    数据库分库分表中间件 ShardingJDBC 源码分析 —— SQL 路由(二)之分库分表路由
    C#中Math类的用法
  • 原文地址:https://www.cnblogs.com/Wulex/p/6965057.html
Copyright © 2011-2022 走看看