Direct和Topic两种模式类似Mysql语言中精确和模糊查询,在Topic模式下有两个特殊字符,类似MySQL “%” 字符
* (星号) 代表任意 一个单词
# (井号) 0个或者多个单词
Topic模式可以很好的用于多维度场景。一个日志模块来收集处理不同的日志,日志区分包含三个维度的标准:模块、日志紧急程度、日志重要程度。模块分为:red、black、white;紧急程度分为:critical、normal;把重要程度分为:medium、low、high在RoutingKey字段中我们把这三个维度通过两个“.“连接起来。
现在我们需要对black模块,紧急程度为critical,重要程度为high的日志分配到队列1打印到屏幕;对所以模块重要程度为high的日志和white紧急程度为critical的日志发送到队列2持久化
-
RoutingKey为“black.critical.high”的日志会投递到queue1和queue2,。
-
RoutingKey为“red.critical.high”的日志会只投递到queue2。
-
RoutingKey为“white.critical.high”的日志会投递到queue2,并且虽然queue2的两个匹配规则都符合但只会向queue2投递一份。
测试结果
生产者:topic.php
<?php // 生产者 p.php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_topic'; $routeKey1 = "black.critical.high"; $routeKey2 = "red.critical.high"; $routeKey3 = "white.critical.high"; $message1 = 'black-critical-high!'; $message2 = 'red-critical-high!'; $message3 = 'white-critical-high!'; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_TOPIC); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); $exchange->publish($message1, $routeKey1); $exchange->publish($message2, $routeKey2); $exchange->publish($message3, $routeKey3);
消费者:c_topic1.php
<?php // 消费者 c.php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_topic'; $queueName = 'log_1'; $routeKey = 'black.critical.high'; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_TOPIC); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 设置持久性 $queue->setFlags(AMQP_DURABLE); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收消息并处理回调 $queue->consume('receive'); // 处理回调的方法 function receive($envelop, $queue){ echo $envelop->getBody() . " "; // ACK 通知生产者任务完成 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }
消费者:c_topic2.php
<?php // 消费者 c.php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_topic'; $queueName = 'log2'; $routeKey = '#.high'; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_TOPIC); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 设置持久性 $queue->setFlags(AMQP_DURABLE); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收消息并处理回调 $queue->consume('receive'); // 处理回调的方法 function receive($envelop, $queue){ echo $envelop->getBody() . " "; // ACK 通知生产者任务完成 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }
消费者:c_topic3.php
<?php // 消费者 c.php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_topic'; $queueName = 'log3'; $routeKey = 'white.critical.*'; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_TOPIC); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 设置持久性 $queue->setFlags(AMQP_DURABLE); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收消息并处理回调 $queue->consume('receive'); // 处理回调的方法 function receive($envelop, $queue){ echo $envelop->getBody() . " "; // ACK 通知生产者任务完成 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }