zoukankan      html  css  js  c++  java
  • PHP实现RabbitMQ消息队列(二)

    1、安装拓展及工具包

    2、代码实现

    a、发布消息

    <?php
    
    $_host = '127.0.0.1';//主机
    $_port = '5672';//端口
    $_login = 'TEST';//账号
    $_password = '123123';//密码
    $_vhost = 'TEST';//host默认为/
    
    require_once '../../../../vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    
    $exchange = 'TEST';//交换机
    $connection = new AMQPStreamConnection($_host, $_port, $_login, $_password, $_vhost);
    $channel = $connection->channel();
    $channel->exchange_declare($exchange, 'direct', true, false, false);
    
    $routerKey = 'lol_status';
    
    //模拟批量发布消息
    for ($i = 0; $i < 100; $i++) {
        $arr = [
            'match_id' => $i,
            'status' => rand(0,3),
            'time'=> date('Y-m-d H:i:s')
        ];
        $data = json_encode($arr,JSON_UNESCAPED_UNICODE);
        $msg = new AMQPMessage($data);
        $channel->basic_publish($msg, $exchange, $routerKey);
        echo '发送 '.$routerKey.' 消息: ' . $data . PHP_EOL;
    }
    $channel->close();
    $connection->close();
    
    ?>

    b、消费消息

    <?php
    
    $_host = '127.0.0.1';
    $_port = '5672';
    $_login = 'TEST';
    $_password = '123123';
    $_vhost = 'TEST';
    
    require_once '../../../../vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $exchange = 'TEST';//交换机名称
    $queueName = 'lol';//队列名称
    $routerKey = 'lol_status'; //路由key
    
    $connection = new AMQPStreamConnection($_host, $_port, $_login, $_password, $_vhost);
    $channel = $connection->channel();
    $channel->exchange_declare($exchange, 'direct', true, false, false);
    $channel->queue_bind($queueName, $exchange, $routerKey);
    
    //获取消息数量信息
    $declare_info = $channel->queue_declare($queueName,true);//$declare_info[1] 消息数量
    
    echo " 等待消息中..." .PHP_EOL;
    $callback = function ($msg){
        echo '接收到消息:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
        //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//手动应答
    };
    
    //第4个参数  false:手动应答 true:自动应答
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);
    
    //阻塞模式
    while ($channel->is_consuming()) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    
    ?>

     

  • 相关阅读:
    Linux 如何通过命令查看一个文件的某几行(中间几行或最后几行)
    linux 定义变量 ,添加变量值
    Linux下的vi编辑命令中查找·替换详解
    linux vi 块操作、多窗口
    linux 磁盘 分区、格式化、挂载
    linux 重名名、删除文件操作
    linux vi
    linux 文件查阅 cat、more、less、tail
    linux ls touch、umask、 chattr
    linux 目录配置
  • 原文地址:https://www.cnblogs.com/guliang/p/13497093.html
Copyright © 2011-2022 走看看