在上次学xattr的时候用它简单实现一个中间件,我去了解了一下rabbitmq这个消息中间件,感觉理论上还是挺好用的,给一般并发量的系统用足够了。
首先安装这个服务。
sudo apt search rabbitmq
发现了这个
rabbitmq-server/focal-updates,focal-updates,focal-security,focal-security,now 3.8.2-0ubuntu1.3 all AMQP server written in Erlang
好,然后安装它
sudo apt-get install rabbitmq-server
等他跑完就行了,要用PHP调用的话需要这个php组件
composer require php-amqplib/php-amqplib
跑完后创建两个文件 receive.php 和 send.php
receive.php
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo ' [*] waiting for message. ctrl+c to stop', " "; $callback = function($msg) { echo " [x] Received " . $msg->body . " "; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
send.php
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage; $start_time = microtime(true); $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage('Hello world!'); $channel->basic_publish($msg, '', 'hello'); // echo " [x] Send 'Hello world' "; $channel->close(); $connection->close(); $end_time = microtime(true); echo "use time " . ($end_time - $start_time) * 1000 . " ms ";
先运行 php receive.php 它就一直在监听消息
[*] waiting for message. ctrl+c to stop
再运行 php send.php 进行投递消息
就可以看到效果了。
[*] waiting for message. ctrl+c to stop [x] Received Hello world!
关于这个例子 这篇文章介绍了很详细 https://www.rabbitmq.com/tutorials/tutorial-one-php.html
git仓库在这里 https://github.com/php-amqplib/php-amqplib
除了这个composer之外 还可以安装PHP的扩展
sudo apt-get install php7.4-amqp
安装后用 php -m |grep amqp 来验证一下扩展是不是真的安装上了。
如果我不运行receive.php 直接运行send.php会怎么样呢?
我发现它会把消息堆积放在中间介质里面(当然这个应该是rabbitmq server服务里面的一部分)
只运行 send.php 不运行receive.php 发现有消息堆积 我们可以用 sudo rabbitmqctl list_queues 来查看列表
sudo rabbitmqctl list_queues Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages hello 3
我执行了send.php3次留下来3条消息,在仓库列表里面等待处理。
此时再运行receive.php 然后进来就有消息输出 再看看列表,发现没有消息堆积了
sudo rabbitmqctl list_queues Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages hello 0
这说明生产者是可以随时生产投递到中间存储的,消费者启动的时候会自己去消费堆积任务。
如果生产者消费者都在运行,那么生产者投递的消息马上就被消费者处理了。
当你运行两个receive.php的话
然后让 send.php 执行10个消息投递
for ($i=0; $i < 10; $i++) { $msg = new AMQPMessage('Hello world!'.$i); $channel->basic_publish($msg, '', 'hello'); }
1个消费者打印
[*] waiting for message. ctrl+c to stop [x] Received Hello world!0 [x] Received Hello world!2 [x] Received Hello world!4 [x] Received Hello world!6 [x] Received Hello world!8
另1个消费者打印
[*] waiting for message. ctrl+c to stop [x] Received Hello world!1 [x] Received Hello world!3 [x] Received Hello world!5 [x] Received Hello world!7 [x] Received Hello world!9
看来它俩分配了消息
然后对于这个效率的话,我做了一个测试
投递 1 万条消息耗费 228 ms 合约 43859 条/秒
投递 10 万条消息耗费 1512 ms 合约 66137 条/秒
投递 100 万条消息耗费 18785 ms 合约 54990 条/秒
for ($i=0; $i < 10000; $i++) { $msg = new AMQPMessage('Hello world!'.$i); $channel->basic_publish($msg, '', 'hello'); } for ($i=0; $i < 100000; $i++) { $msg = new AMQPMessage('Hello world!'.$i); $channel->basic_publish($msg, '', 'hello'); } for ($i=0; $i < 1000000; $i++) { $msg = new AMQPMessage('Hello world!'.$i); $channel->basic_publish($msg, '', 'hello'); }
可以看出这个投递极限也就是单个进程每秒5-6万条左右,总体而言效率还算不错,已经能满足很多系统了,毕竟业务上也没有那么多消息要传递,如果非要更快,那就需要考虑用更快的xattr或者共享内存之类的了。