zoukankan      html  css  js  c++  java
  • rabbitmq消息中间件的初步探索

    在上次学xattr的时候用它简单实现一个中间件,我去了解了一下rabbitmq这个消息中间件,感觉理论上还是挺好用的,给一般并发量的系统用足够了。

    首先安装这个服务。

    sudo apt search rabbitmq

    发现了这个

    rabbitmq-server/focal-updates,focal-updates,focal-security,focal-security,now 3.8.2-0ubuntu1.3 all
        AMQP server written in Erlang

    好,然后安装它

    sudo apt-get install rabbitmq-server

    等他跑完就行了,要用PHP调用的话需要这个php组件

    composer require php-amqplib/php-amqplib

    跑完后创建两个文件 receive.php 和 send.php

    receive.php

    <?php
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->queue_declare('hello', false, false, false, false);
    echo ' [*] waiting for message. ctrl+c to stop', "
    ";
    $callback = function($msg) {
      echo " [x] Received " . $msg->body . "
    ";
    };
    $channel->basic_consume('hello', '', false, true, false, false, $callback);
    while (count($channel->callbacks)) {
      $channel->wait();
    }
    $channel->close();
    $connection->close();

    send.php

    <?php
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $start_time = microtime(true);
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->queue_declare('hello', false, false, false, false);
    $msg = new AMQPMessage('Hello world!');
    $channel->basic_publish($msg, '', 'hello');
    // echo " [x] Send 'Hello world'
    ";
    $channel->close();
    $connection->close();
    
    $end_time = microtime(true);
    
    echo "use time " . ($end_time - $start_time) * 1000 . " ms 
    ";

    先运行 php receive.php 它就一直在监听消息

    [*] waiting for message. ctrl+c to stop

    再运行 php send.php 进行投递消息

    就可以看到效果了。

     [*] waiting for message. ctrl+c to stop
     [x] Received Hello world!

    关于这个例子 这篇文章介绍了很详细 https://www.rabbitmq.com/tutorials/tutorial-one-php.html

    git仓库在这里 https://github.com/php-amqplib/php-amqplib

    除了这个composer之外 还可以安装PHP的扩展

    sudo apt-get install php7.4-amqp

    安装后用  php -m |grep amqp 来验证一下扩展是不是真的安装上了。

    如果我不运行receive.php 直接运行send.php会怎么样呢?

    我发现它会把消息堆积放在中间介质里面(当然这个应该是rabbitmq server服务里面的一部分)

    只运行 send.php 不运行receive.php 发现有消息堆积 我们可以用  sudo rabbitmqctl list_queues 来查看列表

    sudo rabbitmqctl list_queues
    Timeout: 60.0 seconds ...
    Listing queues for vhost / ...
    name messages
    hello 3

    我执行了send.php3次留下来3条消息,在仓库列表里面等待处理。

    此时再运行receive.php 然后进来就有消息输出 再看看列表,发现没有消息堆积了

    sudo rabbitmqctl list_queues
    Timeout: 60.0 seconds ...
    Listing queues for vhost / ...
    name messages
    hello 0

    这说明生产者是可以随时生产投递到中间存储的,消费者启动的时候会自己去消费堆积任务。
    如果生产者消费者都在运行,那么生产者投递的消息马上就被消费者处理了。

    当你运行两个receive.php的话

    然后让 send.php 执行10个消息投递

    for ($i=0; $i < 10; $i++) {
        $msg = new AMQPMessage('Hello world!'.$i);
        $channel->basic_publish($msg, '', 'hello');
    }

    1个消费者打印

    [*] waiting for message. ctrl+c to stop
    [x] Received Hello world!0
    [x] Received Hello world!2
    [x] Received Hello world!4
    [x] Received Hello world!6
    [x] Received Hello world!8

    另1个消费者打印

    [*] waiting for message. ctrl+c to stop
    [x] Received Hello world!1
    [x] Received Hello world!3
    [x] Received Hello world!5
    [x] Received Hello world!7
    [x] Received Hello world!9

    看来它俩分配了消息

    然后对于这个效率的话,我做了一个测试

    投递   1 万条消息耗费   228 ms 合约 43859 条/秒
    投递  10 万条消息耗费  1512 ms 合约 66137 条/秒
    投递 100 万条消息耗费 18785 ms 合约 54990 条/
    for ($i=0; $i < 10000; $i++) {
        $msg = new AMQPMessage('Hello world!'.$i);
        $channel->basic_publish($msg, '', 'hello');
    }
    for ($i=0; $i < 100000; $i++) {
        $msg = new AMQPMessage('Hello world!'.$i);
        $channel->basic_publish($msg, '', 'hello');
    }
    for ($i=0; $i < 1000000; $i++) {
        $msg = new AMQPMessage('Hello world!'.$i);
        $channel->basic_publish($msg, '', 'hello');
    }

    可以看出这个投递极限也就是单个进程每秒5-6万条左右,总体而言效率还算不错,已经能满足很多系统了,毕竟业务上也没有那么多消息要传递,如果非要更快,那就需要考虑用更快的xattr或者共享内存之类的了。

  • 相关阅读:
    Eclipse将引用了第三方jar包的Java项目打包成jar文件的两种方法
    【提供笔试面试题参考】阿里巴巴,淘宝技术部,针对21届应届生的内推机会
    谈谈我眼中的CSDN吧
    我的2014碎碎念—学习篇、实习篇、工作篇、生活篇
    推荐几款我一直在用的chrome插件(下)
    推荐几款我一直在用的chrome插件(上)
    逛自己的微博,回顾曾经的那个“我”
    Java五道输出易错题解析(避免小错误)
    程序员和产品经理是怎么互相看的?贬低还是赞扬?
    现实中如何评判路遥《人生》中的高加林?
  • 原文地址:https://www.cnblogs.com/lizhaoyao/p/15206176.html
Copyright © 2011-2022 走看看