zoukankan      html  css  js  c++  java
  • phpamqp消息队列教程-代码实现实例

    逻辑:
    创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息
     
    生产者:
    $conn_args = array(  
        'host' => '127.0.0.1',   
        'port' => '5672',   
        'login' => 'gedai',   
        'password' => 'gedai',  
        'vhost'=>'/credithc'  
    );    
    $e_name = 'CREDITHC_CS'; //交换机名  
    $q_name = 'to_tel_sales'; //无需队列名  
    $k_route = 'to_tel_sales'; //路由key  
      
    //创建连接和channel  
    $conn = new AMQPConnection($conn_args);    
    if (!$conn->connect()) {    
        die("Cannot connect to the broker! ");    
    }    
    $channel = new AMQPChannel($conn);    
      
    //消息内容  
    $message = date("Y-m-d H:i:s") . "TEST send RabbitMQ!";    
      
    //创建交换机对象     
    $ex = new AMQPExchange($channel);    
    $ex->setName($e_name);    
      
    //发送消息  
    //$channel->startTransaction(); //开始事务   
    for($i=0; $i<5; ++$i){  
        echo "Send Message:".$ex->publish($message, $k_route)." ";   
    }  
    //$channel->commitTransaction(); //提交事务  
      
    消费者:
    //配置信息  
    $conn_args = array(  
    'host' => '10.100.13.142',   
    'port' => '5672',   
    'login' => 'gedai',   
    'password' => 'gedai',  
    'vhost'=>'/credithc'  
    );    
    $e_name = 'CREDITHC_CS'; //交换机名  
    $q_name = 'to_tel_sales'; //队列名  
    $k_route = 'to_tel_sales'; //路由key  
      
    //创建连接和channel  
    $conn = new AMQPConnection($conn_args);    
    if (!$conn->connect()) {    
    die("Cannot connect to the broker! ");    
    }    
    $channel = new AMQPChannel($conn);    
      
    //创建交换机     
    $ex = new AMQPExchange($channel);    
    $ex->setName($e_name);  
    $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型   
    $ex->setFlags(AMQP_DURABLE); //持久化  
    // echo "Exchange Status:".$ex->declare()." ";    
    //创建队列     
    $q = new AMQPQueue($channel);  
    $q->setName($q_name);    
    $q->setFlags(AMQP_DURABLE); //持久化   
    echo "Message Total:".$q->declareQueue()." ";    
      
    //绑定交换机与队列,并指定路由键  
    echo 'Queue Bind: '.$q->bind($e_name, $k_route)." ";  
      
    //阻塞模式接收消息  
    echo "Message: ";    
    while(True){  
    $q->consume('processMessage');    
    // $q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答   
     
    //get获取
    $conn->disconnect();    
      
     
    function processMessage($envelope, $queue) {  
    $msg = $envelope->getBody();  
    echo date("Y-m-d H:i:s") . $msg." "; //处理消息  
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答  
     
    queue对象有两个方法可用于取消息:consume和get。
    前者是阻塞的,无消息时会被挂起,适合循环中使用;
    后者则是非阻塞的,取消息时有则取,无则返回false。
  • 相关阅读:
    华章IT图书书讯(2011年第8期)
    iPhone游戏开发实践指南
    【北京讲座】Android系统Framework层源码分析(2011.09.24)
    深入理解Android
    你学或不学,Java就在那里,不离不弃
    近百本精品图书全部免费赠送——仅面向学生
    极限编程(Extreme Programming, XP)
    对任何希望深入理解C#的程序员来说,这本书都是不容错过的经典书籍
    C# 文件操作(转)
    一些数据格式化Eval( " ")和DataBinder.Eval(Container.DataItem, " ")的区别及用法
  • 原文地址:https://www.cnblogs.com/wjs2019/p/11720524.html
Copyright © 2011-2022 走看看