zoukankan      html  css  js  c++  java
  • 初识rabbitmq

    rabbitmq做为现在最流行的消息中间件之一,我竟然还没用过,不可原谅,所以自己查看下资料在本地自己用一下

    介绍:
    RabbitMQ 是一个在AMQP基础上实现的企业级消息系统。它接受并转发消息。你可以将其视为邮局:当你将要发布的邮件放在邮箱中时,您可以确信 Postman 先生最终会将邮件发送给收件人。在这个比喻中,RabbitMQ 是一个邮箱,邮局和邮递员,即一端不断往邮局中收取消息,而另一端则可以读取或者订阅队列中的发送消息。

    RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块的消息。

    常见术语:
    RabbitMQ 使用一些术语:生产者(publisher)、队列(queue)、消费者(consumer)。

    关系:
    生产者(业务系统 ,发送消息的程序)-->消息队列(类比一个邮箱,存在于RabbitMQ)--->消费者(队列处理系统,等待消息然后处理的程序)
    注意,队列类比一个邮箱,存在于RabbitMQ,然而信息流通过RabbitMQ和您的应用程序,他们只能存储在一个队列。队列只受主机内存和磁盘限制的约束,它本质上是一个很大的消息缓冲区。会有许多生产者可以发送到一个队列的消息,许多消费者可以尝试从一个队列接收数据。

    好处:

    在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步操作,而这种异步处理的方式大大的节省了服务器的请求时间,从而提高了系统的吞吐量。而且不影响服务器做其他相应,不独占服务器资源。

    如:注册用户这种服务,它可能解耦成好几种独立的服务(账号验证,邮箱验证码,手机短信码等)。它们作为消费者,等待用户输入数据,在前台数据提交之后会经过分解并发送到各个服务所在的url,分发的那个角色就相当于生产者。消费者在获取数据时候有可能一次不能处理完,那么它们各自有一个请求队列,那就是内存缓冲区了。做这项工作的框架叫做消息队列。

    所以消息队列适用于以下几种经典场景:

    1).数据冗余时:如电商系统中的订单处理系统,传统处理模式是:下订单的时候,订单系统可能会调用库存系统的接口,这样两个系统之间存在一个严重依赖关系,如果库存系统宕机,那么整个流程都会受到影响。现在大多公司的处理方法是:引入消息队列,下完订单,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

    即使在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。

    2).解耦:对库存系统来说,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作。这样实现了两个系统间的解耦。

    3).流量削峰:流量剧增情况下,如抢红包、秒杀等。

    4).异步通讯:异步操作下的情况都适用。

    5).扩展性:如已经有了财务系统,后续需要配货系统,届时只需要配货系统订阅消息队列就可以了。

    6).排序保证:队列本身就可以做成单进单出的单线程的系统,从而保证数据按照顺序进行处理。

    可用于队列的几种介质:

    Mysql:可靠性高、易实现,速度慢

    Redis:速度快,单条大消息包时效率低

    消息系统:专业性强、可靠,学习成本高,如rabbitmq

    消息处理触发机制:

    死循环方式读取:易实现,故障时无法及时修复

    定时任务:压力均分,有处理量的上限

    守护进程:类似于PHP-FPM和PHP-CG,需要shell基础

    解耦案例:队列处理订单系统和配送系统

    订单系统接受用户订单->订单对列表(mysql)->配送系统+crontab(定时脚本shell)+标记配送结果

    流量削峰案例:Redis的List类型实现秒杀(基于内存,速度快;mysql须向硬盘写入,而且其他业务也使用mysql,瞬间的大流量会造成压力)

    秒杀业务程序(检查Redis已存放数据的长度,超出上限直接丢弃)->Redis->入库程序(因为秒杀时间短,所以死循环处理Redis的数据)+数据库

    简单实现:

    rabbitmq需要一系列依赖,具体安装步骤省略,可以查看另一篇博文  https://www.cnblogs.com/two-bees/p/10980951.html

    这里代码需要安装composer依赖

    文件夹下新建composer.json,并添加

    {
        "require": {
            "php-amqplib/php-amqplib": ">=2.6.1"
        }
    }

    执行composer install,就会自动在文件夹下生成vendor目录

    新建生产者send.php:

    <?php 
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $q_name = 'task_queues';
    $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin');
    $channel = $connection->channel();
    
    $channel->queue_declare($q_name, false, true, false, false);
    
    for($i=0;$i<10;$i++){
        sleep(1);
        $data = $i.' '.date('Y-m-d H:i:s',time());
        $msg = new AMQPMessage(
        $data,
        array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
    );
    
    $channel->basic_publish($msg, '', $q_name);
    
    echo 'Rabbitmq Send Message: ', $data, "
    ";
    }
    
    
    $channel->close();
    $connection->close();
    
    ?>

    新建消费者run.php:

    <?php 
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $q_name = 'task_queues';
    //创建连接和信道
    $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin');
    $channel = $connection->channel();
    
    //创建队列
    $channel->queue_declare($q_name, false, true, false, false);
    
    echo "Rabbitmq Waiting for messages. queue:".$q_name."
    ";
    
    $callback = function ($msg) {
        echo 'Received Message: '. $msg->body. "
    ";
        sleep(substr_count($msg->body, '.'));
        echo "Done 
    ";
        //确定此消息已经处理完成,否则不确认的话,queue会将此消息交给其他consumer处理哦
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    
    //同时最多处理1条信息
    $channel->basic_qos(null, 1, null);
    //回调
    $channel->basic_consume($q_name, '', false, false, false, false, $callback);
    
    while (count($channel->callbacks)) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    
    ?>

    打开三个xshell窗口:

    第一个消费者窗口:

    第二个消费者窗口:

    第三个生产者窗口:

    最好还是直接脚本执行,否则框架是很容易504的(我在我自己的框架就是这样)。

    脚本执行,sleep 1秒下会很清晰的模拟出消息队列。

    好了,初次使用就到这里了。

    附录:

     官网教程:https://www.rabbitmq.com/getstarted.html

  • 相关阅读:
    notepad++ 安装
    Git 安装
    C 字符串常量 数据类型
    vue路由传参query和params的区别
    mysql 在 centos 上安装,做成服务,且开机启动 步骤
    全网最详细Apache Kylin1.5安装(单节点)和测试案例 ---> 现在看来 kylin 需要 安装到Hadoop Master 节点上
    Kylin build cube step 2 报错(Kylin 安装在slave 节点上)
    Kylin build cube step 2 报错
    Kylin 环境 搭建 成功
    Kylin 环境 搭建 报错
  • 原文地址:https://www.cnblogs.com/two-bees/p/11015541.html
Copyright © 2011-2022 走看看