zoukankan      html  css  js  c++  java
  • PHP使用Redis的List(列表)命令实现消息队列

    使用Redis的List(列表)命令实现消息队列,生产者使用lPush命令发布消息,消费者使用rpoplpush命令获取消息,同时将消息放入监听队列,如果处理超时,监听者将把消息弹回消息队列

    1.用到的List(列表)命令

    命令作用
    lPush 将一个或多个值插入到列表头部
    rpoplpush 弹出列表最后一个值,同时插入到另一个列表头部,并返回该值
    lRem 删除列表内的给定值
    lIndex 按索引获取列表内的值











    2.队列的组成

    名称职责
    生产者 发布消息
    消费者 获取并处理消息
    监听者 监听超时的消息,弹回原消息队列,确保消费者挂掉后或处理失败后消息能被其他消费者处理








    3.php实现代码

    生产者Producter.php

    <?php
    try {
        //声明消息队列-list的键名
        $queueKey = 'testQueueKey';
        $redis = new Redis();
        $redis->connect('ip', 6379);
        //向列表中push10条消息
        for ($i = 0;$i < 10;$i++){
            //为消息生成唯一标识
            $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true);
            $ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data')));
            var_dump($ret);
        }
    
    } catch (Exception $e){
        echo $e->getMessage();
    }
    

    消费者Consumer.php

    <?php
    try {
        //声明消息队列-list的键名
        $queueKey = 'testQueueKey';
        //声明监听者队列-list的键名
        $watchQueueKey = 'watchQueueKey';
        $redis = new Redis();
        $redis->connect('ip', 6379);
        //队列先进先出,弹出最先加入的消息,同时放入监听队列
        while (true){
            $ret = $redis->rpoplpush($queueKey, $watchQueueKey);
            if ($ret === false){
                sleep(1);
            } else {
                $retArray = json_decode($ret, true);
                //将唯一id写入缓存设置有效期
                $redis->setex($retArray['uniqid'], 60, 0);
                //模拟失败
                $rand = mt_rand(0,9);
                if ($rand < 3){
                    echo "failure:".$ret."
    ";
                } else {
                    //todo
                    //处理成功移除消息
                    $redis->lRem($watchQueueKey, $ret, 0);
                    echo "success:".$ret."
    ";
                }
            }
        }
    
    } catch (Exception $e){
        echo $e->getMessage();
    }
    

    监听者Watcher.php

    <?php
    try {
        //声明消息队列-list的键名
        $queueKey = 'testQueueKey';
        //声明监听者队列-list的键名
        $watchQueueKey = 'watchQueueKey';
        $redis = new Redis();
        $redis->connect('ip', 6379);
    
        while (true){
            //取出列表尾部的一个值
            $ret = $redis->lIndex($watchQueueKey, -1);
            //如果不存在则休眠1秒
            if ($ret === false){
                sleep(1);
            } else {
                $retArray = json_decode($ret, true);
                $idCache = $redis->get($retArray['uniqid']);
                if ($idCache === false){
                    //如果已过期,表示任务超时,弹回原队列
                    $redis->rpoplpush($watchQueueKey, $queueKey);
                    echo "rpoplpush:".$ret."
    ";
                } else {
                    //处理中,继续等待
                    sleep(1);
                }
            }
        }
    
    } catch (Exception $e){
        echo $e->getMessage();
    }
    

    4.执行队列

    开启监听者php Watcher.php
    开启消费者php Consumer.php
    执行生产者php Producter.php
    生产者输出

    int(1)
    int(2)
    int(3)
    int(4)
    int(5)
    int(6)
    int(7)
    int(8)
    int(9)
    int(10)
    

    监听者输出

    rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
    rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
    rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
    rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
    rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
    rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
    

    消费者输出

    success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"}
    failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
    success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"}
    success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"}
    failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
    failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
    failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
    success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"}
    success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"}
    failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
    success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
    success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
    success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
    failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
    success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
    success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
    

    我们看到消费者第一次执行时失败的消息,超时后又被弹回了消息队列,消费者有了再次执行的机会,监听者的职责就是确保消费者执行失败或挂掉后消息还能再弹回原队列得到再次执行

    转自:https://www.jmsite.cn/blog-615.html

  • 相关阅读:
    mysql增加字段,修改字段,增加索引等语句
    php获取post参数的几种方式
    微信小程序开发注意事项
    jQuery的deferred对象详解
    jquery.pagination.js的使用
    js实现一键复制
    PHP读取文件内容的五种方式
    3.3 模块的搜索路径
    3.2 py文件的两种功能
    3.1 模块的定义与分类,import,from...import
  • 原文地址:https://www.cnblogs.com/WebLinuxStudy/p/12743253.html
Copyright © 2011-2022 走看看