zoukankan      html  css  js  c++  java
  • PHP实现RabbitMQ消息队列(二)

    1、安装拓展及工具包

    2、代码实现

    a、发布消息

    <?php
    
    $_host = '127.0.0.1';//主机
    $_port = '5672';//端口
    $_login = 'TEST';//账号
    $_password = '123123';//密码
    $_vhost = 'TEST';//host默认为/
    
    require_once '../../../../vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $exchange = 'TEST';//交换机
    $connection = new AMQPStreamConnection($_host, $_port, $_login, $_password, $_vhost);
    $channel = $connection->channel();
    $channel->exchange_declare($exchange, 'direct', true, false, false);
    
    $routerKey = 'lol_status';
    
    //模拟批量发布消息
    for ($i = 0; $i < 100; $i++) {
        $arr = [
            'match_id' => $i,
            'status' => rand(0,3),
            'time'=> date('Y-m-d H:i:s')
        ];
        $data = json_encode($arr,JSON_UNESCAPED_UNICODE);
        $msg = new AMQPMessage($data);
        $channel->basic_publish($msg, $exchange, $routerKey);
        echo '发送 '.$routerKey.' 消息: ' . $data . PHP_EOL;
    }
    $channel->close();
    $connection->close();
    
    ?>

    b、消费消息

    <?php
    
    $_host = '127.0.0.1';
    $_port = '5672';
    $_login = 'TEST';
    $_password = '123123';
    $_vhost = 'TEST';
    
    require_once '../../../../vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $exchange = 'TEST';//交换机名称
    $queueName = 'lol';//队列名称
    $routerKey = 'lol_status'; //路由key
    
    $connection = new AMQPStreamConnection($_host, $_port, $_login, $_password, $_vhost);
    $channel = $connection->channel();
    $channel->exchange_declare($exchange, 'direct', true, false, false);
    $channel->queue_bind($queueName, $exchange, $routerKey);
    
    //获取消息数量信息
    $declare_info = $channel->queue_declare($queueName,true);//$declare_info[1] 消息数量
    
    echo " 等待消息中..." .PHP_EOL;
    $callback = function ($msg){
        echo '接收到消息:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
        //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//手动应答
    };
    
    //第4个参数  false:手动应答 true:自动应答
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);
    
    //阻塞模式
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    
    ?>

     

  • 相关阅读:
    ABAP常用函数归纳
    abap 优化之ST05
    对统驭科目和特别总账标志的理解
    会计凭证修改函数的使用
    会计凭证替代 OBBH
    屏幕切换
    se37 函数中的异常使用
    清帐函数的使用
    使用Servlet和JSp在浏览器上实现对数据库表的增删改查(新手)
    Java中的Xml配置文件(新手)
  • 原文地址:https://www.cnblogs.com/guliang/p/13497093.html
Copyright © 2011-2022 走看看