- 启动rabbitmq-server
rabbitmq-server start
- 代码示例
exchange: e_1
queue: order
生产者 p.php
<?php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_1'; $queueName = 'q_1'; $routeKey = 'order'; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 设置持久性 $queue->setFlags(AMQP_DURABLE); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 向服务器队列推送10条消息 for ($i = 0; $i < 10; $i++) { $msg = 'hello world'; $exchange->publish($msg, $routeKey); }
消费者 c.php
<?php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_1'; $queueName = 'q_1'; $routeKey = 'order'; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 设置持久性 $queue->setFlags(AMQP_DURABLE); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收消息并处理回调 $queue->consume('receive'); // 处理回调的方法 function receive($envelop, $queue){ echo $envelop->getBody() . " "; }
- 重开命令窗口,运行 php c.php, 生产者保持运行监听服务器消息推送
php c.php
- 运行生产者
php p.php
或
浏览器访问 http://localhost/项目名称/p.php
- 返回到消费者串口,看到成功接收到消息,并输出到屏幕