2019年12月10日09:57:52
原文:https://www.rabbitmq.com/tutorials/tutorial-two-php.html
发布/订阅
(使用php-amqplib)
先决条件
本教程假定RabbitMQ 已在标准端口(5672)的本地主机上安装并运行。如果您使用其他主机,端口或凭据,则连接设置需要进行调整。
在哪里获得帮助
如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系。
在上一个教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都恰好交付给一个工人。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为“发布/订阅”。
为了说明这种模式,我们将构建一个简单的日志记录系统。它由两个程序组成-第一个程序将发出日志消息,第二个程序将接收并打印它们。
在我们的日志系统中,接收器程序的每个运行副本都将获得消息。这样,我们将能够运行一个接收器并将日志定向到磁盘。同时我们将能够运行另一个接收器并在屏幕上查看日志。
本质上,已发布的日志消息将被广播到所有接收者。
交流交流
在本教程的前面部分中,我们向队列发送消息和从队列接收消息。现在是时候在Rabbit中引入完整的消息传递模型了。
让我们快速回顾一下先前教程中介绍的内容:
- 甲生产者是发送消息的用户的应用程序。
- 甲队列是一个缓冲区,用于存储消息。
- 甲消费者是接收消息的用户的应用程序。
RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。
相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换类型定义 。

有几种交换类型可用:direct,topic,headers 和fanout。我们将集中讨论最后一个-扇出。让我们创建这种类型的交换,并将其称为log:
$channel->exchange_declare('logs', 'fanout', false, false, false);
扇出交换非常简单。正如您可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。而这正是我们记录器所需要的。
交换队列
要列出服务器上的交换,您可以运行有用的rabbitmqctl:
sudo rabbitmqctl list_exchanges在此列表中,将有一些amq。*交换和默认(未命名)交换。这些是默认创建的,但是您现在不太可能需要使用它们。
默认交换
在本教程的前面部分中,我们对交换一无所知,但仍然能够将消息发送到队列。这是可能的,因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。
回想一下我们之前如何发布消息:
$channel->basic_publish($msg, '', 'hello');在这里,我们使用默认或无名交换:消息将以routing_key指定的名称路由到队列(如果存在)。路由键是basic_publish的第三个参数
现在,我们可以改为发布到命名的交换队列:
$channel->exchange_declare('logs', 'fanout', false, false, false); $channel->basic_publish($msg, 'logs');
临时队列
您可能还记得,我们使用的是具有特定名称的队列(还记得hello和task_queue吗?)。能够命名队列对我们至关重要-我们需要将工人指向同一队列。当您想在生产者和消费者之间共享队列时,给队列命名很重要。
但这不是我们的记录器的情况。我们希望听到所有日志消息,而不仅仅是它们的一部分。我们也只对当前正在发送的消息感兴趣,而对旧消息不感兴趣。为了解决这个问题,我们需要两件事。
首先,无论何时连接到Rabbit,我们都需要一个全新的空队列。为此,我们可以创建一个具有随机名称的队列,或者甚至更好-让服务器为我们选择一个随机队列名称。
其次,一旦我们断开了使用者的连接,队列将被自动删除。
在php-amqplib客户端中,当我们以空字符串形式提供队列名称时,我们将使用生成的名称创建一个非持久队列:
list($queue_name, ,) = $channel->queue_declare("");
当方法返回时,$ queue_name变量包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
当声明它的连接关闭时,该队列将被删除,因为它被声明为独占。您可以在队列指南中了解有关排他标志和其他队列属性的更多信息。
绑定

我们已经创建了一个扇出交换和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换和队列之间的关系称为绑定。
$channel->queue_bind($queue_name, 'logs');
从现在开始,日志交换将消息添加到我们的队列中。
列表绑定
您可以使用猜测的方式列出现有的绑定,
rabbitmqctl list_bindings
放在一起

产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是,我们现在希望将消息发布到日志交换器,而不是无名的消息交换器。这是emit_log.php脚本的代码 :
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "info: Hello World!"; } $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'logs'); echo ' [x] Sent ', $data, " "; $channel->close(); $connection->close();
如您所见,建立连接后,我们声明了交换。由于禁止发布到不存在的交易所,因此此步骤是必需的。
如果没有队列绑定到交换,消息将丢失,但这对我们来说是可以的。如果没有消费者在听,我们可以安全地丢弃该消息。
receive_logs.php的代码:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, 'logs'); echo " [*] Waiting for logs. To exit press CTRL+C "; $callback = function ($msg) { echo ' [x] ', $msg->body, " "; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
如果要将日志保存到文件,只需打开控制台并键入:
php receive_logs.php > logs_from_rabbit.log
如果希望在屏幕上查看日志,请生成一个新的终端并运行:
php receive_logs.php
当然,要发出日志,请输入:
php emit_log.php
使用rabbitmqctl list_bindings,您可以验证代码是否确实根据需要创建了绑定和队列。 运行两个receive_logs.php程序后,您应该会看到类似以下内容:
sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.
结果的解释很简单:交换日志中的数据进入两个具有服务器分配名称的队列。这正是我们的意图。