zoukankan      html  css  js  c++  java
  • PHP中RabbitMQ之amqp扩展实现(四)

    目前我在PHP里接触实现RabbitMQ的方式有两种,一种是通过amqp扩展,一种是使用php-amqplib,本章讲诉RabbitMQ的安装及amqp扩展及amqp扩展如何实现RabbitMQ

    环境:CoentOS,PHP 7

    1、RabbitMQ的安装

    需要下载的两个包

    erlang-21.0.7-1.el7.centos.x86_64.rpm

    rabbitmq-server-3.7.7-1.el7.noarch.rpm

    这两个包我已经放在了百度云盘的分享上

    链接:https://pan.baidu.com/s/1rMv_yFpLnH-D1S5wrOZrbA#list/path=%2FRabbitMQ

    提取码:ipyu

    然后参照 weixin_41368339的博客linux rabbitmq3.7.7安装与使用一文中的步骤安装步,基本上没有什么问题

    2、amqp扩展的安装

    这个请参照一只猪儿虫linux 编译安装amqp一文中的步骤安装,也没有什么问题

    当安装成功之后,即可开始用amqp实现RabbitMQ了

    3、Demo示例

    在安装完成后我们就可以开始我们的RabbitMQ之旅了,本Demo示例只创建了一个直连交换机,共有四个文件Consum.php (消费者),Publish.php (生产者) ,RabbitMqParernt.php (自己封装的RabbitMQ的方法) ,以及test.php (测试数据)

    RabbitMqParernt.php如下所示

    1.  
      <?php
    2.  
      abstract class RabbitMqParernt
    3.  
      {
    4.  
      //rabbitMQ配置信息(默认配置)
    5.  
      public $config = array(
    6.  
      'host'=>'127.0.0.1', //host
    7.  
      'port'=>5672, //端口
    8.  
      'username'=>'guest', //账号
    9.  
      'password'=>'guest', //密码
    10.  
      'vhost'=>'/' //虚拟主机
    11.  
      );
    12.  
       
    13.  
      public $exchangeName = ''; //交换机
    14.  
      public $queueName = ''; //队列名
    15.  
      public $routeKey = ''; //路由键
    16.  
      public $exchangeType = ''; //交换机类型
    17.  
       
    18.  
      public $channel; //信道
    19.  
      public $connection; //连接
    20.  
      public $exchange; //交换机
    21.  
      public $queue; //队列
    22.  
       
    23.  
      //初始化RabbitMQ($config数组是用来修改rabbitMQ的配置信息的)
    24.  
      public function __construct($exchangeName, $queueName, $routeKey, $exchangeType = '',$config = array())
    25.  
      {
    26.  
      $this->exchangeName = $exchangeName;
    27.  
      $this->queueName = $queueName;
    28.  
      $this->routeKey = $routeKey;
    29.  
      $this->exchangeType = $exchangeType;
    30.  
      if(!empty($config))
    31.  
      {
    32.  
      $this->setConfig($config);
    33.  
      }
    34.  
      $this->createConnet();
    35.  
      }
    36.  
       
    37.  
      //对RabbitMQ的配置重新进行配置
    38.  
      public function setConfig($config)
    39.  
      {
    40.  
      if (!is_array($config))
    41.  
      {
    42.  
      throw new Exception('config不是一个数组');
    43.  
      }
    44.  
      foreach($config as $key => $value)
    45.  
      {
    46.  
      $this->config[$key] = $value;
    47.  
      }
    48.  
      }
    49.  
       
    50.  
      //创建连接与信道
    51.  
      public function createConnet()
    52.  
      {
    53.  
      //创建连接
    54.  
      $this->connection = new AMQPConnection($this->config);
    55.  
      if(!$this->connection->connect())
    56.  
      {
    57.  
      throw new Exception('RabbitMQ创建连接失败');
    58.  
      }
    59.  
      //创建信道
    60.  
      $this->channel = new AMQPChannel($this->connection);
    61.  
      //创建交换机
    62.  
      $this->createExchange();
    63.  
      //生产时不需要队列,故队列名为空,只有消费时需要队列名
    64.  
      if(!empty($this->queueName))
    65.  
      {
    66.  
      $this->createQueue();
    67.  
      }
    68.  
      }
    69.  
       
    70.  
      //创建交换机
    71.  
      public function createExchange()
    72.  
      {
    73.  
      $this->exchange = new AMQPExchange($this->channel);
    74.  
      $this->exchange->setName($this->exchangeName);
    75.  
      $this->exchange->setType(AMQP_EX_TYPE_DIRECT);
    76.  
      $this->exchange->setFlags(AMQP_DURABLE);
    77.  
      }
    78.  
       
    79.  
      //创建队列,绑定交换机
    80.  
      public function createQueue()
    81.  
      {
    82.  
      $this->queue = new AMQPQueue($this->channel);
    83.  
      $this->queue->setName($this->queueName);
    84.  
      $this->queue->setFlags(AMQP_DURABLE);
    85.  
      $this->queue->bind($this->exchangeName, $this->routeKey);
    86.  
      }
    87.  
       
    88.  
      public function dealMq($flag)
    89.  
      {
    90.  
      if($flag)
    91.  
      {
    92.  
      $this->queue->consume(function($envelope){$this->getMsg($envelope, $this->queue);},AMQP_AUTOACK);//自动ACK应答
    93.  
      }
    94.  
      else
    95.  
      {
    96.  
      $this->queue->consume(function($envelope){$this->processMessage($envelope, $this->queue);});
    97.  
      }
    98.  
      }
    99.  
       
    100.  
      public function getMsg($envelope, $queue)
    101.  
      {
    102.  
      $msg = $envelope->getBody();
    103.  
      $this->doProcess($msg);
    104.  
      }
    105.  
       
    106.  
      public function processMessage($envelope, $queue)
    107.  
      {
    108.  
      $msg = $envelope->getBody();
    109.  
      $this->doProcess($msg);
    110.  
      $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
    111.  
      }
    112.  
       
    113.  
      //处理消息的真正函数,在消费者里使用
    114.  
      abstract public function doProcess($msg);
    115.  
       
    116.  
      //发送消息
    117.  
      public function sendMessage($message)
    118.  
      {
    119.  
      $this->exchange->publish($message, $this->routeKey);
    120.  
      }
    121.  
       
    122.  
       
    123.  
      //关闭连接
    124.  
      public function closeConnect()
    125.  
      {
    126.  
      $this->channel->close();
    127.  
      $this->connection->disconnect();
    128.  
      }
    129.  
      }

     Consum.php 如下所示

    1.  
      <?php
    2.  
      include_once('RabbitMqParernt.php');
    3.  
      class Consum extends RabbitMqParernt
    4.  
      {
    5.  
      public function __construct()
    6.  
      {
    7.  
      parent::__construct('exchange', 'queue', 'routeKey');
    8.  
      }
    9.  
      public function doProcess($msg)
    10.  
      {
    11.  
      echo $msg;
    12.  
      }
    13.  
      }
    14.  
      $consum = new Consum();
    15.  
      //$consum->dealMq(false);
    16.  
      $consum->dealMq(true);

    Publish.php如下所示

    1.  
      <?php
    2.  
      include_once('RabbitMqParernt.php');
    3.  
      class Publish extends RabbitMqParernt
    4.  
      {
    5.  
      public function __construct()
    6.  
      {
    7.  
      parent::__construct('exchange', '', 'routeKey');
    8.  
      }
    9.  
      public function doProcess($msg)
    10.  
      {
    11.  
       
    12.  
      }
    13.  
       
    14.  
      }

    test.php如下所示

    1.  
      <?php
    2.  
      include_once('Publish.php');
    3.  
      $publish = new Publish();
    4.  
      $publish->sendMessage('Hello,World!');
    5.  
      $publish->closeConnect();

    4、添加交换机与队列

    打开http://ip(你的RabbitMQ安装的主机):15672/,会进入到RabbitMQ的可视化管理后台登录页面,登录你的账号密码(如果你是按照第一步提到的博客里的教程来装的,那你的账号密码就是guest),然后新加交换机和队列,

    以下是新加交换机的操作,注意vhost与以及交换机的名称要与代码里的消费者与生产者传入的参数值保持一致,如果你不想使用"/"这个默认的vhost,也可以新建一个vhost(什么?你问我如何新建,那么请百度一下),但是要记住在代码里创建消费者与生产者时把你新加的这个vhost传进去,覆盖RabbitMqParernt.php里的vhost

    以下是新加队列,这里的vhost要与上一步的vhost保持一致,保证交换机与队列在同一个vhost下,不然交换机会找不到队列的,队列名与消费者代码里传入进去的队列名保持一致

    5、运行代码

    然后我们在一个窗口先启动消费者

    在另外一个窗口运行test.php文件

    出现这个生产者发布的hello,world!就成功了

    注意:消费者与生产者传入的交换机名称,路由键必须相同

                交换机类型请务必选择直连,各种交换机的路由键形式不大相同,有兴趣的同学可以去试试其它类型的交换机实现哦

                当修改了vhost或者交换机名称,队列名称等时,需要修改对应代码

                至于注释里的ack应答,我会在之后的博客里详细介绍,包括RabbitMQ的持久化,这里使用默认的ack应答即可

                对于amqp的拓展的使用,大家也可以去研究一下

                关于管理后台及RabbitMQ的命令,我这里就不多介绍了,有兴趣的同学去网上搜索一下就能搜到好多

    下一篇:RabbitMQ的持久化(六)

  • 相关阅读:
    基于pytest实现appium多进程兼容性测试
    git中的常用命令
    刷题(四)
    appium server命令行启动
    pytest添加运行失败截图和使用自定义的css
    fixture的参数化
    Suggestion: add 'tools:replace="android:value"' to <meta-data> element at AndroidManifest.xml:25:5-27:41 to override.
    svn代码管理器
    把Model改成Lib
    com.baidu.mapapi.CoordType
  • 原文地址:https://www.cnblogs.com/brady-wang/p/11293892.html
Copyright © 2011-2022 走看看