RabbitMQ的关键字说明
(1)Broker:经纪人。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。粗略的可以将图中的RabbitMQ Server当作Broker。
(2)Exchange:消息交换机。指定消息按照什么规则路由到哪个队列Queue。
(3)Queue:消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。
(4)Binding:绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。
(5)RoutingKey:路由关键字。Exchange根据RoutingKey进行消息投递。
(6)Vhost:虚拟主机。一个Broker可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有一组Exchange、Queue和Binding。
(7)Producer:消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的程序。
(8)Consumer:消息消费者。消息的接收者,一般是独立的程序。
(9)Channel:消息通道,也称信道。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话任务。
RabbitMQ的使用流程
(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好Binding关系。
(5)生产者客户端投递消息到exchange。
(6)exchange接收到消息后,就根据消息的RoutingKey和已经设置的binding,进行消息路由(投递),将消息投递到一个或多个队列里。
(7)消费者客户端从对应的队列中获取并处理消息。
原理流程
生产者主要做的是:创建连接-->创建channel-->创建交换机对象-->发送消息
消费者主要做的是:创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息
demo1:
创建mysql表
CREATE TABLE `rabbitmq_table` ( `id` int(11) NOT NULL AUTO_INCREMENT, `sJson` varchar(255) NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4;
consumer1.php
<?php /* * 创建表数据 CREATE TABLE `rabbitmq_table` ( `id` int(11) NOT NULL AUTO_INCREMENT, `sJson` varchar(255) NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4; */ $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交换机名 $q_name = 'q_linvo'; //队列名 $k_route = 'key_2'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($conn); //创建交换机 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 //$ex->declare(); $ex->declareExchange(); //创建队列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 //$q->declare(); //最好队列object在这里declare()下,否则如果是新的queue会报错 $q->declareQueue(); //最好队列object在这里declare()下,否则如果是新的queue会报错 //绑定交换机与队列,并指定路由键,可以多个路由键 $q->bind($e_name, 'key_1'); echo "Message:".PHP_EOL; //阻塞模式接收消息 $q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 //非阻塞模式接收消息 /*while(True){ //消息获取 $arr = $q->get(); $res = $q->ack($arr->getDeliveryTag()); //手动发送ACK应答 $msg = $arr->getBody(); mysql_insert($msg); log_insert($msg); }*/ $conn->disconnect(); /** * 消费回调函数 * 处理消息 */ function processMessage($envelope, $queue) { var_dump($envelope->getRoutingKey()); $msg = $envelope->getBody(); mysql_insert($msg); log_insert($msg); } //插入日志 function log_insert($json){ file_put_contents('./consumer1.txt',$json.PHP_EOL,FILE_APPEND ); } //消息入库 function mysql_insert($json){ //连接MySQL数据库 $pdo = new PDO("mysql:host=localhost;dbname=test","root","168168" ); $pdo->query('SET NAMES UTF8MB4');//设置UTF8字符编码 // $pdo->query('SET NAMES UTF8'); $sql = "insert into `rabbitmq_table` (sJson) values ('{$json}')"; echo $sql.PHP_EOL; if ($pdo->exec($sql)){ echo "mysql insert success".PHP_EOL; } else { echo "mysql insert fail".PHP_EOL; } }
publisher1.php
<?php $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($conn); //创建交换机 $e_name = 'e_linvo'; //交换机名 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declareExchange().PHP_EOL; echo "Send Message:".$ex->publish("rabbitmq消息测试,key_1" . date('H:i:s', time()), 'key_1').PHP_EOL; //echo "Send Message:".$ex->publish("rabbitmq消息测试,key_2 by xust" . date('H:i:s', time()), 'key_2').PHP_EOL;
使用方法:先运行consumer1.php,再运行publisher1.php
运行效果:
demo2:
consumer2.php
<?php //配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交换机名 $q_name = 'q_linvo'; //队列名 $k_route = 'key_1'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($conn); //创建交换机 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declareExchange()." "; //创建队列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 echo "Message Total:".$q->declareQueue()." "; //绑定交换机与队列,并指定路由键 echo 'Queue Bind: '.$q->bind($e_name, $k_route)." "; //阻塞模式接收消息 echo "Message:".PHP_EOL; while(True){ $q->consume('processMessage'); //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 } $conn->disconnect(); /** * 消费回调函数 * 处理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg.PHP_EOL; //处理消息 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }
publisher2.php
<?php $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交换机名 //$q_name = 'q_linvo'; //无需队列名 $k_route = 'key_1'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($conn); //消息内容 $message = json_encode(['msg'=>"RabbitMQ消息发送成功~~",'order_id'=>time()],JSON_UNESCAPED_UNICODE); //创建交换机对象 $ex = new AMQPExchange($channel); $ex->setName($e_name); //发送消息 //$channel->startTransaction(); //开始事务 for($i=1; $i<=5; ++$i){ echo "Send Message:".$ex->publish($message.$i, $k_route).PHP_EOL; sleep(1); } //$channel->commitTransaction(); //提交事务 $conn->disconnect();
使用方法:先运行consumer2.php,再运行publisher2.php
运行效果: