一,适用于场景,异步,回调,非实时性业务场景,支持队列数据失败重传,多进程,实时延时队列,重试队列,死信队列,并针对队列数据进行监控。
二,相关概念
QUEUE模块:消息队列push pop模块。
PHP进程: 针对队列数据起的PHP多个进程
文件锁:用来控制进程的启动与重启
REDIS队列:数据载体,redis缓存list
三,环境
tp3(框架比较旧),tp5貌似已经有toptink-queue的模块可以直接用
三,实现
QUEUE.php
1 <?php 2 namespace Think; 3 class Queue { 4 //正常工作队列 5 CONST JOBTYPE = 1; 6 //延迟队列 7 CONST DELAYTYPE = 2; 8 //失败重试队列 9 CONST RETRYTYPE = 3; 10 //死信队列 11 CONST DIETYPE = 4; 12 //工作队列数 13 public static $_listnums = 3; 14 //每次队列取元素个数 15 private static $_listpushnums = 10; //每个队列出队数 16 //工作队列key+队列ID 17 private static $_listkey = 'test:queue:'; 18 //延迟队列 19 private static $_delaylistkey = 'test:delay:queue:'; 20 //重试队列key 21 private static $_retrylistkey = 'test:retry:queue:retry'; 22 //死信队列key 23 private static $_dielistkey = 'test:die:queue:'; 24 25 //延时对列 26 private static $DelayedEvent = array(''); 27 28 //redis对象 29 private $redis = null; 30 31 //工厂实例 32 protected static $_instance = array(); 33 34 //支持不同redis实例调用 35 public static function factory($objredis){ 36 if(!is_object($objredis)){ 37 return false; 38 } 39 $md5 = md5(serialize($objredis)); 40 if(!is_object(self::$_instance['queue'][$md5])){ 41 self::$_instance['queue'][$md5] = new self($objredis); 42 } 43 return self::$_instance['queue'][$md5]; 44 } 45 46 47 //构造函数 48 public function __construct($objredis){ 49 $this->redis = $objredis; 50 $this->env = ENV; 51 } 52 53 /** 54 * 随机工作队列 55 * @return [type] [description] 56 */ 57 public function getListKey($process=0){ 58 $process = $process ? $process : mt_rand(1,self::$_listnums); 59 return self::$_listkey.$this->env.$process; 60 } 61 62 /** 63 * 延迟队列 64 * @param [type] $env [description] 65 * @return [type] [description] 66 */ 67 public function getDelayListKey(){ 68 return self::$_delaylistkey.$this->env; 69 } 70 71 /** 72 * 获取重试队列 73 * @return [type] [description] 74 */ 75 public function getRetrylistkey(){ 76 return self::$_retrylistkey.$this->env; 77 } 78 79 /** 80 * 获取死信队列 81 * @return [type] [description] 82 */ 83 public function getDielistkey(){ 84 return self::$_dielistkey.$this->env; 85 } 86 87 /** 88 * 入队 89 * @param [type] $callback [定义消息回调函数] 90 * @param [type] $aMessage [定义消息体] 91 * @param [type] $aMessage [写入事件类型] 92 * @param [type] $aMessage [是否备份文件] 93 * @return [type] [true/false] 94 */ 95 public function push($callback,$aMessage,$type=1,$wfile=true){ 96 if( !is_array($aMessage) || empty($aMessage) || !in_array($type,array(self::JOBTYPE,self::RETRYTYPE,self::DIETYPE))) return false; 97 98 //延迟队列 99 $type = ($type == self::JOBTYPE && in_array($callback,self::$DelayedEvent)) ? self::DELAYTYPE : $type; 100 101 switch ($type) { 102 case self::JOBTYPE: 103 $listkey = $this->getListKey(); 104 break; 105 case self::DELAYTYPE: 106 $listkey = $this->getDelayListKey(); 107 break; 108 case self::RETRYTYPE: 109 $listkey = $this->getRetrylistkey(); 110 break; 111 case self::DIETYPE: 112 $listkey = $this->getDielistkey(); 113 break; 114 default: 115 return false; 116 break; 117 } 118 119 //备份消息体 120 $wfile && error_log($type."|".date('Y-m-d H:i:s')."|".$callback."|".json_encode($aMessage)." ",3,LOG_PATH."Crontab/job/queue_".$this->env.".log"); 121 122 //入队 123 $this->rPush($listkey, json_encode( array($callback,$aMessage) ) ); 124 return true; 125 } 126 127 /** 128 * 按队列类型取数据 129 * @param [type] $type [获取队列类型] 130 */ 131 public function PopEvent($type){ 132 $result = array(); 133 $type = intval($type); 134 switch ($type) { 135 case self::JOBTYPE://工作队列 136 for($listid=1;$listid <= self::$_listnums;$listid++){ 137 $temp = $this->lGetRange($this->getListKey($listid),0,self::$_listpushnums-1); 138 if( empty($temp) || !is_array($temp)) 139 continue; 140 $result = $result ? array_merge($result,$temp): $temp; 141 } 142 break; 143 case self::DELAYTYPE: //delay queue 144 $result = $this->lGetRange($this->getDelayListKey(),0,self::$_listpushnums-1); 145 break; 146 case self::RETRYTYPE: //retry queue 147 $result = $this->lGetRange($this->getRetrylistkey(),0,self::$_listpushnums-1); 148 break; 149 case self::DIETYPE: //die queue 150 $result = $this->lGetRange($this->getDielistkey(),0,self::$_listpushnums-1); 151 break; 152 } 153 return $result; 154 } 155 156 /** 157 * 向队列尾部添加一条消息 158 * @param [type] $listkey [description] 159 * @param [type] $data [description] 160 * @return [type] [description] 161 */ 162 public function rPush($listkey,$data){ 163 $this->redis->rPush($listkey, $data); 164 } 165 166 /** 167 * 取出指定队列头部一条消息 168 * @param [type] $listid [description] 169 * @return [type] [description] 170 */ 171 public function lPop($listkey){ 172 return $this->redis->lPop($listkey); 173 } 174 175 /** 176 * 弹出队列中一段 177 * @param [type] $listkey [description] 178 * @param [type] $start [description] 179 * @param [type] $end [description] 180 * @return [type] [description] 181 */ 182 public function lGetRange($listkey,$start,$end){ 183 $return = $this->redis->lGetRange($listkey,$start,$end); 184 $return && $this->redis->lTrim($listkey,$end+1,-1); 185 return $return; 186 } 187 188 /** 189 * 删除指定队列 190 * @param [type] $listkey [description] 191 * @return [type] [description] 192 */ 193 public function deleteList($listkey){ 194 return $this->redis->delete($listkey); 195 } 196 197 /** 198 * 获得队列状态,即目前队列中的消息数量 199 * @return mixed 200 */ 201 public function size($listkey){ 202 return $this->redis->lSize($listkey); 203 } 204 205 /** 206 * 获取链接信息 207 * @return [type] [description] 208 */ 209 public function ping(){ 210 return $this->redis->ping(); 211 } 212 213 /** 214 * 关闭链接 215 */ 216 public function close(){ 217 $this->redis->close(); 218 } 219 /** 220 * 统计当前消息队列待处理消息数量 221 * @return [type] [description] 222 */ 223 public function getCountSize(){ 224 $aRet = array(); 225 for($listid = 1;$listid <= self::$_listnums;$listid++){ 226 $aRet[self::JOBTYPE][$listid] = (int)$this->size($this->getListKey($listid)); 227 } 228 $aRet[self::DELAYTYPE][1] = (int)$this->size($this->getDelayListKey()); 229 $aRet[self::RETRYTYPE][1] = (int)$this->size($this->getRetrylistkey()); 230 $aRet[self::DIETYPE][1] = (int)$this->size($this->getDielistkey()); 231 return $aRet; 232 } 233 234 /** 235 * 清空消息队列 236 * @return [type] [description] 237 */ 238 public function clearList($type){ 239 switch ($type) { 240 case self::JOBTYPE: 241 for($listid = 1;$listid <= self::$_listnums;$listid++){ 242 $this->deleteList($this->getListKey($listid)); 243 } 244 break; 245 case self::DELAYTYPE: 246 $this->deleteList($this->getDelayListKey()); 247 break; 248 case self::RETRYTYPE: 249 $this->deleteList($this->getRetrylistkey()); 250 break; 251 case self::DIETYPE: 252 $this->deleteList($this->getDielistkey()); 253 break; 254 } 255 return true; 256 } 257 258 /** 259 * 获得统计key 260 * @param [type] $modename [description] 261 */ 262 public function getCountKey($modename){ 263 return $this->env . ":queue:count:{$modename}"; 264 } 265 266 /** 267 * 记录一些任务统计日志 268 * @param [type] $modename [description] 269 * @param [type] $runid [description] 270 * @param [type] $strLogs [description] 271 */ 272 public function addLog($modename,$runid,$strLogs){ 273 $hashkey = $this->getCountKey($modename); 274 return $this->redis->hSet($hashkey,$runid,$strLogs); 275 } 276 277 /** 278 * 统计任务 279 * @param [type] $modename [description] 280 * @return [type] [description] 281 */ 282 public function info($modename){ 283 $hashkey = $this->getCountKey($modename,$this->env); 284 return $this->redis->hGetAll($hashkey); 285 } 286 287 288 public function __destruct(){ 289 $this->close(); 290 } 291 292 }
二,文件lock模块
其实就是利于读写文件及touch操作
1 <?php 2 namespace Think; 3 /** 4 * 文件锁管理 5 */ 6 class Lock{ 7 8 CONST TYPERUN = 'run'; 9 CONST TYPEDIE = 'die'; 10 CONST TYPELOG = 'log'; 11 static private $pathlock = ''; 12 protected static $_instance = array(); 13 14 public static function factory($path = LOG_PATH){ 15 $obmd = md5($path); 16 if(!is_object(self::$_instance['Lock'][$obmd])){ 17 self::$_instance['Lock'][$obmd] = new self($path); 18 } 19 return self::$_instance['Lock'][$obmd]; 20 } 21 22 public function __construct($path){ 23 $this->pathlock = $path."runlock/"; 24 !is_dir($this->pathlock) && mkdir($this->pathlock,0777); 25 } 26 27 /** 28 * 获得锁文件 29 * @param [type] $name [锁昵称] 30 * @param [type] $type [锁类型] 1运行锁 2重启锁 31 * @param [type] $pid [锁id] 32 * @return [type] [description] 33 */ 34 public function getLockName($name,$type,$runid=1){ 35 return "{$name}_{$runid}.{$type}"; 36 } 37 38 /** 39 * [getLockPath description] 40 * @param [type] $name [description] 41 * @param [type] $type [description] 42 * @return [type] [description] 43 */ 44 public function getLockPath($name,$type,$runid=1){ 45 $filename = $this->getLockName($name,$type,$runid); 46 return $this->pathlock.$filename; 47 } 48 49 /** 50 * 获取锁文件最后更新时间和锁对应的进程pid 51 * @param [type] $name [description] 52 * @param [type] $type [description] 53 * @param integer $runid [description] 54 * @return [type] [description] 55 */ 56 public function getLock($name,$type,$runid=1){ 57 $aresult = array(); 58 $filename = $this->getLockPath($name,$type,$runid); 59 if( !is_file($filename) ){ 60 return $aresult; 61 } 62 $aresult['time'] = filemtime($filename); 63 $aresult['pid'] = file_get_contents($filename); 64 return $aresult; 65 } 66 67 /** 68 * 更新锁文件时间 69 * @param [type] $name [description] 70 * @param [type] $type [description] 71 * @param integer $runid [description] 72 * @return [type] [description] 73 */ 74 public function touchLock($name,$type,$runid=1){ 75 return touch( $this->getLockPath($name,$type,$runid) ); 76 } 77 78 /** 79 * 清除指定锁文件 80 * @param [type] $name [description] 81 * @param [type] $type [description] 82 * @param integer $runid [description] 83 * @return [type] [description] 84 */ 85 public function deleteLock($name,$type,$runid=1){ 86 return unlink($this->getLockPath($name,$type,$runid)); 87 } 88 89 /** 90 * 添加锁文件 91 * @param [type] $name [description] 92 * @param [type] $type [description] 93 * @param integer $runid [description] 94 * @param [type] $strvalue [description] 95 */ 96 public function addLock($name,$type,$runid=1,$strvalue=0){ 97 $filename = $this->getLockPath($name,$type,$runid); 98 return file_put_contents($filename,$strvalue); 99 } 100 101 }
1 <?php 2 namespace Think; 3 /** 4 * 文件锁管理 5 * 6 * author wangyanjun 20200513 7 * 8 */ 9 class Lock{ 10 11 CONST TYPERUN = 'run'; 12 CONST TYPEDIE = 'die'; 13 CONST TYPELOG = 'log'; 14 static private $pathlock = ''; 15 protected static $_instance = array(); 16 17 public static function factory($path = LOG_PATH){ 18 $obmd = md5($path); 19 if(!is_object(self::$_instance['Lock'][$obmd])){ 20 self::$_instance['Lock'][$obmd] = new self($path); 21 } 22 return self::$_instance['Lock'][$obmd]; 23 } 24 25 public function __construct($path){ 26 $this->pathlock = $path."runlock/"; 27 !is_dir($this->pathlock) && mkdir($this->pathlock,0777); 28 } 29 30 /** 31 * 获得锁文件 32 * @param [type] $name [锁昵称] 33 * @param [type] $type [锁类型] 1运行锁 2重启锁 34 * @param [type] $pid [锁id] 35 * @return [type] [description] 36 */ 37 public function getLockName($name,$type,$runid=1){ 38 return "{$name}_{$runid}.{$type}"; 39 } 40 41 /** 42 * [getLockPath description] 43 * @param [type] $name [description] 44 * @param [type] $type [description] 45 * @return [type] [description] 46 */ 47 public function getLockPath($name,$type,$runid=1){ 48 $filename = $this->getLockName($name,$type,$runid); 49 return $this->pathlock.$filename; 50 } 51 52 /** 53 * 获取锁文件最后更新时间和锁对应的进程pid 54 * @param [type] $name [description] 55 * @param [type] $type [description] 56 * @param integer $runid [description] 57 * @return [type] [description] 58 */ 59 public function getLock($name,$type,$runid=1){ 60 $aresult = array(); 61 $filename = $this->getLockPath($name,$type,$runid); 62 if( !is_file($filename) ){ 63 return $aresult; 64 } 65 $aresult['time'] = filemtime($filename); 66 $aresult['pid'] = file_get_contents($filename); 67 return $aresult; 68 } 69 70 /** 71 * 更新锁文件时间 72 * @param [type] $name [description] 73 * @param [type] $type [description] 74 * @param integer $runid [description] 75 * @return [type] [description] 76 */ 77 public function touchLock($name,$type,$runid=1){ 78 return touch( $this->getLockPath($name,$type,$runid) ); 79 } 80 81 /** 82 * 清除指定锁文件 83 * @param [type] $name [description] 84 * @param [type] $type [description] 85 * @param integer $runid [description] 86 * @return [type] [description] 87 */ 88 public function deleteLock($name,$type,$runid=1){ 89 return unlink($this->getLockPath($name,$type,$runid)); 90 } 91 92 /** 93 * 添加锁文件 94 * @param [type] $name [description] 95 * @param [type] $type [description] 96 * @param integer $runid [description] 97 * @param [type] $strvalue [description] 98 */ 99 public function addLock($name,$type,$runid=1,$strvalue=0){ 100 $filename = $this->getLockPath($name,$type,$runid); 101 return file_put_contents($filename,$strvalue); 102 } 103 104 三,job模块,队列进程模块
JobController.class.php
1 <?php 11 class JobController { 12 13 //crontab start 14 public function run(){ 15 $runid = $_SERVER['argv']['2'] ? $_SERVER['argv']['2'] : 1; 16 list($grunid,$listtype) = explode('_', $runid); 17 //进程组 18 $grunid = $grunid ? intval($grunid) : 1; 19 //$listtype 队列类型 不指定队列类型则一个进程处理所有类型的队列 20 $listtype = $listtype ? $listtype : 0; 21 22 //jobController 23 $filename = basename(__FILE__,".class.php"); 24 $aCountLog = array(); 25 26 $locktyperun = Lock::TYPERUN; 27 $locktypedie = Lock::TYPEDIE; 28 $locktypelog = Lock::TYPELOG; 29 30 $jobtype = Queue::JOBTYPE; 31 $delaytype = Queue::DELAYTYPE; 32 $retrytype = Queue::RETRYTYPE; 33 $dietype = Queue::DIETYPE; 34 35 if( !in_array($listtype, array(0,$jobtype,$delaytype,$retrytype) ) ){ 36 die('type fail!'); 37 } 38 39 //判断是否已运行 40 clearstatcache(); //清除PHP文件信息缓存 41 $lockValue = Lock::factory()->getLock($filename,$locktyperun,$runid); 42 if( $lockValue ) { 43 if((time() - $lockValue['time']) < 300) { 44 die(); 45 } else { 46 $pid = $lockValue['pid']; 47 shell_exec("ps aux | grep 'cli.php' | grep 'Crontab/Job/run' | grep -v 'grep' | awk '{print $2}' | grep {$pid} | xargs --no-run-if-empty kill"); 48 } 49 } 50 51 //设置运行状态 52 if(!Lock::factory()->addLock($filename,$locktyperun,$runid,getmypid()) ) { 53 die(); 54 } 55 56 //声明queue对象 57 $objredis = Apps::redis('cms'); 58 $objQueue = Queue::factory($objredis); 59 60 while ( true ) { 61 //更新锁文件时间 62 Lock::factory()->touchLock($filename,$locktyperun,$runid); 63 64 $aValues = array(); 65 $objQueue->ping(); 66 67 68 if( $listtype > 0){ //分进程处理不同类型队列 69 $aValues[$listtype] = $objQueue->PopEvent($listtype); 70 } else { //全部在一个进程处理 71 $aValues[$jobtype] = $objQueue->PopEvent($jobtype); 72 $aValues[$delaytype] = $objQueue->PopEvent($delaytype); 73 $aValues[$retrytype] = $objQueue->PopEvent($retrytype); 74 } 75 76 if( $aValues[$jobtype] || $aValues[$delaytype] || $aValues[$retrytype] ){ 77 78 foreach ($aValues as $key => $aJobs) { 79 80 if( empty($aJobs) || !is_array($aJobs) ) 81 continue; 82 83 foreach ($aJobs as $value) { 84 $aCountLog['count']++; 85 list($callback,$data) = json_decode($value,true); 86 87 if( empty($callback) ){ 88 $aCountLog['callbacknull']++; 89 continue; 90 } 91 92 if( !method_exists(A('Crontab/Jobwork'),$callback) ){ 93 $objQueue->push($callback,$data,$dietype); 94 $aCountLog[$callback]['dielist']++; 95 $aCountLog[$callback]['jobs_error']++; 96 continue; 97 } 98 $starttime = microtime(true); 99 $ret = call_user_func_array(array(A('Crontab/Jobwork'),$callback),array(json_encode($data))); 100 $endtime = microtime(true); 101 102 $runtime = round($endtime-$starttime,4); 103 $aCountLog[$callback]['count']++; 104 $aCountLog[$callback]['runtime'] += $endtime-$starttime; 105 $aCountLog[$callback]['maxtime'] = $aCountLog[$callback]['maxtime']<$runtime ? $runtime : $aCountLog[$callback]['maxtime']; 106 if( !$ret){ 107 $aCountLog[$callback]['fail']++; 108 //写入重试队列 109 in_array($key,array($jobtype,$delaytype)) && $objQueue->push($callback,$data,$retrytype); 110 //写入死信队列 111 if($key == $retrytype) { 112 $objQueue->push($callback,$data,$dietype); 113 $aCountLog[$callback]['dielist']++; 114 } 115 } else { 116 $aCountLog[$callback]['succ']++; 117 } 118 } 119 } 120 } else { 121 sleep(1); 122 } 123 124 //检测脚本是否需要重启 125 if( Lock::factory()->getLock($filename,$locktypedie,$runid)){ 126 if( Lock::factory()->deleteLock($filename,$locktypedie,$runid) && Lock::factory()->deleteLock($filename,$locktyperun,$runid) ){ 127 die(); 128 } 129 } 130 131 132 usleep(5); 133 } 134 135 } 136 137 }
四,job回调模块
1 <?php 2 use ThinkController; 3 /** 4 * 此文件为消息队列业务处理层 5 **/ 6 7 Class JobworkController extends Controller { 8 9 //测试方法 10 public function testCallback($param){ 11 debugLog($param,"jobwork"); 12 13 return true; 14 } 15 16 //测试方法 17 public function test1Callback($param){ 18 debugLog($param,"jobwork"); 19 20 return true; 21 } 22 }
五,队列进程,根据不同队列启用相对应的进程即可
cli.php test/Job/run 1_1
cli.php test/Job/run 1_2
cli.php test/Job/run 1_3