zoukankan      html  css  js  c++  java
  • php 使用AMQP扩展调用RabbitMq

     先试用Docker安装 RabbitMq

    docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

    cons_order.php

    //配置信息
    $conn_args = array(
        'host' => '192.168.7.50',
        'port' => '5672',
        'login' => 'guest',
        'password' => 'guest',
        'vhost'=>'/'
    );
    $e_name = 'business'; //交换机名
    $q_name = 'business_order'; //队列名
    $k_route = 'sendOrder'; //路由key
    
    //创建连接和channel
    $conn = new AMQPConnection($conn_args);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    $channel = new AMQPChannel($conn); //只需要创建这一个即可,至于为什么多一个channel就不知道了
    
    //创建交换机
    $ex = new AMQPExchange($channel);
    $ex->setName($e_name);
    $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型  ,精准投递,根据交换机publish的路由信息队列绑定时才会进行
    $ex->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复
    echo "Exchange Status:".$ex->declareExchange()."
    ";   //查看如果交换机不存在则进行创建
    
    //创建队列
    $q = new AMQPQueue($channel);
    $q->setName($q_name);
    $q->setFlags(AMQP_DURABLE); //队列持久化
    echo "Message Total:".$q->declareQueue()."
    ";   //同样,如果不存在则创建
    
    //绑定交换机与队列,并指定路由键
    echo 'Queue Bind: '.$q->bind($e_name, $k_route)."
    "; //绑定路由,只处理投递指定$k_route的消息
    
    //阻塞模式接收消息
    echo "Message:
    ";
    while(True){
        $q->consume('processMessage');
        //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
    }
    $conn->disconnect();
    
    /**
     * 消费回调函数
     * 处理消息
     */
    function processMessage($envelope, $queue) {
        $msg = $envelope->getBody();
        echo $msg."
    "; //处理消息
        $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
    }

    cons_mem.php

    //配置信息
    $conn_args = array(
        'host' => '192.168.7.50',
        'port' => '5672',
        'login' => 'guest',
        'password' => 'guest',
        'vhost'=>'/'
    );
    $e_name = 'business'; //交换机名
    $q_name = 'business_mem'; //队列名
    $k_route = 'sendMem'; //路由key
    
    //创建连接和channel
    $conn = new AMQPConnection($conn_args);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    $channel = new AMQPChannel($conn); //只需要创建这一个即可,至于为什么多一个channel就不知道了
    
    //创建交换机
    $ex = new AMQPExchange($channel);
    $ex->setName($e_name);
    $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型  ,精准投递,根据交换机publish的路由信息队列绑定时才会进行
    $ex->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复
    echo "Exchange Status:".$ex->declareExchange()."
    ";   //查看如果交换机不存在则进行创建
    
    //创建队列
    $q = new AMQPQueue($channel);
    $q->setName($q_name);
    $q->setFlags(AMQP_DURABLE); //队列持久化
    echo "Message Total:".$q->declareQueue()."
    ";   //同样,如果不存在则创建
    
    //绑定交换机与队列,并指定路由键
    echo 'Queue Bind: '.$q->bind($e_name, $k_route)."
    "; //绑定路由,只处理投递指定$k_route的消息
    
    //阻塞模式接收消息
    echo "Message:
    ";
    while(True){
        $q->consume('processMessage');
        //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
    }
    $conn->disconnect();
    
    /**
     * 消费回调函数
     * 处理消息
     */
    function processMessage($envelope, $queue) {
        $msg = $envelope->getBody();
        echo $msg."
    "; //处理消息
        $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
    }

    pro.php

    $conn_args = array(
        'host' => '192.168.7.50',
        'port' => '5672',
        'login' => 'guest',  //默认的用户名和密码
        'password' => 'guest',
        'vhost'=>'/'
    );
    $e_name = 'business'; //交换机名
    //$q_name = 'q_linvo'; //无需队列名
    $k_route = 'sendMem'; //路由key 与
    
    //创建连接和channel
    $conn = new AMQPConnection($conn_args);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    $channel = new AMQPChannel($conn);
    
    //消息内容
    $message = "TEST MESSAGE! 测试消息!";
    
    //创建交换机对象
    $ex = new AMQPExchange($channel);
    $ex->setName($e_name);
    $ex->setType(AMQP_EX_TYPE_DIRECT); //投递机制,精准投递,这个我们在消费端设置即可
    $ex->setFlags(AMQP_DURABLE);//持久化设置
    $ex->declareExchange();
    
    //发送消息
    //$channel->startTransaction(); //开始事务
    //for($i=0; $i<5; ++$i){
        //如果需要进行消息持久化机制的话,则进行如下设置
        //$ex->publish($message,$k_route,AMQP_NOPARAM,array());//如果delivery是1的话,则表明是非持久化,持久化的时候注意使用AUTOASK。则处理完的消息可以自动删除
        //不需要的话则进行这样:$ex->publish($message, $k_route,AMQP_NOPARAM,array('delivery_mode'=>2, 'priority'=> 9));
    //    echo "Send Message:".$ex->publish($message, $k_route)."
    ";
    //}
    //$channel->commitTransaction(); //提交事务
    $i = 0;
    while (true){
    
        $i++;
        if($i > 1000){
            break;
        }
        if(rand(10,20)%2 == 0){
            $k_route = "sendOrder";
        }else{
            $k_route = "sendMem";
        }
        echo "Send Message:".$ex->publish($message.$k_route, $k_route)."
    ";
        echo "Send Message Content:".$message.$k_route."
    ";
    }
    
    $conn->disconnect();
  • 相关阅读:
    [JSOI2007][BZOJ1031] 字符加密Cipher|后缀数组
    leetcode Flatten Binary Tree to Linked List
    leetcode Pascal's Triangle
    leetcode Triangle
    leetcode Valid Palindrome
    leetcode Word Ladder
    leetcode Longest Consecutive Sequence
    leetcode Sum Root to Leaf Numbers
    leetcode Clone Graph
    leetcode Evaluate Reverse Polish Notation
  • 原文地址:https://www.cnblogs.com/itsuibi/p/13491496.html
Copyright © 2011-2022 走看看