zoukankan      html  css  js  c++  java
  • PHP + Redis 实现消息队列

    Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感,应用场景有 即时数据分析、秒杀计数器、缓存等

    Redis做消息队列待解决的问题:

      1、消息的可靠性: 没有相应的机制保证消息的消费,当消费者消费失败的时候,消息体丢失,需要手动处理。生产者只管向队列中插入数据,不管消费者是否成功消费。

      2、消费者挂掉消息不会丢失,但是需要重新触发一下消费者,才能够继续消费消息。

    代码如下:

      lib.php 是工具文件,里面有数据库的连接、Redis的连接:

    <?php
    /**
     * 获取数据库连接
     *
     * @param $host
     * @param $username
     * @param $password
     * @param $database
     * @return mysqli
     */
    function getDBConnection($host, $username, $password, $database){
        $connection = new mysqli('p:'.$host, $username, $password, $database);
        if (!$connection) {
            echo "Error: Unable to connect to MySQL." . PHP_EOL;
            echo "Debugging errno: " . mysqli_connect_errno() . PHP_EOL;
            echo "Debugging error: " . mysqli_connect_error() . PHP_EOL;
            exit;
        }
        mysqli_query($connection, "set names 'utf8'");
        return $connection;
    }
    
    /**
     * 获取Redis连接
     *
     * @param $host
     * @param $port
     * @param string $password
     * @param int $database
     * @return Redis
     */
    function getRedis($host='127.0.0.1', $port='6379', $password=null, $database=0){
        $redis = new Redis();
        if(!$redis->connect($host, $port)){
            die("Redis连接失败:IP或端口有误");
        }
        if(!empty($password) && !$redis->auth($password)){
            die("Redis连接失败:密码错误");
        }
        if($database){
            $redis->select($database);
        }
        // work中 subscribe 如果一段时间没有接到消息,就会停掉然后停掉,所以加这个语句让其永不超时
        $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
        return $redis;
    }
    
    /**
     * 打印消息日志
     *
     * @param $msg
     */
    function stdout($msg=null){
        $msg = '['.date('Y-m-d H:i:s').']'.$msg.chr(10);;
        fwrite(STDOUT, $msg);
    }

     

     register.php 是消息发布者,注释的是将消息存入数据库部分的代码。

      首先想消息存入 register_users 队列中,存入的 key是register_users;value是一个list,消息全部存入其中。用 redis-cli 查看数据的命令是:

    LRANGE register_users 0 -1
    

      register.php:

    <?php
    require './lib.php';
    $name = $argv[1];
    $mobile = $argv[2];
    if(empty($name) || empty($mobile)){
        die("参数错误");
    }
    // $connection = getDBConnection('localhost:3306', 'root', 'root', 'blog');
    // // 开启事务
    // mysqli_begin_transaction($connection);
    // $sql = "insert into mq_user(name, mobile) values ('$name', '$mobile')";
    // if(!mysqli_query($connection, $sql)){
    //     die("写入用户信息失败,原因:".$connection->error);
    // }
    $redis = getRedis();
    // 添加消息
    $result = $redis->lpush('register_users', json_encode(array('name'=>$name, 'mobile'=>$mobile), JSON_UNESCAPED_UNICODE));
    if($result === false){
        mysqli_rollback($connection);
        die("添加消息队列失败");
    }
    // 发布消息
    $redis->publish('register_success', 'ok');
    // 所有操作完成后提交事务
    // mysqli_commit($connection);
    // $connection->close();
    $redis->close();
    

      

      work.php 做为消息的消费者

    <?php
    require './lib.php';
    $redis = getRedis();
    $redis->subscribe(['register_success'], function ($instance, $channelName, $message) {
        if($channelName == "register_success" && $message = "ok") {
            $redis = getRedis();
            while($redis->lsize("register_users")>0) {
                $arr = $redis->brPop(['register_users'], 20);
                if(count($arr)) {
                    $userInfo = json_decode($arr[1], true);
                    stdout("新注册用户信息:");
                    stdout("姓名:".$userInfo['name']);
                    stdout("手机号:".$userInfo['mobile']);
                    stdout();
                    sleep(3);
                }
            }
        }
    });
    

      

      register.php将消息放入redis 的 register_users队列中,然后再使用 publish 将 register_success 消息发不出去。work.php 使用 subscribe 订阅 register_success 的消息。接收到 register_success 消息之后,读取 register_users 的消息进行处理。

    我是按照这个github上面的代码做的参照,有些改动: 

      https://github.com/jormin/php-redis-mq

  • 相关阅读:
    bzoj 2152: 聪聪可可 树的点分治
    Contest 20141027 总结
    bzoj 3505: [Cqoi2014]数三角形 组合数学
    bzoj 3624: [Apio2008]免费道路 生成树的构造
    tyvj P1075
    poj 2778 DNA Sequence AC自动机
    poj 2778 DNA Sequence AC自动机DP 矩阵优化
    bzoj 3626: [LNOI2014]LCA 离线+树链剖分
    BZOJ 1412: [ZJOI2009]狼和羊的故事【网络流】
    ACDream:1210:Chinese Girls' Amusement【水题】
  • 原文地址:https://www.cnblogs.com/Lyh1997/p/11491046.html
Copyright © 2011-2022 走看看