工作队列
在第一个教程中,我们编写了用于从命名队列发送和接收消息的程序。在这一个中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待完成。相反,我们安排稍后完成任务。我们把一个任务封装 成一个消息并发送给一个队列。在后台运行的工作进程将弹出任务并最终执行作业。当你运行许多工人时,任务将在他们之间共享。
这个概念在Web应用程序中特别有用,在短的HTTP请求窗口中不可能处理复杂的任务。
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环法(round-robin)。试试三个或更多的工人。
消息确认
做任务可能需要几秒钟的时间。你可能会想知道如果其中一个消费者开始一个长期的任务,并且只是部分完成而死亡会发生什么。使用我们当前的代码,一旦RabbitMQ向客户发送消息,立即将其标记为删除。在这种情况下,如果你杀了一个工人,我们将失去刚刚处理的信息。我们也将失去所有派发给这个特定工作人员但尚未处理的消息。
但我们不想失去任何任务。如果一名工人死亡,我们希望将任务交付给另一名工人。
为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发回确认(告知)告诉RabbitMQ已经收到,处理了一个特定的消息,并且RabbitMQ可以自由删除它。
如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失),RabbitMQ将理解消息未被完全处理,并将重新排队。如果有其他消费者同时上网,则会迅速重新发送给其他消费者。这样,即使工人偶尔死亡,也可以确保没有任何信息丢失。
没有任何消息超时; 当消费者死亡时,RabbitMQ将重新传递消息。即使处理消息需要非常很长的时间也没关系。
手动消息确认默认打开。在前面的示例中,我们通过将autoAck(“自动确认模式”)参数设置为true来显式关闭它们。一旦我们完成了任务,现在是时候删除这个标志并且手动发送一个合适的确认信息给工作人员。
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
使用这段代码,我们可以确定,即使在处理消息的时候使用CTRL + C来杀死一个工作者,也不会丢失任何东西。工人死后不久,所有未确认的消息将被重新发送。
忘记确认
错过BasicAck是一个常见的错误。这是一个容易的错误,但后果是严重的。当你的客户退出时(这可能看起来像随机的重新传递),消息将被重新传递,但是RabbitMQ将会消耗越来越多的内存,因为它将不能释放任何未被消息的消息。
为了调试这种错误,你可以使用rabbitmqctl 打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,删除sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你不告诉它。需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为持久。
首先,我们需要确保RabbitMQ永远不会失去队列。为了做到这一点,我们需要宣布它是持久的:
channel.QueueDeclare(queue: "hello", durable: true, //durable 持久化 定义一个队列 并且设置为持久性的消息 exclusive: false, //独占方式 autoDelete: false, //是否自动删除 arguments: null);
虽然这个命令本身是正确的,但是在我们目前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列 ,这个队列并不耐用。RabbitMQ不允许你使用不同的参数重新定义一个已经存在的队列,并且会向任何尝试这样做的程序返回一个错误。但有一个快速的解决方法 - 让我们声明一个不同名称的队列,例如task_queue:
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
这个queueDeclare更改需要应用于生产者和消费者代码。
此时我们确信,即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久的 - 通过设置IBasicProperties.SetPersistent为true。
var properties = channel.CreateBasicProperties(); properties.Persistent = true;
注意消息持久性
将邮件标记为“永久”并不能完全保证邮件不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接收到消息并且还没有保存消息时,仍然有一个很短的时间窗口。此外,RabbitMQ不会为每个消息执行fsync(2) - 它可能只是保存到缓存中,而不是写入磁盘。持久性保证不强,但对我们简单的任务队列来说已经足够了。如果您需要更强大的保证,那么您可以使用 发布商确认。
公平派遣
您可能已经注意到调度仍然不能按照我们的要求工作。例如在有两个工人的情况下,当所有奇怪的信息都很重,甚至信息很轻时,一个工作人员就会一直很忙,另一个工作人员几乎没有工作。那么,RabbitMQ不知道任何有关的信息,将仍然均匀地发送消息。
发生这种情况是因为RabbitMQ只在消息进入队列时调度消息。它没有考虑消费者未确认消息的数量。它只是盲目地把第n条消息分发给第n个消费者。
为了改变这种行为,我们可以使用basicQos方法和 prefetchCount = 1设置。这告诉RabbitMQ一次不能给一个工作者多个消息。或者换句话说,不要向工作人员发送新消息,直到处理并确认了前一个消息。相反,它会将其分派给下一个还不忙的工作人员。
channel.BasicQos(0, 1, false);
关于队列大小的说明
如果所有的工人都很忙,你的队伍就可以填满了。你会想要关注一下,也许会增加更多的工人,或者有其他的策略。
把它放在一起
我们的NewTask.cs类的最终代码:
public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes("发送一个消息"); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
而我们的Worker.cs
public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
使用消息确认和BasicQ您可以设置一个工作队列。即使RabbitMQ重新启动,耐用性选项也能让任务继续存在。