publish and subscribe
前言
前面的例子 我们都是用到的都是消息单一消费,即一条消息被单个消费者消费。像微博系统的消息推送,是一条消息推送给所有订阅到该频道的用户。
这里我们就需要用到rabbitmq的发布与订阅(publish and subscribe)
原理
前面我们弱化rabbitmq,只抽象出了 生产者、队列、消费者三个概念。
现在需要介绍rabbitmq的整体数据流转过程。
数据由生产者发送给交换机,交换机接收数据并把它发送给与自己绑定好的队列,队列接收消息并且把它发送给消费者。
事实上,生产者根本不会知道消息是发送给谁的,也不需要关心。who cares?!
exchange的类型
移步 RabbitMQ各种交换机类型Exchange Types介绍
发布与订阅
一个中心生产者,多个消费者。
生产者生产消息给类型为fanout的exchange,多个queue与该exchange绑定,消费者从queue中获取消息。
代码
1. 生产者、消费者声明类型为fanout、名称为logs的交换机
2. 消费者进程声明名称随机的queue(用于每new 一个进程就会产生一个队列),将queue与logs exchange绑定
整体代码如下
fanout_sender.php
<?php /** * Created by PhpStorm. * User: wangdaxi * Date: 2017/10/20 * Time: 14:20 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); $data = implode(' ', array_slice($argv, 1)); empty($data) && $data = 'Hello World'; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'logs'); echo " [x] Sent $data "; $channel->close(); $connection->close();
fanout_receive.php
<?php /** * Created by PhpStorm. * User: wangdaxi * Date: 2017/10/20 * Time: 14:32 */ require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, 'logs'); echo ' [*] Waiting for logs. To exit press CTRL+C', " "; $callback = function($msg){ echo " [x] " . $msg->body; }; //消费,关闭ack $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
测试
开启一个终端作为生产者P,两个消费者作为消费者C1,C2。
生产者生产消息,会发现每个消费者都会收到同样的消息。很简单,不上图。
以上。