zoukankan      html  css  js  c++  java
  • redis+PHP实现的一个优先级去重队列

    主要思路是用一个set做前端去重缓冲, 若干个list做后端的多优先级消息队列, 用一个进程来进行分发, 即从set中分发消息到队列.

    set缓冲的设计为当天有效, 所以有个零点问题,有可能在零点前set中刚放进去的消息没有分发即失效, 这一点可以用另一个进程弥补处理前一天的遗留消息和删除前一天的缓冲

    <?php
    
    /** 
     * @author 
     * 
     */
    class MsgQuery {
        // TODO - Insert your code here
        const KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息缓冲key前缀
        const KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key
        const KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已处理缓冲key前缀
        const SCORE_NUM = 5; // 优先级划分数目
        const MIN_SCORE = 1; // 最小优先级
        static $MAX_SCORE;
        static $instance = null;
        private $redis;
    
        public static function getInstance($redis) {
            if (null == self::$instance) {
                self::$instance = new MsgQuery ( $redis );
            }
            return self::$instance;
        }
    
        /**
         * 添加消息到消息缓冲区
         * @param int $score 优先级(1-5)
         * @param string $msg 消息
         */
        public function add($score, $msg) {
            // 添加到消息缓冲
            $socre = intval ( $score );
            if ($socre < self::MIN_SCORE) {
                $score = self::MIN_SCORE;
            }
            if ($score > self::$MAX_SCORE) {
                $score = self::$MAX_SCORE;
            }
            $cacheKey = self::KEY_CACHE_PREFIX . date ( 'Ymd' );
            $cacheData = array (
                    'score' => $score,
                    'msg' => $msg 
            );
            $this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) );
        }
    
        /**
         * 将消息从缓冲区移动到相应的优先级队列中
         */
        public function moveToQuery() {
            // 获取当前缓冲区没有入队列的消息
            $dealKey = self::KEY_CACHE_DEAL_PREFIX.date('Ymd');
            $cacheKey = self::KEY_CACHE_PREFIX.date('Ymd');
            $msgs = $this->redis->sDiff($cacheKey, $dealKey);
            foreach ($msgs as $cachedData){
                // 放入已处理集合
                $this->redis->sAdd ( $dealKey, $cachedData );
                // 压入相应的优先级队列
                $cachedData = unserialize($cachedData);
                $score = $cachedData['score'];
                $msg = $cachedData['msg'];
                $queryKey = self::KEY_QUERY_PREFIX.$score;
                $this->redis->rPush($queryKey, $msg);
            }
            unset($cachedData);
        }
        
        /**
         * 从队列阻塞式出栈一个最高优先级消息
         * @return string msg
         */
        public function bPop(){
            $queryKeys = array();
            for($score=self::$MAX_SCORE;$score>=self::MIN_SCORE;$score--){
                $queryKeys[] = self::KEY_QUERY_PREFIX.$score;
            }
            $msg = $this->redis->blPop($queryKeys, 0);
            return $msg[1];
        }
    
        private function __construct($redis) {
            $this->redis = $redis;
            $this->redis->connect ();
            self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1;
        }
    
        private function __destruct() {
            $this->redis->close ();
        }
    }
    
    ?>
  • 相关阅读:
    JSP中 == 和equals的区别
    使用Cookie保存用户名密码,再次登陆时将Cookie用户名密码取出来并直接放置到用户名密码文本框中
    学习Java Web开发中遇到的问题,及其解决方法
    部署、测试、服务工作的经验记录
    Python基础--dict字典操作
    Python基础--dict字典
    Python基础--预留空 5
    Python基础--预留空 4
    Python基础--tuple 元组
    Python基础--预留3
  • 原文地址:https://www.cnblogs.com/Moon-Face/p/4814257.html
Copyright © 2011-2022 走看看