介绍
一款消息队列数据库,类似redis发布订阅,但是rq 做了功能完善和数据持久化。在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
客户端 rq_client.php
//建立连接 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/', ); $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker! "); } //建立信道和创建 exchange1 的交换机 $channel = new AMQPChannel($conn); $ex = new AMQPExchange($channel); $ex->setName("exchange1"); $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE); $ex->declareExchange(); //发送信息到服务器端 路由键 key1 、key2 for ($i = 0; $i < 10; $i++) { $key = "key1"; if ($i % 2 == 0) { $key = "key2"; } $ex->publish("use rq send message " . $key . "--" . $i, $key); } echo "success";
服务器端 rq_service.php
$conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/', ); //建立连接 $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker! "); } //建立信道和创建队列 first $channel = new AMQPChannel($conn); $q = new AMQPQueue($channel); $q->setName("first"); $q->setFlags(AMQP_DURABLE); //持久化 //队列绑定交换机 exchange1 并指定路由键 key1 $q->bind("exchange1", 'key1'); //阻塞模式接收消息 echo "Message: "; $q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 $conn->disconnect(); /** * 消费回调函数 * 处理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg . " "; //处理消息 }
cmd下分别执行rq_service.php 、rq_client.php
交换机4种类型
AMQP_EX_TYPE_DIRECT 直连
AMQP_EX_TYPE_TOPIC 模糊匹配
AMQP_EX_TYPE_FANOUT 广播类型 路由键可不填
AMQP_EX_TYPE_HEADERS
路由键模糊匹配
发送消息的路由键不是固定的单词,而是匹配字符串,如"*.lu.#",*匹配一个单词,#匹配0个或多个单词。
$q->bind("exchange1", 'key.#'); 请用 "." 连接
持久化概念
指定exchange,并显式申明它的类型。
将exchange持久化,这样那怕Rabbit重启,exchange也不会消失。
将queue持久化,这样那怕Rabbit重启,queue也不会消失(包括queue中的消息)
在exchange publish消息时,指定 route key,同时在queue中绑定rout key,这样exchange在转发消息时,能够将消息转发到与route key匹配的的队列中。
应用场景
事件驱动类型,当客户端有消息发送,服务器端会接收消息。用于群发短信、存储数据(缓存层)减轻数据库压力。