zoukankan      html  css  js  c++  java
  • 基于的消息队列实现-轻量级-redis实现

    一,适用于场景,异步,回调,非实时性业务场景,支持队列数据失败重传,多进程,实时延时队列,重试队列,死信队列,并针对队列数据进行监控。

    二,相关概念

      QUEUE模块:消息队列push pop模块。

      PHP进程: 针对队列数据起的PHP多个进程

      文件锁:用来控制进程的启动与重启

      REDIS队列:数据载体,redis缓存list

    三,环境

      tp3(框架比较旧),tp5貌似已经有toptink-queue的模块可以直接用

    三,实现

      QUEUE.php

      

      1 <?php
      2 namespace Think;
      3 class Queue {
      4     //正常工作队列
      5     CONST JOBTYPE = 1;
      6     //延迟队列
      7     CONST DELAYTYPE = 2;
      8     //失败重试队列
      9     CONST RETRYTYPE = 3;
     10     //死信队列
     11     CONST DIETYPE = 4;
     12     //工作队列数
     13     public static $_listnums = 3;
     14     //每次队列取元素个数
     15     private static $_listpushnums = 10; //每个队列出队数
     16     //工作队列key+队列ID
     17     private static $_listkey  = 'test:queue:';
     18     //延迟队列
     19     private static $_delaylistkey = 'test:delay:queue:';
     20     //重试队列key
     21     private static $_retrylistkey = 'test:retry:queue:retry';
     22     //死信队列key
     23     private static $_dielistkey = 'test:die:queue:';
     24 
     25     //延时对列
     26     private static $DelayedEvent = array('');
     27     
     28     //redis对象
     29     private $redis = null;
     30     
     31     //工厂实例
     32     protected static $_instance = array();
     33     
     34     //支持不同redis实例调用
     35     public static function factory($objredis){
     36         if(!is_object($objredis)){
     37             return false;
     38         }
     39         $md5 = md5(serialize($objredis));
     40         if(!is_object(self::$_instance['queue'][$md5])){
     41             self::$_instance['queue'][$md5] = new self($objredis);
     42         }
     43         return self::$_instance['queue'][$md5];
     44     }
     45     
     46     
     47     //构造函数
     48     public function __construct($objredis){
     49         $this->redis = $objredis;
     50         $this->env = ENV;
     51     }
     52 
     53     /**
     54      * 随机工作队列
     55      * @return [type] [description]
     56      */
     57     public function getListKey($process=0){
     58         $process = $process ? $process : mt_rand(1,self::$_listnums);
     59         return self::$_listkey.$this->env.$process;
     60     }
     61 
     62     /**
     63      * 延迟队列
     64      * @param  [type] $env [description]
     65      * @return [type]      [description]
     66      */
     67     public function getDelayListKey(){
     68         return self::$_delaylistkey.$this->env;
     69     }
     70 
     71     /**
     72      * 获取重试队列
     73      * @return [type] [description]
     74      */
     75     public function getRetrylistkey(){
     76         return self::$_retrylistkey.$this->env;
     77     }
     78 
     79     /**
     80      * 获取死信队列
     81      * @return [type] [description]
     82      */
     83     public function getDielistkey(){
     84         return self::$_dielistkey.$this->env;
     85     }
     86 
     87     /**
     88      * 入队
     89      * @param  [type] $callback [定义消息回调函数]
     90      * @param  [type] $aMessage [定义消息体]
     91      * @param  [type] $aMessage [写入事件类型]
     92      * @param  [type] $aMessage [是否备份文件]
     93      * @return [type]           [true/false]
     94      */
     95     public function push($callback,$aMessage,$type=1,$wfile=true){
     96         if( !is_array($aMessage) || empty($aMessage) || !in_array($type,array(self::JOBTYPE,self::RETRYTYPE,self::DIETYPE))) return false;
     97         
     98         //延迟队列
     99         $type = ($type == self::JOBTYPE && in_array($callback,self::$DelayedEvent)) ? self::DELAYTYPE : $type;
    100 
    101         switch ($type) {
    102             case self::JOBTYPE:
    103                 $listkey = $this->getListKey();
    104                 break;
    105             case self::DELAYTYPE:
    106                 $listkey = $this->getDelayListKey();
    107                 break;
    108             case self::RETRYTYPE:
    109                 $listkey = $this->getRetrylistkey();
    110                 break;
    111             case self::DIETYPE:
    112                 $listkey = $this->getDielistkey();
    113                 break;
    114             default:
    115                 return false;
    116                 break;
    117         }
    118         
    119         //备份消息体
    120         $wfile && error_log($type."|".date('Y-m-d H:i:s')."|".$callback."|".json_encode($aMessage)."
    ",3,LOG_PATH."Crontab/job/queue_".$this->env.".log");
    121         
    122         //入队
    123         $this->rPush($listkey, json_encode( array($callback,$aMessage) ) );
    124         return true;
    125     }
    126 
    127     /**
    128      * 按队列类型取数据
    129      * @param [type] $type [获取队列类型]
    130      */
    131     public function PopEvent($type){
    132         $result = array();
    133         $type = intval($type);
    134         switch ($type) {
    135             case self::JOBTYPE://工作队列
    136                 for($listid=1;$listid <= self::$_listnums;$listid++){
    137                     $temp = $this->lGetRange($this->getListKey($listid),0,self::$_listpushnums-1);
    138                     if( empty($temp) || !is_array($temp))
    139                         continue;
    140                     $result = $result ? array_merge($result,$temp): $temp;
    141                 }
    142                 break;
    143             case self::DELAYTYPE: //delay queue
    144                 $result = $this->lGetRange($this->getDelayListKey(),0,self::$_listpushnums-1);
    145                 break;
    146             case self::RETRYTYPE: //retry queue
    147                 $result = $this->lGetRange($this->getRetrylistkey(),0,self::$_listpushnums-1);
    148                 break;
    149             case self::DIETYPE: //die queue 
    150                 $result = $this->lGetRange($this->getDielistkey(),0,self::$_listpushnums-1);
    151                 break;        
    152         }
    153         return $result;
    154     }
    155 
    156     /**
    157      * 向队列尾部添加一条消息
    158      * @param  [type] $listkey [description]
    159      * @param  [type] $data    [description]
    160      * @return [type]          [description]
    161      */
    162     public function rPush($listkey,$data){
    163         $this->redis->rPush($listkey, $data);
    164     }
    165 
    166     /**
    167      * 取出指定队列头部一条消息
    168      * @param  [type] $listid [description]
    169      * @return [type]         [description]
    170      */
    171     public function lPop($listkey){
    172         return $this->redis->lPop($listkey);
    173     }
    174 
    175     /**
    176      * 弹出队列中一段
    177      * @param  [type] $listkey [description]
    178      * @param  [type] $start   [description]
    179      * @param  [type] $end     [description]
    180      * @return [type]          [description]
    181      */
    182     public function lGetRange($listkey,$start,$end){
    183         $return = $this->redis->lGetRange($listkey,$start,$end);
    184         $return && $this->redis->lTrim($listkey,$end+1,-1);
    185         return $return;
    186     }
    187 
    188     /**
    189      * 删除指定队列
    190      * @param  [type] $listkey [description]
    191      * @return [type]          [description]
    192      */
    193     public function deleteList($listkey){
    194         return $this->redis->delete($listkey);
    195     }
    196 
    197     /**
    198      * 获得队列状态,即目前队列中的消息数量
    199      * @return mixed
    200      */
    201     public function size($listkey){
    202         return $this->redis->lSize($listkey);
    203     }
    204 
    205     /**
    206      * 获取链接信息
    207      * @return [type] [description]
    208      */
    209     public function ping(){
    210         return $this->redis->ping();
    211     }
    212 
    213     /**
    214      * 关闭链接
    215      */
    216     public function close(){
    217         $this->redis->close();
    218     }
    219     /**
    220      * 统计当前消息队列待处理消息数量
    221      * @return [type] [description]
    222      */
    223     public function getCountSize(){
    224         $aRet = array();
    225         for($listid = 1;$listid <= self::$_listnums;$listid++){
    226             $aRet[self::JOBTYPE][$listid] = (int)$this->size($this->getListKey($listid));
    227         }
    228         $aRet[self::DELAYTYPE][1] = (int)$this->size($this->getDelayListKey());
    229         $aRet[self::RETRYTYPE][1] = (int)$this->size($this->getRetrylistkey());
    230         $aRet[self::DIETYPE][1]   = (int)$this->size($this->getDielistkey());
    231         return $aRet;
    232     }
    233 
    234     /**
    235      * 清空消息队列
    236      * @return [type] [description]
    237      */
    238     public function clearList($type){
    239         switch ($type) {
    240             case self::JOBTYPE:
    241                 for($listid = 1;$listid <= self::$_listnums;$listid++){
    242                     $this->deleteList($this->getListKey($listid));
    243                 }
    244                 break;
    245             case self::DELAYTYPE:
    246                 $this->deleteList($this->getDelayListKey());
    247                 break;
    248             case self::RETRYTYPE:
    249                 $this->deleteList($this->getRetrylistkey());
    250                 break;
    251             case self::DIETYPE:
    252                 $this->deleteList($this->getDielistkey());
    253                 break;
    254         }
    255         return true;
    256     }
    257 
    258     /**
    259      * 获得统计key
    260      * @param  [type] $modename [description]
    261      */
    262     public function getCountKey($modename){
    263         return $this->env . ":queue:count:{$modename}";
    264     }
    265 
    266     /**
    267      * 记录一些任务统计日志
    268      * @param [type] $modename [description]
    269      * @param [type] $runid    [description]
    270      * @param [type] $strLogs  [description]
    271      */
    272     public function addLog($modename,$runid,$strLogs){
    273         $hashkey = $this->getCountKey($modename);
    274         return $this->redis->hSet($hashkey,$runid,$strLogs);
    275     }
    276 
    277     /**
    278      * 统计任务
    279      * @param  [type] $modename [description]
    280      * @return [type]           [description]
    281      */
    282     public function info($modename){
    283         $hashkey = $this->getCountKey($modename,$this->env);
    284         return $this->redis->hGetAll($hashkey);
    285     }
    286     
    287     
    288     public function __destruct(){
    289         $this->close();
    290     }
    291 
    292 }

      二,文件lock模块

        其实就是利于读写文件及touch操作

        

      1 <?php 
      2 namespace Think;
      3 /**
      4  * 文件锁管理
      5  */
      6 class Lock{
      7 
      8     CONST TYPERUN = 'run';
      9     CONST TYPEDIE = 'die';
     10     CONST TYPELOG = 'log';
     11     static private $pathlock = '';
     12     protected static $_instance = array();
     13     
     14     public static function factory($path = LOG_PATH){
     15         $obmd = md5($path);
     16         if(!is_object(self::$_instance['Lock'][$obmd])){
     17             self::$_instance['Lock'][$obmd] = new self($path);
     18         }
     19         return self::$_instance['Lock'][$obmd];
     20     }
     21     
     22     public function __construct($path){
     23         $this->pathlock = $path."runlock/";
     24         !is_dir($this->pathlock) && mkdir($this->pathlock,0777);
     25     }
     26 
     27     /**
     28      * 获得锁文件
     29      * @param  [type] $name [锁昵称]
     30      * @param  [type] $type [锁类型] 1运行锁 2重启锁
     31      * @param  [type] $pid  [锁id]
     32      * @return [type]       [description]
     33      */
     34     public function getLockName($name,$type,$runid=1){
     35         return "{$name}_{$runid}.{$type}";
     36     }
     37 
     38     /**
     39      * [getLockPath description]
     40      * @param  [type] $name [description]
     41      * @param  [type] $type [description]
     42      * @return [type]       [description]
     43      */
     44     public function getLockPath($name,$type,$runid=1){
     45         $filename = $this->getLockName($name,$type,$runid);
     46         return $this->pathlock.$filename;
     47     }
     48 
     49     /**
     50      * 获取锁文件最后更新时间和锁对应的进程pid
     51      * @param  [type]  $name  [description]
     52      * @param  [type]  $type  [description]
     53      * @param  integer $runid [description]
     54      * @return [type]         [description]
     55      */
     56     public function getLock($name,$type,$runid=1){
     57         $aresult = array();
     58         $filename = $this->getLockPath($name,$type,$runid);
     59         if( !is_file($filename) ){
     60             return $aresult;
     61         }
     62         $aresult['time'] = filemtime($filename);
     63         $aresult['pid']  = file_get_contents($filename);
     64         return $aresult;
     65     }
     66 
     67     /**
     68      * 更新锁文件时间
     69      * @param  [type]  $name  [description]
     70      * @param  [type]  $type  [description]
     71      * @param  integer $runid [description]
     72      * @return [type]         [description]
     73      */
     74     public function touchLock($name,$type,$runid=1){
     75         return touch( $this->getLockPath($name,$type,$runid) );
     76     }
     77 
     78     /**
     79      * 清除指定锁文件
     80      * @param  [type]  $name  [description]
     81      * @param  [type]  $type  [description]
     82      * @param  integer $runid [description]
     83      * @return [type]         [description]
     84      */
     85     public function deleteLock($name,$type,$runid=1){
     86         return unlink($this->getLockPath($name,$type,$runid));
     87     }
     88 
     89     /**
     90      * 添加锁文件
     91      * @param [type]  $name     [description]
     92      * @param [type]  $type     [description]
     93      * @param integer $runid    [description]
     94      * @param [type]  $strvalue [description]
     95      */
     96     public function addLock($name,$type,$runid=1,$strvalue=0){
     97         $filename = $this->getLockPath($name,$type,$runid);
     98         return file_put_contents($filename,$strvalue);
     99     }
    100 
    101 }
      1 <?php 
      2 namespace Think;
      3 /**
      4  * 文件锁管理
      5  *
      6  * author wangyanjun 20200513
      7  *
      8  */
      9 class Lock{
     10 
     11     CONST TYPERUN = 'run';
     12     CONST TYPEDIE = 'die';
     13     CONST TYPELOG = 'log';
     14     static private $pathlock = '';
     15     protected static $_instance = array();
     16     
     17     public static function factory($path = LOG_PATH){
     18         $obmd = md5($path);
     19         if(!is_object(self::$_instance['Lock'][$obmd])){
     20             self::$_instance['Lock'][$obmd] = new self($path);
     21         }
     22         return self::$_instance['Lock'][$obmd];
     23     }
     24     
     25     public function __construct($path){
     26         $this->pathlock = $path."runlock/";
     27         !is_dir($this->pathlock) && mkdir($this->pathlock,0777);
     28     }
     29 
     30     /**
     31      * 获得锁文件
     32      * @param  [type] $name [锁昵称]
     33      * @param  [type] $type [锁类型] 1运行锁 2重启锁
     34      * @param  [type] $pid  [锁id]
     35      * @return [type]       [description]
     36      */
     37     public function getLockName($name,$type,$runid=1){
     38         return "{$name}_{$runid}.{$type}";
     39     }
     40 
     41     /**
     42      * [getLockPath description]
     43      * @param  [type] $name [description]
     44      * @param  [type] $type [description]
     45      * @return [type]       [description]
     46      */
     47     public function getLockPath($name,$type,$runid=1){
     48         $filename = $this->getLockName($name,$type,$runid);
     49         return $this->pathlock.$filename;
     50     }
     51 
     52     /**
     53      * 获取锁文件最后更新时间和锁对应的进程pid
     54      * @param  [type]  $name  [description]
     55      * @param  [type]  $type  [description]
     56      * @param  integer $runid [description]
     57      * @return [type]         [description]
     58      */
     59     public function getLock($name,$type,$runid=1){
     60         $aresult = array();
     61         $filename = $this->getLockPath($name,$type,$runid);
     62         if( !is_file($filename) ){
     63             return $aresult;
     64         }
     65         $aresult['time'] = filemtime($filename);
     66         $aresult['pid']  = file_get_contents($filename);
     67         return $aresult;
     68     }
     69 
     70     /**
     71      * 更新锁文件时间
     72      * @param  [type]  $name  [description]
     73      * @param  [type]  $type  [description]
     74      * @param  integer $runid [description]
     75      * @return [type]         [description]
     76      */
     77     public function touchLock($name,$type,$runid=1){
     78         return touch( $this->getLockPath($name,$type,$runid) );
     79     }
     80 
     81     /**
     82      * 清除指定锁文件
     83      * @param  [type]  $name  [description]
     84      * @param  [type]  $type  [description]
     85      * @param  integer $runid [description]
     86      * @return [type]         [description]
     87      */
     88     public function deleteLock($name,$type,$runid=1){
     89         return unlink($this->getLockPath($name,$type,$runid));
     90     }
     91 
     92     /**
     93      * 添加锁文件
     94      * @param [type]  $name     [description]
     95      * @param [type]  $type     [description]
     96      * @param integer $runid    [description]
     97      * @param [type]  $strvalue [description]
     98      */
     99     public function addLock($name,$type,$runid=1,$strvalue=0){
    100         $filename = $this->getLockPath($name,$type,$runid);
    101         return file_put_contents($filename,$strvalue);
    102     }
    103 
    104 三,job模块,队列进程模块

        JobController.class.php

      1 <?php 
     11 class JobController {
     12     
     13     //crontab start
     14     public function run(){
     15         $runid = $_SERVER['argv']['2'] ? $_SERVER['argv']['2'] : 1;
     16         list($grunid,$listtype) = explode('_', $runid);
     17         //进程组
     18         $grunid = $grunid ? intval($grunid) : 1;
     19         //$listtype 队列类型  不指定队列类型则一个进程处理所有类型的队列
     20         $listtype  = $listtype ? $listtype : 0;
     21 
     22         //jobController
     23         $filename = basename(__FILE__,".class.php");
     24         $aCountLog = array();
     25         
     26         $locktyperun = Lock::TYPERUN;
     27         $locktypedie = Lock::TYPEDIE;
     28         $locktypelog = Lock::TYPELOG;
     29 
     30         $jobtype   = Queue::JOBTYPE;
     31         $delaytype = Queue::DELAYTYPE;
     32         $retrytype = Queue::RETRYTYPE;
     33         $dietype   = Queue::DIETYPE;
     34 
     35         if( !in_array($listtype, array(0,$jobtype,$delaytype,$retrytype) ) ){
     36             die('type fail!');
     37         }
     38 
     39         //判断是否已运行
     40         clearstatcache(); //清除PHP文件信息缓存
     41         $lockValue = Lock::factory()->getLock($filename,$locktyperun,$runid);
     42         if( $lockValue ) {
     43             if((time() - $lockValue['time']) < 300) {
     44                 die();
     45             } else {
     46                 $pid = $lockValue['pid'];
     47                 shell_exec("ps aux | grep 'cli.php' | grep 'Crontab/Job/run' | grep -v 'grep' | awk '{print $2}' | grep {$pid} | xargs --no-run-if-empty kill");
     48             }
     49         }
     50 
     51         //设置运行状态
     52         if(!Lock::factory()->addLock($filename,$locktyperun,$runid,getmypid()) ) {
     53             die();
     54         }
     55         
     56         //声明queue对象
     57         $objredis = Apps::redis('cms');
     58         $objQueue = Queue::factory($objredis);
     59         
     60         while ( true ) {
     61             //更新锁文件时间
     62             Lock::factory()->touchLock($filename,$locktyperun,$runid);
     63             
     64             $aValues = array();
     65             $objQueue->ping();
     66         
     67 
     68             if( $listtype > 0){    //分进程处理不同类型队列
     69                 $aValues[$listtype] = $objQueue->PopEvent($listtype);
     70             } else {             //全部在一个进程处理
     71                 $aValues[$jobtype]   = $objQueue->PopEvent($jobtype);
     72                 $aValues[$delaytype] = $objQueue->PopEvent($delaytype);
     73                 $aValues[$retrytype] = $objQueue->PopEvent($retrytype);
     74             }
     75             
     76             if( $aValues[$jobtype] || $aValues[$delaytype] || $aValues[$retrytype] ){
     77 
     78                 foreach ($aValues as $key => $aJobs) {
     79 
     80                     if( empty($aJobs) || !is_array($aJobs) )
     81                         continue;
     82 
     83                     foreach ($aJobs as $value) {
     84                         $aCountLog['count']++;
     85                         list($callback,$data) = json_decode($value,true);
     86 
     87                         if( empty($callback) ){
     88                             $aCountLog['callbacknull']++;
     89                             continue;
     90                         }
     91                         
     92                         if( !method_exists(A('Crontab/Jobwork'),$callback) ){
     93                             $objQueue->push($callback,$data,$dietype);
     94                             $aCountLog[$callback]['dielist']++;
     95                             $aCountLog[$callback]['jobs_error']++;
     96                             continue;
     97                         }
     98                         $starttime = microtime(true);
     99                         $ret = call_user_func_array(array(A('Crontab/Jobwork'),$callback),array(json_encode($data)));
    100                         $endtime = microtime(true);
    101 
    102                         $runtime = round($endtime-$starttime,4);
    103                         $aCountLog[$callback]['count']++;
    104                         $aCountLog[$callback]['runtime'] += $endtime-$starttime;
    105                         $aCountLog[$callback]['maxtime'] = $aCountLog[$callback]['maxtime']<$runtime ? $runtime : $aCountLog[$callback]['maxtime'];
    106                         if( !$ret){
    107                             $aCountLog[$callback]['fail']++;
    108                             //写入重试队列
    109                             in_array($key,array($jobtype,$delaytype)) && $objQueue->push($callback,$data,$retrytype);
    110                             //写入死信队列
    111                             if($key == $retrytype) {
    112                                 $objQueue->push($callback,$data,$dietype);
    113                                 $aCountLog[$callback]['dielist']++;
    114                             }
    115                         } else {
    116                             $aCountLog[$callback]['succ']++;
    117                         }
    118                     }
    119                 }
    120             } else {
    121                 sleep(1);
    122             }
    123             
    124             //检测脚本是否需要重启
    125             if( Lock::factory()->getLock($filename,$locktypedie,$runid)){
    126                 if( Lock::factory()->deleteLock($filename,$locktypedie,$runid) && Lock::factory()->deleteLock($filename,$locktyperun,$runid) ){
    127                     die();
    128                 }
    129             }
    130     
    131             
    132             usleep(5);
    133         }    
    134         
    135     }
    136     
    137 }

      四,job回调模块

        

     1 <?php
     2 use ThinkController;
     3 /**
     4  * 此文件为消息队列业务处理层
     5 **/
     6 
     7 Class JobworkController extends Controller {
     8     
     9     //测试方法
    10     public function testCallback($param){
    11         debugLog($param,"jobwork");
    12         
    13         return true;
    14     }
    15     
    16     //测试方法
    17     public function test1Callback($param){
    18         debugLog($param,"jobwork");
    19         
    20         return true;
    21     }
    22 }

      五,队列进程,根据不同队列启用相对应的进程即可

      cli.php   test/Job/run 1_1

      cli.php   test/Job/run 1_2

      cli.php   test/Job/run 1_3

        

      

     

      

    PHP中常见的问题点,知识点,及盲点。
  • 相关阅读:
    tyvj4751 NOIP春季系列课程 H's Problem (树状数组)
    卡牌分组([AtCoder ARC073]Ball Coloring)
    bzoj1036 [ZJOI2008]树的统计Count (树链剖分+线段树)
    bzoj2287 POJ Challenge 消失之物(背包)
    不能建立引用数组
    CString和string的区别
    防止应用程序重复启动
    public,protected,private
    ATL
    c++头文件中定义全局变量
  • 原文地址:https://www.cnblogs.com/sblack/p/12909009.html
Copyright © 2011-2022 走看看