一、路由(Routing)
(使用 php-amqplib)
在前一篇教程里,我们建立了一个简单的日志系统,我们能够广播日志信息给多个接受者。
在这篇教程中我们将给它添加一个功能——我们将仅订阅一个消息的子集。例如,我们仅将致命错误消息发给日志文件(以节省磁盘空间),并且仍旧可以打印所有的日志消息到控制台。
二、绑定(Bindings)
在前面的例子里,我们已经建立了绑定,你可以回想一下如下的代码:
$channel->queue_bind($queue_name, 'logs');
一个绑定就是一个交换和一个队列关系,这可以简单的理解为:这个队列仅对来自这个交换的信息感兴趣。
绑定可以使用一个额外的routing_key参数。为了避免和$channel::basic_publish参数混淆,我们将称他为绑定键(binding key)。这是我们如何用一个键来建立一个绑定:
$binding_key = 'black';
$channel->queue_bind($queue_name, $exchange_name, $binding_key);
绑定键的意义依赖于交换的类型。我们使用前面的fanout交换,仅忽略它的值。
三、Direct 交换(Direct exchange)
我们的前面教程里的日志系统发送所有的消息给所有的消费者。我们想扩展这个系统,可以按照他们的严重程度来过滤消息。例如我们可能会想让正在写日志消息到磁盘的代码仅接收致命错误,而不浪费磁盘空间在警告或一般信息的日志消息上。
我们当时用了一个fanout交换,这个不能让我们有更多的灵活性——因为它仅能盲目的广播信息。
我们将使用给一个direct交换来代替它。在这个交换的背后有一个简单的路由算法——一条消息进入一个队列,这个队列的绑定键将精确匹配这个消息的路由键。
为了演示这个,请考虑下面的图示:
在这个图示里,我们可以看到direct交换“X”有两个绑定到它的队列。第一个队列用了“orange”这个绑定键,第二个有两个绑定,一个用“black”绑定键,另一个用“green”。
在这样的图示里,一条带有路由键“orange”的消息发给这个交换后将会被发给“Q1”队列。而带有“black”或者“green”路由键的消息将走向“Q2”队列。而其它所有消息将被舍弃。
四、多重绑定(Direct exchange)
用同一个绑定键来绑定多个队列也是完全可以的。在我们的例子里我们也可以用“black”这个键在“X”和“Q1”之间添加一个绑定。在那种情况下,direct交换将会像fanout一样发送消息给所有匹配的队列,即一条带有“black”路由键的消息将被传给“Q1”和“Q2”两个队列。
五、发送日志(Emitting logs)
我们将这个模式应用于我们的日志系统。我们将用“direct”交换代替“fanout”交换来发送消息。我们以日志消息的严重程度作为它的路由键,在这种方式下,接收程序将能够选择它想接收的严重程度来接收。首先让我们来看日志消息的发送。
跟往常一样,我们首先需要建立一个交换:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
然后我们准备发送一条消息:
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->basic_publish($msg, 'direct_logs', $severity);
为了简化程序,我们假设“严重程度”仅为“消息(info)”, “警告(warning)”, “错误(error)”三种之一。
六、订阅(Subscribing)
目前收到的信息将像前面教程中讲述的一样工作,不同的是,我们将为我们关注的每一个严重程度建立一个新的绑定。
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
七、合在一起(Putting it all together)
“emit_log_direct.php” 类的代码如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'direct_logs', $severity);
echo ' [x] Sent ', $severity, ':', $data, "
";
$channel->close();
$connection->close();
“receive_logs_direct.php”的代码如下:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
$severities = array_slice($argv, 1);
if (empty($severities)) {
file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]
");
exit(1);
}
foreach ($severities as $severity) {
$channel->queue_bind($queue_name, 'direct_logs', $severity);
}
echo " [*] Waiting for logs. To exit press CTRL+C
";
$callback = function ($msg) {
echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "
";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
如果你想仅保存“警告”和“错误”(不保存“信息”)日志信息到一个文件,敲入以下代码即可
php receive_logs_direct.php warning error > logs_from_rabbit.log
如果你想看所有的日志信息,打开一个新的控制台输入以下代码:
php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C
例如:发送一个错误日志,输入如下:
php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
(完整代码请查看 emit_log_direct.php source,receive_logs_direct.php source)
下一篇教程,看一看怎样基于一种模式来监听消息。
原文:https://www.rabbitmq.com/tutorials/tutorial-four-php.html