zoukankan      html  css  js  c++  java
  • 宝塔中极速安装的PHP如何使用AMQP连接RabbitMQ

    前言:

    有些人为了让项目快速上线,服务器往往安装宝塔面板,然后再极速安装LNMP。尽管环境搭建的时间省了,但是宝塔上PHP中扩展包没有提供AMQP。这时候只是为了使用消息队列而对PHP大动干戈, 不如使用一个PHP AMQP的库,即用即装,不对环境造成影响。

    简介:

    php-amqplib 客户端库,通过composer安装,不需要在PHP中安装扩展,以下为两种不同的安装方式。

    1. 项目中新建composer.json,添加如下代码,然后composer install

    {
        "require": {
            "php-amqplib/php-amqplib": " 2.6.*"
        }
    }
    

    2. 命令进入到项目,然后 composer require php-amqplib/php-amqplib 2.6.*

     

    RabbitMQ设置:

    1. 进入web管控台,添加新用户,角色管理员,任何IP上都可以登录,授权指定虚拟机。

    2. 添加交换机

    3. 添加队列并与交互机绑定。

     

    编码:

    1. 封装rabbitMQ类。

    <?php
    
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    /**
     * Class RabbitMQ.
     */
    class RabbitMQ
    {
        const READ_LINE_NUMBER = 0;
        const READ_LENGTH      = 1;
        const READ_DATA        = 2;
    
        public $config;
    
        public static $prefix   = 'autoinc_key:';
        protected $exchangeName = 'flow';
        protected $queueName    = 'flow_queue';
    
        /**
         * @var PhpAmqpLibConnectionAMQPStreamConnection
         */
        protected $connection;
        /**
         * @var PhpAmqpLibChannelAMQPChannel
         */
        protected $channel;
        protected $queue;
    	
        //配置项
        private $host;
        private $port;
        private $user;
        private $pass;
        private $vhost;
    
        public function __construct($config = [])
        {
            //$this->config = $config;
    
            //设置rabbitmq配置值
            $this->host  = '192.168.1.101';
            $this->port  = 5672;
            $this->user  = 'beiqiaosu';
            $this->pass  = 'beiqiaosu';
            $this->vhost = 'report';
    
            $this->connect();
        }
    
        public function __call($method, $args = [])
        {
            $reConnect = false;
            while (1) {
                try {
                    $this->initChannel();
                    $result = call_user_func_array([$this->channel, $method], $args);
                } catch (Exception $e) {
                    //已重连过,仍然报错
                    if ($reConnect) {
                        throw $e;
                    }
    
                    Swoole::$php->log->error(__CLASS__ . ' [' . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ', Code=' . $e->getCode() . "), RabbitMQ->{$method}, Params=" . var_export($args, 1));
                    if ($this->connection) {
                        $this->close();
                    }
    
                    $this->connect();
    
                    $reConnect = true;
                    continue;
                }
    
                return $result;
            }
            //不可能到这里
            return false;
        }
    
        /**
         * 连接rabbitmq消息队列.
         *
         * @return bool
         */
        public function connect()
        {
            try {
                if ($this->connection) {
                    unset($this->connection);
                }
                $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
            } catch (Exception $e) {
    			echo __CLASS__ ."Swoole RabbitMQ Exception'".$e->getMessage();
                return false;
            }
        }
    
        /**
         * 关闭连接.
         */
        public function close()
        {
            $this->channel->close();
            $this->connection->close();
        }
    
        /**
         * 设置交换机名称.
         *
         * @param string $exchangeName
         */
        public function setExchangeName($exchangeName = '')
        {
            $exchangeName && $this->exchangeName = $exchangeName;
        }
    
        /**
         * 设置队列名称.
         *
         * @param string $queueName
         */
        public function setQueueName($queueName = '')
        {
            $queueName && $this->queueName = $queueName;
        }
    
        /**
         * 设置频道.
         */
        public function initChannel()
        {
            if (!$this->channel) {
                //通道
                $this->channel = $this->connection->channel();
                $this->channel->queue_declare($this->queueName, false, true, false, false);
                $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
                $this->channel->queue_bind($this->queueName, $this->exchangeName);
            }
        }
    
        /**
         * 获取队列数据.
         *
         * @return mixed
         */
        public function pop()
        {
            while (1) {
                try {
                    $this->connect();
                    $this->initChannel();
                    $message = $this->channel->basic_get($this->queueName);
    				
                    if ($message) {
                        $this->channel->basic_ack($message->delivery_info['delivery_tag']);
                        $result = $message->body;
                    } else {
                        throw new Exception('Empty Queue Data');
                    }
                } catch (Exception $e) {
                    //Swoole::$php->log->error(__CLASS__ . " [" . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ", Code=" . $e->getCode() . ")");
                    sleep(1);
                    continue;
                }
    
                return $result;
            }
    		
            //不可能到这里
            return false;
        }
    
        /**
         * 插入队列数据.
         *
         * @param $data
         *
         * @return bool
         */
        public function push($data)
        {	
            while (1) {		
                try {
                    $this->connect();
                    $this->initChannel();
                    $message = new AMQPMessage($data, ['content_type'=>'text/plain', 'devlivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
                    $this->channel->basic_publish($message, $this->exchangeName);
                } catch (Exception $e) {
    				echo "$e->getMessage()";
                    continue;
    			}
    
                return true;
            }
    		
            //不可能到这里
            return false;
        }
    }
    

    2. 操作mq,出队,入队。

    <?php
    
    require_once "vendor/autoload.php";
    require_once "component/RabbitMQ.php";
    
    $mq = new RabbitMQ();
    
    // 消息消费测试
    /*try {
    	$res = $mq->pop();
    	
    }catch(Exception $e) {
    	
    	var_dump($e->getMessage());die;
    }*/
    
    
    // 消息生产测试
    try {
    	$res = $mq->push(json_encode(['name'=>'beiqiaosu','order_id'=>'2020070115261425155']));
    	
    }catch(Exception $e) {
    	
    	var_dump($e->getMessage());die;
    }
    
    
    var_dump($res);die;
    

    测试:

    1. 先通过生产消息(入队)方法运行一下,然后进入队列中get message查看消息总数。

    2. 测试调用消费,再查看总数。

    关注公众号,回复 “宝塔MQ” 获取demo源码。

  • 相关阅读:
    WebApi Owin SelfHost OAuth2
    HTML5 localStorage、sessionStorage 作用域
    Owin WebApi版本控制
    C# Guid 16位 唯一
    C# TimeSpan获取 年月
    ASP.NET Web Api OwinSelfHost Restful 使用
    UrlRouteModule
    asp.net 代码 注意点
    JS小问题总结
    JS中javascript:void(0)真正含义
  • 原文地址:https://www.cnblogs.com/zerofc/p/13411732.html
Copyright © 2011-2022 走看看