zoukankan      html  css  js  c++  java
  • RabbitMQ译文 — 发布/订阅(Publish/Subscribe)

    一、发布/订阅

    (使用 php-amqplib)

      在前一篇教程中我们建立了一个工作队列。工作队列假设每个任务被精确的发送给一个工作者。在这部分,我们将做一些完全不同的事情——我们将发送一条消息给多个消费者。这种模式被称为“发布/订阅”。

      为了说明这种模式,我们将建立一个简单的日志系统。它将有两个程序组成——第一个将发送日志信息而第二个将接收并打印它们。

      在我们的日志系统中,每个正在运行的接受者程序的副本都将收到消息。那样我们就能运行一个接收者并将日志写入磁盘;与此同时我们将运行另一个接收者输出日志到屏幕。

      实际上,被发布的日志消息将被广播到所有的接收者。

    二、交换

      在前面的教程中我们发送消息到队列或从队列中获取消息。现在是时候介绍在Rabbit中完整的消息模型了。

      让我们快速重温一下我们在前面的教程中学到的知识:

    生产者:一个发送消息的用户应用。

    队列:一个存储消息的缓冲区。

    消费者:一个接收消息的用户应用。

      在RabbitMQ里消息模型的核心思想就是生产者绝不直接发送任何消息给队列。实际上,通常生产者甚至根本不知道一条消息是否将被传递到任何队列中。

      相反,生产者仅能发送消息到一个交换。交换是一个非常简单的事物。一方面它接收来自于生产者的消息,另一方面它推送消息给队列。交换必须确切地知道如何处理它接收到的消息。它应该被添加到一个特定的队列里么?它应该被添加到很多队列里么?还是它应该被丢弃。这种规则由交换类型来定义。

     

      有几个可用的交换类型:direct,topic,headers和fanout。我们将看最后一个——fanout。让我们建立一个这种类型的交换,并称之为logs:

    $channel->exchange_declare('logs', 'fanout', false, false, false);
    

      fanout交换非常简单。或许你能从名字中猜出来,它仅广播所有的它收到的消息到所有它知道的队列中。这正是我们的日志系统需要的。

    列出交换

      你可以运行永远都很有用的rabbitmqctl,列出服务器端的交换:

    sudo rabbitmqctl list_exchanges

      在这个列表里有一些amq.*交换和默认(未命名)交换。这些被默认建立,但是此时你还不可能会需要使用它们。

    默认交换

      在前面部分的教程中我们还不知道关于交换的任何事情,但是仍然能发送消息到队列。因为我们正在使用默认的交换,我们通过空字符串("")来指定,所以这是可能的。

      回想一下之前我们是如何发布一条消息的:

    $channel->basic_publish($msg, '', 'hello');
    

      这里我们使用默认或无名的交换:消息被发到由routing_key指定的队列中,如果它存在。路由键是basic_public的第三个参数

      现在,我们能够发布消息到我们命名的交换:

    $channel->exchange_declare('logs', 'fanout', false, false, false);
    $channel->basic_publish($msg, 'logs');
    

    三、临时队列

      也许你还记得先前我们使用的队列有一个指定的名字(记着hello和task_queue么?)。对于我们来说能命名一个队列是至关重要的——我们需要将工作者指给相同的队列。当你想要在生产者和消费者之间分享队列时,给队列一个名字是重要的。

      但是我们的日志系统不是这样的情况。我们想要监听所有的日志消息,而不仅是它们中的子集。我们对只对当前流转的消息感兴趣而不是旧的。解决这个我们需要两件东西。

      第一,无论何时我们连接到Rabbit,我们都需要一个新的,空的队列。做这个我们可以建立一个随机名字的队列,或者,更好的——让服务器为我们选一个随机的队列名。

      第二,一旦我们断开了消费者,队列应该自动被删除。

      在php-amqplib客户端,当我们提供队列名一个空字符串时,我们建立了一个非持久的,被生成名称的队列。

    list($queue_name, ,) = $channel->queue_declare("");
    

      当方法返回时,变量$queue_name包含一个被RabbitMQ生成的随机队列名。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

      当声明它的连接关闭时,队列将被删除,因为它被定义为排他的。你可以在队列向导中学习更多的关于排他标记和其它队列属性。

    四、绑定

      

      我们已经建立了一个fanout类型的交换和一个队列。现在我们需要让交换发送消息到我们的队列。交换队列之间的关系被称为绑定。

    $channel->queue_bind($queue_name, 'logs');
    

      从现在开始日志交换将添加消息到我们的队列。

    列出绑定

      你可以列出已存在的绑定,使用……,你猜到了

    rabbitmqctl list_bindings
    

    五、放在一起

      

      生产者程序,发送日志消息,看起来和前一篇教程中的没有多少不同。最重要的改变是我们现在想要发布消息到我们的日志交换而不是未命名的那个。这里给出emit_log.php的编码:

    <?php
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('logs', 'fanout', false, false, false);
    
    $data = implode(' ', array_slice($argv, 1));
    if(empty($data)) $data = "info: Hello World!";
    $msg = new AMQPMessage($data);
    
    $channel->basic_publish($msg, 'logs');
    
    echo " [x] Sent ", $data, "
    ";
    
    $channel->close();
    $connection->close();
    
    ?>
    

       (emit_log.php source)

      正如你所看到的,在建立连接之后我们定义交换。这一步是必要的,因为不允许发送消息到一个不存在的交换。

      如果还没有队列绑定到交换,消息将丢失,但是这对我们没什么。如果还没有消费者在监听,我们可以安全的丢弃这条消息。

      receive_logs.php的代码:

    <?php
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('logs', 'fanout', false, false, false);
    
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
    $channel->queue_bind($queue_name, 'logs');
    
    echo ' [*] Waiting for logs. To exit press CTRL+C', "
    ";
    
    $callback = function($msg){
      echo ' [x] ', $msg->body, "
    ";
    };
    
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    
    while(count($channel->callbacks)) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    
    ?>
    

       (receive_logs.php source)

      如果你想保存日志为一个文件,只需打开终端输入:

    php receive_logs.php > logs_from_rabbit.log
    

      如果你想在屏幕上看日志,打开一个新的终端并运行:

    php receive_logs.php
    

      当然,发送日志,输入:

    php emit_log.php
    

      使用rabbitmqctl list_bindings你可以验证代码实际上像我们想的那样建立了绑定和队列。运行两个receive_logs.php程序时,你应该看到如下所示:

    sudo rabbitmqctl list_bindings
    # => Listing bindings ...
    # => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
    # => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
    # => ...done.
    

      结果的解释是明确的:来自于交换的日志数据进入了两个由服务器分配的名字的队列。这正是我们想要的。

      要了解怎样监听消息的子集,让我们继续看第四课

      原文:http://www.rabbitmq.com/tutorials/tutorial-three-php.html

  • 相关阅读:
    Python基础知识2-内置数据结构(上)
    vs code的快捷方式
    vs code配置
    vs code安装
    web浏览器兼容问题
    js正則表達式
    MVC MVP MVVM
    vue-cli(vue脚手架)
    web攻擊
    web前端面試題
  • 原文地址:https://www.cnblogs.com/penrodsheh/p/8601834.html
Copyright © 2011-2022 走看看