zoukankan      html  css  js  c++  java
  • 队列扩展, 支持多个队列

    <?php
    
    /** 
     * @author 魔芋红茶
     * 
     */
    class MsgQuery {
        // TODO - Insert your code here
        private static $KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息缓冲key前缀
        private static $KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key
        private static $KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已处理缓冲key前缀
        const SCORE_NUM = 5; // 优先级划分数目
        const MIN_SCORE = 1; // 最小优先级
        private static $MAX_SCORE;
        private static $instance = null;
        private $redis;
        private $curQuery;
        const QUERY_DEFAULT           = 'ms.q.dft';
        const QUERY_ITEM_TASK            = 'ms.q.itm';
        const QUERY_ITEM_SHELVES_ADD  = 'ms.q.shelves_add';  // 上架
        const QUERY_ITEM_SHELVES_MOVE = 'ms.q.shelves_move'; // 下架
        const QUERY_ITEM_IMPORT          = 'ms.q.import';          // 导入
        const QUERY_ITEM_PRICE           = 'ms.q.price';          // 修改售价
        const QUERY_ITEM_INV_QTY      = 'ms.q.inv_qty';      // 修改库存
        const QUERY_ITEM_POINT            = 'ms.q.sell_point';      // 修改卖点
        const QUERY_ITEM_DEL           = 'ms.q.itm.delete';      // 商品删除
        const QUERY_ITEM_RESTORE      = 'ms.q.itm.restore';  // 商品恢复
        const QUERY_ITEM_EDIT           = 'ms.q.itm.edit';      // 商品修改
        const QUERY_ITEM_ADD           = 'ms.q.itm.add';      // 商品添加
        
        private static $querys = array (
            self::QUERY_DEFAULT,
            self::QUERY_ITEM_TASK,
            self::QUERY_ITEM_SHELVES_ADD,
            self::QUERY_ITEM_SHELVES_MOVE,
            self::QUERY_ITEM_IMPORT,
            self::QUERY_ITEM_PRICE,
            self::QUERY_ITEM_INV_QTY,
            self::QUERY_ITEM_POINT,
            self::QUERY_ITEM_DEL,
            self::QUERY_ITEM_RESTORE,
            self::QUERY_ITEM_ADD,
            self::QUERY_ITEM_EDIT
        );
    
        /**
         * 清理已经过期的query数据
         */
        public function clean() {
            $ystDay = date ( 'Ymd', strtotime ( '-1 day' ) );
            $cacheKey = self::$KEY_CACHE_PREFIX . $ystDay;
            $cacheDealKey = self::$KEY_CACHE_DEAL_PREFIX . $ystDay;
            // 清理前尝试移动缓冲区里的残留消息到队列
            $this->moveToQuery ( $ystDay );
            // 删除过期缓冲区
            $this->redis->del ( $cacheKey );
            $this->redis->del ( $cacheDealKey );
        }
    
        public static function getQuerys() {
            return self::$querys;
        }
    
        /**
         * 将当前query切换到指定query
         *
         * @param string $query            
         */
        public function selectQuery($query) {
            if (! in_array ( $query, self::$querys )) {
                $query = self::QUERY_DEFAULT;
            }
            $this->curQuery = $query;
            self::$KEY_CACHE_PREFIX = $query . '.cache';
            self::$KEY_QUERY_PREFIX = $query . '.lv';
            self::$KEY_CACHE_DEAL_PREFIX = $query . '.deal';
        }
    
        /**
         * 获取一个query实例
         * @param redis $redis
         * @param string $query
         * @return MsgQuery
         */
        public static function getInstance($redis, $query) {
            if (null == self::$instance) {
                self::$instance = new MsgQuery ( $redis, $query );
            }
            if (self::$instance->curQuery != $query) {
                self::$instance->selectQuery ( $query );
            }
            return self::$instance;
        }
    
        /**
         * 添加消息到消息缓冲区
         *
         * @param int $score
         *            优先级(1-5)
         * @param string $msg
         *            消息
         */
        public function add($score, $msg) {
            // 添加到消息缓冲
            $socre = intval ( $score );
            if ($socre < self::MIN_SCORE) {
                $score = self::MIN_SCORE;
            }
            if ($score > self::$MAX_SCORE) {
                $score = self::$MAX_SCORE;
            }
            $cacheKey = self::$KEY_CACHE_PREFIX . date ( 'Ymd' );
            $cacheData = array (
                    'score' => $score,
                    'msg' => $msg 
            );
            
            // 被添加到集合中的新元素的数量,不包括被忽略的元素,故重复添加返回false
            return $this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) );
        }
    
        /**
         * 将消息从缓冲区移动到相应的优先级队列中
         */
        public function moveToQuery($day = null) {
            if ($day === null) {
                $day = date ( 'Ymd' );
            }
            // 获取当前缓冲区没有入队列的消息
            $dealKey = self::$KEY_CACHE_DEAL_PREFIX . $day;
            $cacheKey = self::$KEY_CACHE_PREFIX . $day;
            $msgs = $this->redis->sDiff ( $cacheKey, $dealKey );
            foreach ( $msgs as $cachedData ) {
                // 放入已处理集合
                $this->redis->sAdd ( $dealKey, $cachedData );
                // 压入相应的优先级队列
                $cachedData = unserialize ( $cachedData );
                $score = $cachedData ['score'];
                $msg = $cachedData ['msg'];
                $queryKey = self::$KEY_QUERY_PREFIX . $score;
                $this->redis->rPush ( $queryKey, serialize($msg) );
            }
            unset ( $cachedData );
        }
    
        /**
         * 从队列阻塞式出栈一个最高优先级消息
         *
         * @return string msg
         */
        public function bPop() {
            $queryKeys = array ();
            for($score = self::$MAX_SCORE; $score >= self::MIN_SCORE; $score --) {
                $queryKeys [] = self::$KEY_QUERY_PREFIX . $score;
            }
            $msg = @$this->redis->blPop ( $queryKeys, 0 );
            return $msg [1];
        }
    
        private function __construct($redis, $query) {
            $this->redis = $redis;
    //         $this->redis->connect ();
            self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1;
            $this->selectQuery ( $query );
        }
    
        public function __destruct() {
            if ($this->redis)     $this->redis->close ();
        }
    }
    
    ?>

    主体没有任何改变, 依然是2个set做缓冲, 多个list做实际队列的实现, 只是扩展为支持多个队列的实现.

    其它队列使用中遇到过的问题:

    1 feed程序因为长时间阻塞而断开了和redis的连接

    解决方法: 加入代码

    ini_set('default_socket_timeout', -1);

    以上代码可以让php进程和redis通过socket长连接

    2 feed程序因为长时间阻塞而断开了和mysql的连接

    解决方法: 查看mysql驱动使用的是哪种, 我们项目中用的是mysqli连接, 具体驱动是mysqlnd, 如果是mysqld, 最简单的方式是先设置php变量mysqli.reconnect=1再加入ping命令

    但有个问题是mysqlnd驱动下ping命令并不能自动重连, 即使改了php变量也没用, 只能显示进行重连, 如下

            if(!$this->connection->ping()){
                //mysql 连接丢失且mysqlnd不会自动重连, 手动重连
                $this->connectDB();
            }

    3 重新发布/变更了feed程序可能需要重启进程加载新代码

    最后附上一个队列分发和清理任务脚本, 脚本中多进程分发的代码存在问题, 会产生很多僵尸进程, 目前仅使用单进程版本, 处理时间为每天凌晨调用clean脚本清理前一天的队列, 分发程序为每5分钟执行一次, 具体执行间隔视情况调整

    <?php
    class ControllerMsgQuery extends Controller {
    
        /**
         * 处理队列缓冲区的数据, 移动到队列
         */
        public function index() {
            //多进程版本存在问题, 先使用单进程版本
            $this->dealWithSingleProcess();
        }
        
        private function dealWithMuitlProcess(){
            set_time_limit ( 0 );
            global $global;
            $this->load->ventor ( 'msg_query/MsgQuery' );
            $querys = MsgQuery::getQuerys ();
            $redis = new Redis ();
            $redis->connect ( $global['redis_cache_w'][0]['host'], $global['redis_cache_w'][0]['port'] );
            foreach ( $querys as $query ) {
                // 每个query用独立子线程完成move to list的工作
                $pid = pcntl_fork ();
                if ($pid > 0) {
                    // 父进程
                } else if ($pid == 0) {
                    // 子进程
                    $MsgQuery = MsgQuery::getInstance ( $redis, $query );
                    $MsgQuery->moveToQuery ();
                    exit();
                } else {
                    exit();
                }
            }
            // 父进程等待子进程以释放资源
            //         if ($pid > 0){
            pcntl_wait ( $status );
            //         }
        }
        
        private function dealWithSingleProcess(){
            set_time_limit ( 0 );
            global $global;
            $this->load->ventor ( 'msg_query/MsgQuery' );
            $querys = MsgQuery::getQuerys ();
            $redis = new Redis ();
            $redis->connect ( $global['redis_cache_w'][0]['host'], $global['redis_cache_w'][0]['port'] );
            foreach ( $querys as $query ) {
                $MsgQuery = MsgQuery::getInstance ( $redis, $query );
                $MsgQuery->moveToQuery ();
            }
        }
    
        /**
         * 清理已经过期的query数据
         */
        public function clean() {
            set_time_limit ( 0 );
            global $global;
            $this->load->ventor ( 'msg_query/MsgQuery' );
            $querys = MsgQuery::getQuerys ();
            $redis = new Redis ();
            $redis->connect ( $global['redis_cache_w'][0]['host'], $global['redis_cache_w'][0]['port'] );
            foreach ( $querys as $query ) {
                $MsgQuery = MsgQuery::getInstance ( $redis, $query );
                $MsgQuery->clean ();
            }
        }
    }
  • 相关阅读:
    网络编程TCP
    collections模块
    异常处理
    hashlib模块
    configparse模块
    logging模块
    序列化模块
    os模块
    时间模块
    random模块
  • 原文地址:https://www.cnblogs.com/Moon-Face/p/4923677.html
Copyright © 2011-2022 走看看