- 使用场景:
有时我们会遇到这样的情况,多个功能模块都希望得到完整的消息数据。例如一个log的消息,一个我们希望输出在屏幕上实时监控,另外一个用户持久化日志。这时就可以使用fanout模式。fanout模式模式不像direct模式通过routingkey来进行匹配,而是会把消息发送到所以的已经绑定的队列中。
- 消费者
<?php // 生产者 p_fanout.php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_fanout'; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 向服务器队列推送10条消息 for ($i = 0; $i < 10; $i++) { $msg = 'hello world ' . $i; $exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]); }
- 消费者c1_fanout
<?php // 消费者 c1_fanout.php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_fanout'; $queueName = 'q_fanout_1'; $routeKey = ''; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 设置持久性 $queue->setFlags(AMQP_DURABLE); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收消息并处理回调 // $queue->consume('receive'); //阻塞模式接收消息 echo "Message: "; while(True){ $queue->consume('receive'); //$queue->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 } // 处理回调的方法 function receive($envelop, $queue){ echo $envelop->getBody() . " "; // ACK 通知生产者任务完成 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }
- 消费者c2_fanout
<?php // 消费者 c2_fanout.php //配置信息 $config = [ 'host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/' ]; $exchangeName = 'e_fanout'; $queueName = 'q_fanout_2'; $routeKey = ''; //创建连接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker! "); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘 $exchange->setFlags(AMQP_DURABLE); // 声明交换机 $exchange->declareExchange(); // 创建消息队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // 设置持久性 $queue->setFlags(AMQP_DURABLE); // 声明消息队列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收消息并处理回调 // $queue->consume('receive'); //阻塞模式接收消息 echo "Message: "; while(True){ $queue->consume('receive'); //$queue->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 } // 处理回调的方法 function receive($envelop, $queue){ echo $envelop->getBody() . " "; // ACK 通知生产者任务完成 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }
- 运行结果
- 三个文件的执行顺序
先启动两个消费者,最后启动生产者,原因是fanout模式下,生产者不会创建消息队列,如果消费者没有创建,则消息没有队列可放。