1,项目一级目录新建一个server文件

#!/usr/bin/env php <?php try { require __DIR__ . "/start.php"; } catch (Exception $e) {} $queueNames = config('queue.qnames'); if (empty($queueNames)) exit('队列名称未配置'); $option = $argv[1]; $queueName = isset($argv[2]) ? $argv[2] : null; if ($queueName && !in_array($queueName, $queueNames)) exit('队列名称不存在'); if ($queueName) { unset($queueNames); $queueNames[] = $queueName; } echo "开始操作... "; switch ($option) { case 'start': startQueue($queueNames); break; case 'stop': stopQueue($queueNames); break; case 'reload': stopQueue($queueNames); startQueue($queueNames); break; case 'monitor': demon(); break; default: echo '操作类型: start|stop|reload|monitor' . " "; break; } echo "结束操作... "; function startQueue(array $queueNames) { $correct = 'dispatch/Correct/index$'; $ret = isProcessExist($correct); if($ret == 0){ $cmd = "nohup php start.php dispatch/Correct/index >> /tmp/correct.log &"; system($cmd,$result); ($result == 0) or die("$cmd 启动失败 "); echo "$correct 队列已启动 "; }else { echo "$correct 进程已存在,无需重启 "; } foreach ($queueNames as $_queueName) { $proccessname = "dispatch/Consume/index/qname/$_queueName$"; if(isProcessExist($proccessname) == 1) { echo $_queueName . "进程已经存在,无需重启 "; }else { $command = "nohup php start.php dispatch/Consume/index/qname/" . $_queueName." >> /tmp/$_queueName.log &"; system($command,$result); ($result == 0) or die("$command 启动失败 "); echo $_queueName . "队列已启动 "; } } } function demon() { $queues = config('queue.qnames'); foreach($queues as $queue) { if(!isProcessExist($queue)) { try{ startQueue([$queue]); }catch(Exception $e){ echo "队列监控启动失败: ".$e->getMessage()." "; } } } } function stopQueue(array $queueNames) { $redisconf = config('redis'); $redis = libraryRedis::getInstance($redisconf); foreach ($queueNames as $key => $_queueName) { $redis->hset('script:signal', $_queueName, false); libraryQueue::getInstance()->addEvent('Stop', [])->push($_queueName); $proccessname = "dispatch/Consume/index/qname/$_queueName$"; while(true) { usleep(200000); // 0.2s $result = isProcessExist($proccessname); if(!$result) { break; } unset($result); } echo $_queueName . "队列已停止 "; } } function isProcessExist($processname) { $ps = 'ps axu|grep "'.$processname.'"|grep -v "grep"|wc -l'; $ret = shell_exec("$ps"); $ret = rtrim($ret, " "); return $ret; }
2,项目一级目录新建一个start.php
3,修改application/extra/queue.php

<?php // +---------------------------------------------------------------------- // | ThinkPHP [ WE CAN DO IT JUST THINK IT ] // +---------------------------------------------------------------------- // | Copyright (c) 2006-2016 http://thinkphp.cn All rights reserved. // +---------------------------------------------------------------------- // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 ) // +---------------------------------------------------------------------- // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- return [ // 'connector' => 'Sync' 'qnames'=>[ 'Trade', //投资,变现,返本相关 'Activity', //活动相关 'Sms', //发短信相关 'Repay', //借款人还款 'Interest', //计息,派息相关 'DebtOnline', //债权上线相关 'Other' //其他 ], 'driver' => env('QUEUE_DRIVER', 'redis'), ];