zoukankan      html  css  js  c++  java
  • PHP使用RabbitMQ消息队列

    1、安装amqp拓展 安装流程

    2、下载工具包 php-amqplib 

    composer require php-amqplib/php-amqplib
     
    3、代码操作如下
    【消费消息】
     1 <?php 
     2 //配置信息 
     3 $conn_args = array( 
     4     'host' => '127.0.0.1',
     5     'port' => '5672',  
     6     'login' => 'zcw',  
     7     'password' => '123456', 
     8     'vhost'=>'/' 
     9 );   
    10 $e_name = 'exchange1'; //交换机名 
    11 $q_name = 'queue1'; //队列名 
    12 $k_route = 'route1'; //路由key 
    13  
    14 //创建连接和channel 
    15 $conn = new AMQPConnection($conn_args);   
    16 if (!$conn->connect()) {   
    17     die("Cannot connect to the broker!
    ");   
    18 }   
    19 $channel = new AMQPChannel($conn);   
    20  
    21 //创建交换机    
    22 $ex = new AMQPExchange($channel);   
    23 $ex->setName($e_name); 
    24 $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型(常用的有fanout、direct、topic、headers)  
    25 $ex->setFlags(AMQP_DURABLE); //持久化 
    26 
    27    
    28 //创建队列    
    29 $q = new AMQPQueue($channel); 
    30 $q->setName($q_name);   
    31 $q->setFlags(AMQP_DURABLE); //持久化  
    32 
    33 $total = $q->declareQueue();//获取所有的消息数量
    34  
    35 //绑定交换机与队列,并指定路由键 
    36 $q->bind($e_name, $k_route); 
    37  
    38 //1、阻塞模式接收消息 
    39 while(True){ 
    40     $q->consume('processMessage');   
    41     //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答  
    42 } 
    43 
    44 //2 非阻塞模式接收消息 可定时调用
    45 //if($total){
    46 //    for($i=0;$i<$total;$i++){
    47 //        $envelope = $q->get();
    48 //        if($envelope){
    49 //             $msg = $envelope->getBody(); 
    50 //             echo $msg."
    "; //处理消息 
    51 //             $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答
    52 //        }
    53 //    }
    54 //}
    55 
    56 $conn->disconnect();   
    57  
    58 /**
    59  * 消费回调函数
    60  * 处理消息
    61  */ 
    62 function processMessage($envelope, $queue) { 
    63     $msg = $envelope->getBody(); 
    64     echo $msg."
    "; //处理消息 
    65     $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
    66 }
    67 ?>

    【生产消息】

     1 <?php
     2 //配置信息 
     3 $conn_args = array( 
     4     'host' => '127.0.0.1',  
     5     'port' => '5672',  
     6     'login' => 'zcw',
     7     'password' => '123456',
     8     'vhost'=>'/' 
     9 );   
    10 $e_name = 'exchange1'; //交换机名 
    11 //$q_name = 'queue1'; //无需队列名 
    12 $k_route = 'route1'; //路由key
    13  
    14 //创建连接和channel 
    15 $conn = new AMQPConnection($conn_args);   
    16 if (!$conn->connect()) {   
    17     die("Cannot connect to the broker!
    ");   
    18 }   
    19 $channel = new AMQPChannel($conn);   
    20 //创建交换机对象    
    21 $ex = new AMQPExchange($channel);   
    22 $ex->setName($e_name);   
    23 //发送消息 
    24 //$channel->startTransaction(); //开始事务  
    25 for($i=0; $i<5; ++$i){
    26    $ex->publish($message, $k_route)."
    ";
    27 }
    28  
    29 //$channel->commitTransaction(); //提交事务 
    30  
    31 $conn->disconnect();
    32 
    33 ?>

  • 相关阅读:
    Python 工程管理及 virtualenv 的迁移
    Python基础系列讲解——random模块随机数的生成
    Python进阶量化交易场外篇5——标记A股市场涨跌周期
    Python学习案例之视频人脸检测识别
    基于python的Splash基本使用和负载均衡配置
    你所听到的技术原理、技术本质到底是什么?
    BAT大厂面试流程剖析
    基于Python的ModbusTCP客户端实现
    互联网寒冬,Python 程序员如何准备面试
    ES-查询后10000条数据的设置
  • 原文地址:https://www.cnblogs.com/guliang/p/11743229.html
Copyright © 2011-2022 走看看