1、安装拓展及工具包
2、代码实现
a、发布消息
<?php $_host = '127.0.0.1';//主机 $_port = '5672';//端口 $_login = 'TEST';//账号 $_password = '123123';//密码 $_vhost = 'TEST';//host默认为/ require_once '../../../../vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $exchange = 'TEST';//交换机 $connection = new AMQPStreamConnection($_host, $_port, $_login, $_password, $_vhost); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', true, false, false); $routerKey = 'lol_status'; //模拟批量发布消息 for ($i = 0; $i < 100; $i++) { $arr = [ 'match_id' => $i, 'status' => rand(0,3), 'time'=> date('Y-m-d H:i:s') ]; $data = json_encode($arr,JSON_UNESCAPED_UNICODE); $msg = new AMQPMessage($data); $channel->basic_publish($msg, $exchange, $routerKey); echo '发送 '.$routerKey.' 消息: ' . $data . PHP_EOL; } $channel->close(); $connection->close(); ?>
b、消费消息
<?php $_host = '127.0.0.1'; $_port = '5672'; $_login = 'TEST'; $_password = '123123'; $_vhost = 'TEST'; require_once '../../../../vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; $exchange = 'TEST';//交换机名称 $queueName = 'lol';//队列名称 $routerKey = 'lol_status'; //路由key $connection = new AMQPStreamConnection($_host, $_port, $_login, $_password, $_vhost); $channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', true, false, false); $channel->queue_bind($queueName, $exchange, $routerKey); //获取消息数量信息 $declare_info = $channel->queue_declare($queueName,true);//$declare_info[1] 消息数量 echo " 等待消息中..." .PHP_EOL; $callback = function ($msg){ echo '接收到消息:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL; //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//手动应答 }; //第4个参数 false:手动应答 true:自动应答 $channel->basic_consume($queueName, '', false, true, false, false, $callback); //阻塞模式 while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close(); ?>