zoukankan      html  css  js  c++  java
  • RabbitMq学习2-php命令行模式测试rabbitmq

    一、RabbitMQ结构

    1、几个概念说明:
          Broker:简单来说就是消息队列服务器实体。
      Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
      Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
      Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
      Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
      vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
      producer:消息生产者,就是投递消息的程序。
      consumer:消息消费者,就是接受消息的程序。
      channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
    2、消息队列的使用过程大概如下:
          (1)客户端连接到消息队列服务器,打开一个channel。
      (2)客户端声明一个exchange,并设置相关属性。
      (3)客户端声明一个queue,并设置相关属性。
      (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
      (5)客户端投递消息到exchange。
    exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
    exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
    RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
      (1)exchange持久化,在声明时指定durable => 1
      (2)queue持久化,在声明时指定durable => 1
      (3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
    如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
    二、安装php的amqp扩展

    先用phpinfo()查看php版本信息,

    最后根据上面的信息去下载相应的amqp版本:http://pecl.php.net/package/amqp

    据上面信息我们的是64位线程安全版本

    加压后:

    将php_amqp.dll复制到php/ext,同时在php.ini中添加如下代码:

    [amqp]  

    extension=php_amqp.dll 

    然后将rabbitmq.4.dll复制到php根目录C:/xampp/php/,同时修改apache配置文件httpd.conf,添加如下代码:

    # rabbitmq

    LoadFile  "C:/xampp/php/rabbitmq.4.dll"  

    最后重启看看是否已经加载了amqp模块:

    三、如果要在cli模式下测试运行rabbitmq,要配置cli模式的php.ini文件

    1、查看运行模式下的php.ini的位置

    (1)查看命令行模式下加载的php配置文件

    php -i|findstr .ini (window)
    php -i|grep .ini (linux)

    通常为php安装目录下的php.ini文件
    如:D:softwamp64inphpphp7.2.14php.ini

    (2)查看web模式下php加载的配置文件

    phpinfo();
    通常为apache目录下的php.ini文件
    如:D:softwamp64inapacheapache2.4.37inphp.ini


    2、配置D:softwamp64inphpphp7.2.14php.ini,在文件末尾加入

    [amqp]
    extension=php_amqp.dll

    保存即可

    四、php测试rabbitmq

      1、发送者(publisher):rabbit_publisher.php

      创建连接-->创建channel-->创建交换机对象-->创建队列-->发送消息

    <?php
    
    $exchangeName = 'demo';
    $queueName = 'hello';
    $routeKey = 'hello';
    $message = 'Hello World!';
    
    $connection = new AMQPConnection(
        array('host' => '127.0.0.1',
              'port' => '5672',
              'vhost' => '/', 
              'login' => 'guest', 
              'password' => 'guest')
    );
    
    $connection->connect() or die("Cannot connect to the broker!
    ");
    
    try {  
            $channel = new AMQPChannel($connection);
            $exchange = new AMQPExchange($channel);
            $exchange->setName($exchangeName);
            $queue = new AMQPQueue($channel);
            $queue->setName($queueName);
            $exchange->publish($message, $routeKey);
            var_dump("[x] Sent 'Hello World!'");
    } catch (AMQPConnectionException $e) {
            var_dump($e);
            exit();
    }
    $connection->disconnect();
    ?>
    View Code

      2、消费者(consumer):rabbit_consumer.php

          创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息

    <?php 
    
    $exchangeName = 'demo';
    $queueName = 'hello';
    $routeKey = 'hello';
    
    $connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
    $connection->connect() or die("Cannot connect to the broker!
    ");
    $channel = new AMQPChannel($connection);
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    $exchange->declareExchange(); // AMQP 1.2.0 由 declare() 改为 declareExchange(); 
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->declareQueue();//AMQP 1.2.0 由 declare() 改为 declareQueue(); 
    $queue->bind($exchangeName, $routeKey);
    
    var_dump('[*] Waiting for messages. To exit press CTRL+C');  
    while (TRUE) {  
            $queue->consume('callback');
    }
    
    
    function callback($envelope, $queue) {  
            $msg = $envelope->getBody();
            var_dump(" [x] Received:" . $msg);
            $queue->nack($envelope->getDeliveryTag());
    }
    
    $connection->disconnect();
     ?>
    View Code

      3、以管理者的身份运行RabbitMQ Command,启动mq:net start rabbitmq。切换到项目文件目录,执行消费者文件

      

      4、重新开启一个命令行窗口,同样切换到项目文件目录,执行生产者文件

      

      会看到每执行一次生产者文件,消费者那里都会受到生产者发送的消息:Hello World!

  • 相关阅读:
    POJ 1795 DNA Laboratory
    CodeForces 303B Rectangle Puzzle II
    HDU 2197 本源串
    HDU 5965 扫雷
    POJ 3099 Go Go Gorelians
    CodeForces 762D Maximum path
    CodeForces 731C Socks
    HDU 1231 最大连续子序列
    HDU 5650 so easy
    大话接口隐私与安全 转载
  • 原文地址:https://www.cnblogs.com/ivy-zheng/p/10961586.html
Copyright © 2011-2022 走看看