zoukankan      html  css  js  c++  java
  • PHP RabbitMQ 教程(三)

    发布/订阅

    我们在上一节创建了一个工作队列,并假定队列对应的任务传送给了某个客户端。在这一章节我们会做一些完全不一样的东西–我们会发送一条消息到多个消费者,也称之为“发布/订阅”模式。

    为了说明这个模式,我们会创建一个简单的日志系统(logging system,以下简称日志系统),它由两个程序组成–第一个是发送日志信息,第二个是接收日志并打印。

    日志系统的每一个运行的接收端程序都会接收信息,这样就可以运行一个接收端就把日志保存到硬盘里,同时运行另一个接收端去实时显示日志到屏幕。

    本质上,日志内容是广播给所有的接收端的。

    交换器

    在之前的章节中我们从一个队列里发送和接收消息,现在该把完整的RabbitMQ消息模型介绍给大家了。

    让我们快速的回看一遍在之前的章节中的内容:

    >生产者是一个用来发送消息的程序
    
    >队列是一个存储消息的缓冲区
    
    >消费者是一个接收消息的程序
    

    RabbitMQ消息模型的核心思想是,生产者永远不会直接发送给任何消息队列,实际上,生产者一般情况下甚至不知道消息应该发送给哪个队列。

    生产者只能发送消息到交换器中,交换器非常简单。一方面从生产者接收消息,另一方面把消息推送到队列中。交换器必须知道如何处理接收到的消息,是推送到某个队列?推送到多个队列?还是丢弃这条消息。这个规则通过交换器类型(exchange type)来指定。

    这里是交换器的几个类型:direct,topic,headers,fanout。这里我们主要关注最后一个–fanout,创建一个类型为 fanout 的交换器,命名为 logs。

    $channel->exchange_declare('logs','fanout',false,false,false);
    

    fanout交换器非常简单,你可以从名称中猜出它的功能,它把所有接收到的消息广播给所有它知道的队列,这也正是我们的日志系统需要的功能。

    列出交换器

    可以使用rabbitmqctl 命令列出服务器上的所有交换器:

    sudo rabbitmqctl list_exchanges
    
    Listing exchanges ...
            direct
    amq.direct      direct
    amq.fanout      fanout
    amq.headers     headers
    amq.match       headers
    amq.rabbitmq.log        topic
    amq.rabbitmq.trace      topic
    amq.topic       topic
    logs    fanout
    ...done.
    

    结果中有一些amq.*和一些未命名的交换器,这是一些默认创建的交换器,它们不太可能是现在需要用到的。

    未命名交换器

    在之前的章节中我们对交换器一无所知,直到可以发送消息给队列。大概是因为我们当时正在使用一个以空字符串“”定义的默认的交换器。

    回想一下之前怎么发布消息:

    $channel->basic_publish($msg,'','hello');
    

    这里就是使用默认或者说未命名的交换器:消息被routing_key的值 Here we use the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists. The routing key is the second argument to basic_publish

    现在,可以发布消息到这个队列。

    $channel->exchange_declare('logs','fanout',false,false,false);
    $channel->basic_publish($msg,'logs');
    

    临时队列

    也许你还记得在之前我们使用了一个指定的队列(还记得 hello 队列 和 task_queue 队列吗?)。可以命名一个队列是至关重要的–我们需要指定一个worker到同一个队列。当想让生产者和消费者使用同一个队列时给队列命名是非常重要的。

    但是在我们的日志系统中情况不同了,我们想要接收所有的消息,不仅仅是其中的一部分,我们关心的是最新的消息而不是旧的,因此需要做两件事。

    首先,当连接到RabbitMQ时,需要一个空的队列,可以手动创建一个名字随机的队列,或者,更好的办法是,让服务器为我们随机选一个队列名字。

    其次,一旦与消费者失去连接,队列需要自动删除。大专栏  PHP RabbitMQ 教程(三)p>

    php-amqplib中,当我们创建了一个名字为空的队列时,实际上是创建了一个被生成了名字的非持久化的队列。

    list($queue_name, ,) = $channel->queue_declare("");
    

    方法执行后,$queue_name变量包含了一个RabbitMQ生成的字符串。比如也许是这样的:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

    当连接被关闭的时候,队列也会被删掉,因为队列是独有的。

    绑定(Bindlings)

    我们已经创建了一个fanout类型的交换器和一个队列。现在需要让交换器发送消息给队列。交换器和队列之间的关系称之为绑定(binding)

    $channel->queue_bind($queue_name,'logs');
    

    现在开始,logs 交换器会把消息附加到队列中。

    列出绑定(Listing bindings)

    可以使用 rabbitmqctl list_bindings列出所有存在的正在使用的绑定。

    整合

    发送日志消息的生产者,与之前的代码看起来没什么不同,最重要的变化是现在想要发送消息到我们的 logs 交换器中,需要在发送时提供一个routing_key,但是在 fanout类型的交换器中这个值是可以忽略的。下边是emit_log.php的代码。

    <?php
    
    require_once __DIR__ .'/verdor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
    $channel = $channel->channel();
    
    $channel->exchange_declare('logs','fanout',false,false,false);
    
    $data = implode(' ',array_slice($argv,1));
    
    if(empty($data)) $data = "info:Hello World";
    $msg = new AMQPMessage($data);
    
    $channel->basic_publish($msg,'logs');
    
    echo "[x]Sent ",$data,"n";
    
    $channel->close();
    $connection->close();
    ?>
    

    (emit_log.php)

    如你所见,建立连接后声明了交换器,这一步是必须的,因为发送消息到一个不存在的交换器是被禁止的。

    如果还没有队列绑定到交换器,信息会丢失,但是这对于我们是可以的,如果没有消费者监听,我们可以安全的丢弃消息。

    receive_logs.php:

    <?php
    
    require_once __DIR__ .'/vendor/autoload.php';
    use PhpAmqpLibConnectionQMAPStreamConnection;
    
    $connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
    $channel = $connection->channel();
    
    $channel->queue_bind($queue_name,'logs');
    
    echo '[*] Waiting for logs. To exit press CTRL+C',"n";
    
    $callback = function($msg){
    	echo '[x]',$msg->body,"n";
    };
    
    $channel->basic_consume($queue_name,'',false,true,false,false,$callback);
    
    whild(count($channel->callbacks)){
    	$channel->wait();
    }
    
    $channel->close();
    $connection->close();
    ?>
    

    (receive_logs.php)

    如果想保存日志到文件中,可以在命令中输入

    php receive_logs.php > logs_from_rabbit.log
    

    如果想在屏幕上查看日志,新打开一个终端并运行:

    php receive_logs.php
    

    发送日志:

    php emit_log.php
    

    使用 rabbitmqctl list_bindings 可以确认代码确实创建了绑定和队列,当两个receive_logs.php在运行的时候会看到类似这样的:

    sudo rabbitmqctl list_bindings
    Listing bindings ...
    logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
    logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
    ...done.
    
    

    对于结果的解释很简单,logs交换器中的数据发送到两个服务器指定的队列,而这正是我们要实现的。

    想要弄明白怎样去监听部分消息,转到第四部分。

    原文地址:Publish/Subscribe

  • 相关阅读:
    Yarn安装与配置
    如何找到所有 HTML select 标签的选中项
    Jquery 获取某个样式除第一个以外的元素
    常用的字符串截取方法
    App的埋点测试
    手写 promise
    关于JSON.parse(JSON.stringify(obj))实现深拷贝应该注意的坑
    计算一个数组的深度
    数组的 交集 差集 补集 并集
    webpack -- element-ui 的按需引入
  • 原文地址:https://www.cnblogs.com/lijianming180/p/12099814.html
Copyright © 2011-2022 走看看