代码示例
生产者 p.php
<?php // 生产者 p.php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_1'; $queueName = 'q_1'; $routeKey = 'order'; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 设置持久性 $queue->setFlags(AMQP_DURABLE); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 向服务器队列推送10条消息 for ($i = 0; $i < 10; $i++) { $msg = 'hello world ' . $i; $exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]); }
消费者 c.php
<?php // 消费者 c.php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_1'; $queueName = 'q_1'; $routeKey = 'order'; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 设置持久性 $queue->setFlags(AMQP_DURABLE); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收消息并处理回调 $queue->consume('receive'); // 处理回调的方法 function receive($envelop, $queue){ echo $envelop->getBody() . " "; $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }
学习ack机制前,先了解ack机制使用和不使用两种情况下区别,这里我们在生产者中向服务器循环推送10条消息
消费者使用ack机制
// 接收消息并处理回调 $queue->consume('receive'); // 处理回调的方法 function receive($envelop, $queue){ echo $envelop->getBody() . " "; // 通知生产者任务完成 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }
不使用ack机制
通过对比两种执行结果,我们可以得知,服务器会连续推送3个消息,没有收到消费者返回时,停止向该消费者推送消息
在RabbitMQ中有一个prefetch_count的概念,这个参数的意思是允许Consumer最多同时处理几个任务。我的版本的RabbitMQ默认这个参数是3,也就是说如果某一个Consumer在收到消息后没有发送ACK确认包,RabbitMQ就会任务Consumer还在处理任务,当有3个消息都没有发送ACK确认包时,RabbitMQ就不会再发送消息给该Consumer。
在控制台确实3条消息未确认,7条准备发送
如果Consumer数量很多或者希望每个Consumer同时只处理一个任务可以通过在Consumer中设置PrefetchCount来实现更加均匀的任务分发。
$channel = new AMQPChannel($connection); $channel->setPrefetchCount(1);
以上情况是只一个消费者,如果我们有两个相同的消费者订阅相同的队列,那么服务器又是如何将消息推送给两个消费者的呢?
复制出 c1.php、 c2.php两个消费者
c1.php 和 c2.php 都开启ACK消息确认机制,
控制台 显示有两个连接
执行结果
可以看出服务器是平均将任务发给两个生产者
第二种情况,假如c2.php出故障或者代码错误导致ACk机制失效。
下面关掉c2.php脚本,这样c2.php和服务器断开连接
可以得知 c2.php 没有返回消息确认时,这三个消息是Unacked状态,其他7个消息由c1.php成功执行时,当c2.php断开连接时,服务器将Unacked消息交由c1.php完成。