zoukankan      html  css  js  c++  java
  • RabbitMQ接入之PHP

    上一篇记录下RabbitMQ的安装与管理界面,接下来开始看PHP是如何接入的

    1.安装php-amqplib

    php-amqplib是一个纯PHP库,使用它,基于PHP的脚本客户端就可以轻松的连接和操作RabbitMQ。我们使用composer来安装。

    composer require php-amqplib/php-amqplib

    示例说明

    生产者(Producer)和消费者(Consumer)是消息队列的基本概念,生产者是指生产消息的一方,也是消息发送方,消费者就是消费消息的一方,也是消息接收方,队列就是存储消息的一个缓存区。

    本实例将由生产者发送很多消息给消息队列,由多个消费者来消费队列中的消息。我们可以想象这样的场景:皮鞋生产打包打包车间,不断有成品鞋进入传送带(消息队列)等待操作工人(消费者)将皮鞋打包。

    因为等待打包的鞋子特别多,我们需要安排多个打包工人在传送带两边,及时从传送带取出成品鞋,然后装箱打包。我们要求是要确保工人最后打包好的皮鞋数量一双不少,不能因为打包工人操作慢或者个人原因暂时离开生产线,导致最终打包数不一致。

    图片

    消息发送

    生产者将消息发送给队列,至于谁来消费(处理)这些消息,生产者不管。

    消息队列(MQ),用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

    消息到达队列中后,如果没有一个消费者来处理消息的话,我们希望队列中的消息不要丢弃,也就是消息持久化。在生产者和消费者中都要将queue_declare第3个参数设置为true,表示让消息队列持久化。

    $channel->queue_declare($queue, false, true, false, false); 
    
    

    此外,我们可以确保即使RabbitMQ重启了,消息队列不会丢失,在生产者端设置:。

    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

    生产者文件sender.php

    <?php
    /**
     * @sender.php
     * @消息生产者-分发任务
     */
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $queue = 'worker';
    
    //$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $connection = new AMQPStreamConnection(
        '192.168.0.100', 
        56720, 
        'helloweba',  //user
        'helloweba',  //password
        'test'  //vhost
    );
    $channel = $connection->channel();
    
    $channel->queue_declare($queue, false, true, false, false); //第3个参数设置为true,表示让消息队列持久化
    
    for ($i = 0; $i < 100; $i++) { 
        $arr = [
            'id' => 'message_' . $i,
            'order_id' => str_replace('.', '' , microtime(true)) . mt_rand(10, 99) . $i,
            'content' => 'helloweba-' . time()
        ];
        $data = json_encode($arr);
        $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); ////设置rabbitmq重启后也不会丢失队列,或者设置为'delivery_mode' => 2
        $channel->basic_publish($msg, '', $queue);
    
        echo 'Send message: ' . $data . PHP_EOL;
    }
    
    $channel->close();
    $connection->close();

    消息接收

    消费者是指完成消息的接收和处理的客户端程序,消费者就如同生产线上的操作工人,他们按照操作规程从传送带上取出产品后有序的完成后续工作任务。

    实际项目中,如果消费者处理消息能力不够时,就要开启多个消费者来消费队列中的消息。默认情况下,RabbitMQ将会把队列中的消息平均分配给每个消费者。如果消费者要对分配到的消息任务处理时间很长(耗时任务),那么处理消息任务的时候就有可能会遇到意外。

    比如某个消费者断电了,或者出故障了,那它正在处理的消息会怎么办?这里就是RabbitMQ的消息确认机制,为了保证数据不丢失,RabbitMQ会将未处理完的消息分配给下一个消费者处理。

    此外RabbitMQ还可以设置公平分配消息任务,不会给某个消费者同时分配多个消息处理任务,因为消费者无法同时处理多个消息任务。

    换句话说,RabbitMQ在处理和确认消息之前,不会向消费者发送新的消息,而是将消息分发给下一个不忙的消费者。

    $channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息

    消费者文件receiver.php

    <?php
    /**
     * @receiver.php
     * @消息消费者-接收端
     */
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $queue = 'worker';
    
    //$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $connection = new AMQPStreamConnection('192.168.0.100', 56720, 'helloweba', 'helloweba', 'test');
    $channel = $connection->channel();
    
    $channel->queue_declare($queue, false, true, false, false);
    
    echo ' [*] Waiting for messages. To exit press CTRL+C' . PHP_EOL;
    
    $callback = function($msg){
        echo " Received message:", $msg->body, PHP_EOL;
        sleep(1);  //模拟耗时执行
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    
    $channel->basic_qos(null, 1, null); //处理和确认完消息后再消费新的消息
    $channel->basic_consume($queue, '', false, false, false, false, $callback); //第4个参数值为false表示启用消息确认
    
    while(count($channel->callbacks)) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();

    模拟测试

    现在我们运行多个消费者终端,可以打开多个ssh客户端,client1和client2运行:

    php receiver.php
    
    
    开启生产者
    php sender.php

    由于消费者是阻塞运行的,他们会一直等待队列中的消息,当有消息就会去取出来处理。我们可以模拟将其中某个客户端中断,即断开某个消费者。

    然后再看消息是不是被其他消费者接收处理了。同样我们可以模拟将客户端全部重启,看看队列中的消息是否没有丢失。

    当client1中断连接RabbitMQ后,再次运行连接RabbitMQ,在client2中看到的消息处理情况,注意看图中的消息id。

  • 相关阅读:
    Mac + Python3 安装scrapy
    Pyqt4+Eric6+python2.7.13(windows)
    js基础⑥
    python模块之os,sys
    Python模块之random
    Python模块之PIL
    js基础⑤
    js基础④
    js基础③
    centOS目录结构详细版
  • 原文地址:https://www.cnblogs.com/xingxia/p/rabbitmq_php.html
Copyright © 2011-2022 走看看