zoukankan      html  css  js  c++  java
  • redis+PHP实现的一个优先级去重队列

    主要思路是用一个set做前端去重缓冲, 若干个list做后端的多优先级消息队列, 用一个进程来进行分发, 即从set中分发消息到队列.

    set缓冲的设计为当天有效, 所以有个零点问题,有可能在零点前set中刚放进去的消息没有分发即失效, 这一点可以用另一个进程弥补处理前一天的遗留消息和删除前一天的缓冲

    <?php
    
    /** 
     * @author 
     * 
     */
    class MsgQuery {
        // TODO - Insert your code here
        const KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息缓冲key前缀
        const KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key
        const KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已处理缓冲key前缀
        const SCORE_NUM = 5; // 优先级划分数目
        const MIN_SCORE = 1; // 最小优先级
        static $MAX_SCORE;
        static $instance = null;
        private $redis;
    
        public static function getInstance($redis) {
            if (null == self::$instance) {
                self::$instance = new MsgQuery ( $redis );
            }
            return self::$instance;
        }
    
        /**
         * 添加消息到消息缓冲区
         * @param int $score 优先级(1-5)
         * @param string $msg 消息
         */
        public function add($score, $msg) {
            // 添加到消息缓冲
            $socre = intval ( $score );
            if ($socre < self::MIN_SCORE) {
                $score = self::MIN_SCORE;
            }
            if ($score > self::$MAX_SCORE) {
                $score = self::$MAX_SCORE;
            }
            $cacheKey = self::KEY_CACHE_PREFIX . date ( 'Ymd' );
            $cacheData = array (
                    'score' => $score,
                    'msg' => $msg 
            );
            $this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) );
        }
    
        /**
         * 将消息从缓冲区移动到相应的优先级队列中
         */
        public function moveToQuery() {
            // 获取当前缓冲区没有入队列的消息
            $dealKey = self::KEY_CACHE_DEAL_PREFIX.date('Ymd');
            $cacheKey = self::KEY_CACHE_PREFIX.date('Ymd');
            $msgs = $this->redis->sDiff($cacheKey, $dealKey);
            foreach ($msgs as $cachedData){
                // 放入已处理集合
                $this->redis->sAdd ( $dealKey, $cachedData );
                // 压入相应的优先级队列
                $cachedData = unserialize($cachedData);
                $score = $cachedData['score'];
                $msg = $cachedData['msg'];
                $queryKey = self::KEY_QUERY_PREFIX.$score;
                $this->redis->rPush($queryKey, $msg);
            }
            unset($cachedData);
        }
        
        /**
         * 从队列阻塞式出栈一个最高优先级消息
         * @return string msg
         */
        public function bPop(){
            $queryKeys = array();
            for($score=self::$MAX_SCORE;$score>=self::MIN_SCORE;$score--){
                $queryKeys[] = self::KEY_QUERY_PREFIX.$score;
            }
            $msg = $this->redis->blPop($queryKeys, 0);
            return $msg[1];
        }
    
        private function __construct($redis) {
            $this->redis = $redis;
            $this->redis->connect ();
            self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1;
        }
    
        private function __destruct() {
            $this->redis->close ();
        }
    }
    
    ?>
  • 相关阅读:
    《java异常的一些总结》
    《java小应用程序(Applet)和java应用程序(Application)分别编写的简单计算器》
    《Java应用程序(Application)》
    CPU 内部 MOSFET 晶体管模型
    Intel 4004,世界上第一块 CPU
    VBScript
    VBScript
    Web Scraping(网页抓取)基本原理
    VBScript
    莎士比亚电路ヾ(≧▽≦*)o
  • 原文地址:https://www.cnblogs.com/Moon-Face/p/4814257.html
Copyright © 2011-2022 走看看