zoukankan      html  css  js  c++  java
  • PHP中使用RabiitMQ---各项参数的使用方法

    RabbitMQ在PHP使用,我在这里对RabbitMQ的各项方法和参数进行了一些梳理,有不足的地方还望各位大神指点.

    想要使用rabbitMQ消息队列,首先需要安装 php_amqp.dll 扩展 和 rabbitMQ 服务, 至于怎么安装大家可以百度一下,这类的资料还是有很多的.
    -----------------------------------------------------------------------------------------------------------------------------------------------------------------
    RabbitMQ的工作原理大概如下:
    生产者:生产消息--->创建交换器:对消息进行routekey鉴别,然后分发到相对应的队列---->队列:通过routekey,装载消息,然后运输给消费者-->消费者:接收队列传送的消息,进行处理,然后给予回应.

    下面是RabbitMQ生产者实现的简单代码:
    <?php
    $exchangeName = 'ceshi1';
    $queueName = 'ceshi_queue1';
    $str = 'ceshi'.rand(1,9999);
    $routeKey = 'ceshi_queue1';
    $message = $str;
    
    $connection = new AMQPConnection(array('host' => '', 'port' => '', 'vhost' => '', 'login' => '', 'password' => ''));
    $connection->connect() or die("Cannot connect to the broker!
    ");
    
    # 连接服务
    $channel = new AMQPChannel($connection);
    #  创建交换器
    $exchange = new AMQPExchange($channel);
    # 设置交换器的名称
    $exchange->setName($exchangeName);
    # 设置多个参数值[setArguments ( array $arguments )];
    # 获取该队列某个参数的信息 , getArgument ( string $key )
    # [如果无法将此交换机的消息路由,则将它们发送到这里指定的备用交换机。(设置“alternate-exchange”参数)。]
    # 设置交换器类型
    # AMQP_EX_TYPE_DIRECT[点对点],AMQP_EX_TYPE_FANOUT[广播],AMQP_EX_TYPE_TOPIC[模糊匹配],AMQP_EX_TYPE_HEADER[头匹配]
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    # 交换器持久化
    //$exchange->setFlags(AMQP_DURABLE);
    # 执行创建交换器
    $exchange->declareExchange();
    # 创建队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    # 设置单个指定的参数键值,setArgument ( string $key , mixed $value )
    # 获取该队列某个参数的信息 , getArgument ( string $key )
    //$queue->setArgument();
    # [发布到队列中的消息可以在被丢弃之前生存多长时间(毫秒(设置x-message-ttl)参数。]
    # [在自动删除队列(毫秒)之前,该队列不能被使用多长时间。(设置“x-expires”参数)。]
    # [队列中有多少(准备好的)消息[个数],可以在它开始,从其头上删除之前包含。(设置x-max-length长度)参数]
    # [准备好的消息的总大小,队列可以在它开始从头上删除之前包含。(设置x-max-length-bytes 纵向字节)参数]
    # [如果拒绝或过期消息将被重新发布的Exchange的可选名称。(设置“x-dead-letter-exchange”参数,[交换器名,(string)])。]
    # [可选的替换路由密钥,当消息已被写入字母时使用。如果未设置此消息,将使用消息的原始路由密钥。(设置“x-dead-letter-routing-key”参数)。]
    # [支持队列的最大优先级数量;如果未设置,队列将不支持消息优先级。(设置x-max-priority)参数。]
    # [将队列设置为懒惰模式,尽可能保留磁盘上的尽可能多的消息以减少RAM的使用;如果没有设置,队列将保持内存中的缓存以尽可能快地传递消息。(设置“x-queue-mode”参数设置:lazy)。]
    # [将队列设置为主定位模式,确定在节点群集上声明队列主控器所处的规则。(设置x-queue-master-locator 参数。]
    # 设置多个参数值[setArguments ( array $arguments )];
    # 获取该队列所有参数的信息 , getArguments ( void )
    //$queue->setArguments();
    # 持久化队列
    # AMQP_DURABLE[队列持久], AMQP_PASSIVE[返回消息计数], AMQP_EXCLUSIVE[只被一个连接(connection)使用,而且当连接关闭后队列即被删除], AMQP_AUTODELETE[当最后一个消费者退订后即被删除].
    $queue->setFlags(AMQP_DURABLE);
    # 执行创建队列
    $queue->declareQueue();
    
    # 开启事务,确保数据真正不丢失
    $channel->startTransaction();
    # 将消息和标识绑定到交换器中
    # publish(message[消息],route_key,[ flags[ AMQP_MANDATORY,AMQP_IMMEDIATE],attributes[array] ]);
    # attributes 存有参数 => [content_type(用于描述MIME类型的编码。例如,对于经常使用的JSON编码,将此属性设置为 “application/json” 是一个很好的做法。),
    #content_encoding,message_id,user_id,app_id,delivery_mode[2 消息持久,1非持久],priority,timestamp,expiration,type,reply_to(主要用于命名一个回调队列 ) ]
    $exchange->publish($message,$routeKey);
    $channel->commitTransaction();
    var_dump("[x] Sent $message");
    
    # 关闭连接
    $connection->disconnect();
    
    ?>
    以上是生产者的简单代码,下面我们可以看看消费者的简单代码:
    <?php
    $exchangeName = 'ceshi1';
    $queueName = 'ceshi_queue1';
    $routeKey = 'ceshi_queue1';
    
    $connection = new AMQPConnection(array('host' => '', 'port' => '', 'vhost' => '', 'login' => '', 'password' => ''));
    $connection->connect() or die("Cannot connect to the broker!
    ");
    
    # 连接服务
    $channel = new AMQPChannel($connection);
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    # 设置交换器类型
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    # 交换器持久化
    //$exchange->setFlags(AMQP_DURABLE);
    $exchange->declareExchange();
    
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();
    
    # 队列根据routekey 装载消息
    $queue->bind($exchangeName,$routeKey);
    var_dump('[*] Waiting for messages. To exit press CTRL+C');
    #
    $queue->consume('callback');
    # 设置每次只能处理一条,避免消息堆积,从而导致队列挂掉
    $channel->qos(0,1);
    #关闭连接
    $connection->disconnect();
    
    function callback($envelope, $queue) {
        $msg = $envelope->getBody();
    //    var_dump(" [x] Received:" . $msg);
        echo $msg."
    ";
    //    # nack 让没有通过的消息可以    再次进入队列, ack 没有通过的消息不能再次回到队列中
    //    # 通知队列消息通过,可以执行下一个消息
        $queue->nack($envelope->getDeliveryTag());
    }
    ?>
    
    
    
     
    这些代码都是在网上收集的,我个人在附加了一些方法和参数的使用方法,希望能帮到各位,谢谢!!!!



  • 相关阅读:
    Python高级网络编程系列之第二篇
    Python高级网络编程系列之第一篇
    Python高级网络编程系列之基础篇
    利用Python实现12306爬虫--查票
    Linux Shell脚本欣赏
    Linux Shell脚本 之 条件判断
    VMware Workstation虚拟网络VMnet0、VMnet1、VMnet8的图解
    Linux的虚拟机采用NAT方式时如何能在虚拟机中访问互联网
    Linux的虚拟机拷贝到另外的操作系统时,NAT方式的静态IP无效,一直是获取的DHCP动态地址
    Hadoop
  • 原文地址:https://www.cnblogs.com/fqyb/p/9091945.html
Copyright © 2011-2022 走看看