zoukankan      html  css  js  c++  java
  • php rabbitmq使用示例

    发送消息,即客户端口

    基本流程:

    1.创建连接
    2.在连接内创建一个通道
    3.创建交换机
    4.创建队列
    5.发送信息
    6.关闭

    sender.php

    //配置信息
    $conn_args = array(
        'host'     => '127.0.0.1',
        'port'     => '5672',
        'login'    => 'guest',
        'password' => 'guest',
        'vhost'    => '/',
    );
    $e_name  = 'e_test'; //交换机名,和服务端一致
    $k_route = 'route_test'; //路由key,和服务端一致
    $q_name = 'q_test'; // 队列名称
    
    // 1.创建连接
    $conn = new AMQPConnection($conn_args);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    // 2.在连接内创建一个通道
    $channel = new AMQPChannel($conn);
    
    // 3.创建交换机
    $ex = new AMQPExchange($channel);
    $ex->setName($e_name);
    $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
    $ex->setFlags(AMQP_DURABLE); //持久化,不用,则去除
    $ex->declareExchange();
    
    // 4.创建队列
    $q = new AMQPQueue($channel);
    $q->setName($q_name);
    $q->setFlags(AMQP_DURABLE); // 设置持久化,不用则直接去除
    
    // 5.发送消息
    //$channel->startTransaction(); //开始事务
    for ($i = 0; $i < 5; ++$i) {
        //sleep(1); //休眠1秒
        //消息内容
        $message = $i . " - hi," . date("Y-m-d H:i:s",time());
        echo "Send Message - " . $i . ' : ' . $ex->publish($message, $k_route) . date("Y-m-d H:i:s",time()) . "
    ";
    }
    //$channel->commitTransaction(); //提交事务
    
    // 6.关闭
    $conn->disconnect();

    创建服务端,即消费者

    基本流程:

    1.创建连接
    2.在连接内创建一个通道
    3.创建交换机
    4.设置交换机类型
    5.设置交换机持久
    6.声明交换机并输出状态
    7.创建队列
    8.声明消息队列并输出状态
    9.绑定交换机与队列,并指定路由键
    10.设置消息队列消费者回调方法,并进行阻塞
    11.接收消息并进行处理的回调方法
    12.关闭连接

    receive.php

    //配置信息
    $conn_args = array(
        'host'     => '127.0.0.1',
        'port'     => '5672',
        'login'    => 'guest',
        'password' => 'guest',
        'vhost'    => '/',
    );
    $e_name  = 'e_test'; //交换机名 与客户端一致
    $q_name  = 'q_test'; //队列名 与客户端一致
    $k_route = 'route_test'; //路由key 与客户端一致
    
    // 1.创建连接和channel
    $conn = new AMQPConnection($conn_args);
    if (!$conn->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    // 2.在连接内创建一个通道
    $channel = new AMQPChannel($conn);
    
    // 3.创建交换机
    $ex = new AMQPExchange($channel);
    $ex->setName($e_name);
    
    // 4.设置交换机类型
    //AMQP_EX_TYPE_DIRECT:直连交换机
    //AMQP_EX_TYPE_FANOUT:扇形交换机
    //AMQP_EX_TYPE_HEADERS:头交换机
    //AMQP_EX_TYPE_TOPIC:主题交换机
    $ex->setType(AMQP_EX_TYPE_DIRECT);
    // 5.设置交换机持久
    $ex->setFlags(AMQP_DURABLE);
    // 6.声明交换机并输出状态
    echo "Exchange Status:" . $ex->declareExchange() . "
    ";
    
    // 7.创建队列
    $q = new AMQPQueue($channel);
    $q->setName($q_name);
    //设置队列持久
    $q->setFlags(AMQP_DURABLE);
    
    // 8.声明消息队列并输出状态
    echo "Message Total:" . $q->declareQueue() . "
    ";
    
    // 9.绑定交换机与队列,并指定路由键
    echo 'Queue Bind: ' . $q->bind($e_name, $k_route) . "
    ";
    
    // 10.设置消息队列消费者回调方法,并进行阻塞
    echo "waiting reeive Message...
    ";
    
    // 11.接收消息并进行处理的回调方法
    function processMessage($envelope, $queue)
    {
        //取消息内容
        $msg = $envelope->getBody();
        echo $msg . "
    "; //处理消息
        //显式确认,队列收到消费者显式确认后,会删除该消息
        $queue->ack($envelope->getDeliveryTag());
    }
    //设置消息队列消费者回调方法,并进行阻塞
    $q->consume('processMessage');
    //下面是隐式确认,不推荐
    //$q->consume('processMessage', AMQP_AUTOACK);
    
    
    // 12. 关闭连接    
    $conn->disconnect();
  • 相关阅读:
    P2426 删数
    P2115 [USACO14MAR]破坏Sabotage
    P2679 子串
    P2979 [USACO10JAN]奶酪塔Cheese Towers
    P1114 “非常男女”计划
    P2105 K皇后
    P4053 [JSOI2007]建筑抢修
    P1294 高手去散步
    P4316 绿豆蛙的归宿
    P2253 好一个一中腰鼓!
  • 原文地址:https://www.cnblogs.com/lin3615/p/14349044.html
Copyright © 2011-2022 走看看