一、工作队列
(使用 php-amqplib)
在第一篇教程中我们写程序从一个命名队列中发送和接收消息。在这篇中,我们将建立一个在多个工作者之间用于分发耗时任务的工作队列。
工作队列(也称为:任务队列)背后的主要思想是避免立即做一项资源密集型任务并且不得不等候它完成。而是我们计划这个任务在稍后被完成。我们封装一个任务为一条消息并且发送它到一个队列。一个在后台运行的工作进程将立即获取这个任务并最终执行它。当你运行多个工作进程时,任务将在它们之间被分配。
这个概念在web应用中尤其有用,在一个短HTTP请求窗口中,处理一项复杂的任务是不太可能的。
准备
在教程的前一篇中,我们发送一条包含“Hello World!”的消息。现在我们将发送字符串来代表复杂任务。我们没有一个真实的任务,类似于图片大小被调整或pdf文件被渲染,因此让我们来假装这个任务,通过伪装我们正忙——利用sleep()函数。我们将用字符串中的逗号数量作为它的复杂性。每个逗号将占用一秒的工作。一项被描述为Hello...的假装的任务将占用三秒钟。
我们稍微修改一下我们先前例子send.php的代码,允许从命令行发送任意的消息。这个程序将把任务发送到我们的工作队列中,所以我们命名它为new_task.php:
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent ", $data, "
";
我们旧的receive.php脚本也需要一些改变:它需要伪装在消息体内每一个逗号有一秒钟的工作。它将从队列里获取信息并且执行任务,所以我们称它为worker.php:
$callback = function($msg){
echo " [x] Received ", $msg->body, "
";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "
";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
注意我们的伪装任务模拟执行时间。
像在第一篇教程讲述的一样运行它们:
# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."
二、循环调度
使用任务队列的优点之一就是能够并行工作。如果我们正在建立一项积压的工作,我们只要添加更多的工作者就可以轻松地扩大规模。
首先,让我们试着同时运行两个worker.php脚本。这两个都将得到来自队列里的信息,但具体怎样?让我们看看。
你需要打开三个控制台程序。两个将运行worker.php程序。这些控制台程序将使我们的两个消费者——C1和C2。
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台里,我们将发布新的任务。一旦你启动消费者你就能发布一些信息了:
# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....
让我们看看有什么被传给了我们的工作者:
# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
默认情况下,RabbitMQ将依次发送每一条消息到下一个消费者,平均每个消费者将得到相同数量的消息。这种分发消息的方式就被称为轮询。试试用三个或更多的工作者。
三、消息确认
做一项任务会花几秒钟时间。你可能会想如果其中的一个消费者执行一项长时间的任务而只执行了一部分就死掉了那会怎样。在我们目前的代码中,一旦RabbitMQ传递一条消息给消费者后,它立即就会标记这条消息为删除。在这种情况下,如果你结束掉一个工作者,我们将丢失它正处理的消息。我们也会丢失被分发到这个工作者而尚未被处理的所有消息。
但是我们不想丢失任何任务。如果一个工作者死掉了,我们会想让这个任务交给另一个工作者。
为了确保一条消息不会丢失,RabbitMQ支持消息确认机制。一个ack(acknowledgement)被消费者发回以告知RabbitMQ一条特定的消息已被接收并处理,RabbitMQ可以删除它了。
如果一个消费者死掉(它的通道被关闭,连接也被关闭,或者TCP连接丢失)没有发送一个ack,RabbitMQ就会知道,一条消息没有被完全处理,则会将这条消息将重新排入队列。如果有其它的消费者同时在线,那么它将会迅速的重新传递这条消息给另一个消费者。这样你就可以确信没有消息被丢失,即使工作者偶尔死掉。
如果没有任何消息超时,当消费者死掉时,RabbitMQ将重新传送消息。这样即使处理一条消息要花很长很长时间也没事。
消息确认机制默认是被关闭的。现在是时间将它们设置为打开了,通过设置 basic_consume的第四个参数为false(true是不ack),然后,一旦我们完成一项任务就从工作者发送一个合适的确认。
$callback = function($msg){
echo " [x] Received ", $msg->body, "
";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "
";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
使用这些代码我们能确定即使在一个工作者正在处理一条消息时,你用CTRL+C结束掉这个进程,也没有什么丢失。这个工作者死掉后的不久所有未应答的消息将会重新被投递。
被忘记的确认
忘记确认是一个常见的错误。虽然这是一个简单的错误,但是后果是严重的。当你的客户端退出的时候消息将会被重新投递(这可能看起来像是随机的重新投递),但是因为不能释放任何未被确认的消息,RabbitMQ将会消耗越来越多的内存。
为了调试这种类型的错误,你可以使用rabbitmqctl来打印 messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在Windows系统上,去掉sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
四、消息持久化
我们已经学习了怎样确保即使消费者死亡任务也不会丢失。但是如果RabbitMQ服务停止了,我们的任务将仍然会被丢失。
当RabbitMQ退出或崩溃时它将忘记队列和消息除非你让它不要这么做。要确保消息不被丢失,有两件事被要求去做:我们需要标记队列和消息为持久的。
第一,我们需要确保RabbitMQ不会丢失我们的队列。为了这样,我们需要定义它为持久的。要这么做,我们就需要传递第三个参数为true到queue_declare:
$channel->queue_declare('hello', false, true, false, false);
尽管这条命令本身是正确的,但是在我们目前的设置中是不起作用的。这是因为我们已经定义了一个叫做hello的而不是持久的队列。RabbitMQ不允许你用不同的参数重新定义一个已经存在的队列,这将返回一个错误到任何这么做的程序中。但是有一个快速的解决办法——我们来用不同的名字定义一个队列,例如task_queue:
$channel->queue_declare('task_queue', false, true, false, false);
这个设置为true的标志需要应用到生产者和消费者代码中。
这时我们就能确信即使RabbitMQ重新启动,task_queue队列也不会被丢失。现在我们需要标记我们的消息为持久的——通过设置deliver_mode = 2消息属性,用作AMQPMessage属性数组的一部分。
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
关于消息持久的注意事项
标记消息为持久也不能完全保证消息不被丢失。尽管这使得RabbitMQ保存消息懂啊磁盘,也仍然会有一个间隙窗口当RabbitMQ已经接收一条消息而尚未保存它时。再者,RabbitMQ不会为每一条消息调用fsync(2) ——它可能仅被保存在缓存中而不是真的写到磁盘上。虽然持久的保证不是很强,但是对于我们的简单任务队列已经是足够了。如果你需要一个更强的保证,那么你可以使用 publisher confirms。
五、公平调度
你大概已经注意到了调度仍然不像我们想的那样工作。例如,在有两个工作者的情况下,当所有的奇数消息是重量级的而偶数消息是轻量级的,一个工作者将一直处于繁忙状态而另一个将几乎没有任何工作。对于这种情况RabbitMQ一无所知,仍然均匀地分发消息。
发生这种情况是因为RabbitMQ只在消息进入队列时才调度消息。它不看一个消费者未确认的消息数量。它仅仅盲目地分发每一个第n条消息到第n个消费者。
为了避免这种情况,我们可以使用设置prefetch_count = 1 的basic_qos方法 。这会让RabbitMQ不会一次去分配多余一条消息给工作者。或者,换句话说,不分发一条新的消息给一个工作者直到这个工作者已经处理完并且确认了前一条消息。转而分发消息到下一个不忙的消费者。
$channel->basic_qos(null, 1, null);
关于队列大小的注意事项
如果所有工作者都很忙,你的队列可以填满。你会想要关注这个,可能添加更多的工作者,或者有一些其它策略。
六、合在一起
我们的new_task.php文件的最终代码:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "
";
$channel->close();
$connection->close();
?>
我们的worker.php文件:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "
";
$callback = function($msg){
echo " [x] Received ", $msg->body, "
";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "
";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
使用消息确认和预取你能设置一个工作队列。持久选项让任务存活即使RabbitMQ被重启。
现在我们能够前往下一篇文章,学习怎样传递相同的消息给多个消费者。