zoukankan      html  css  js  c++  java
  • PHP异步任务worker

    1.概述

    异步任务框架主要包含两个角色:

    • WorkerServer
      主要负责管理(启动,重启,监控等)worker工作进程。
    • Worker
      负责从指定消息队列获取任务消息并执行任务。

     

    为了提高worker任务处理效率,目前按任务处理时间长短,区分不同的任务队列,目前可用的任务名(不同任务名代表不同的队列)如下:

    • defaultJob
      默认任务队列,主要处理一些小任务
    • largeJob
      主要处理一些比较耗时的任务

    2.开发流程

    2.1编写异步任务处理接口

    编写异步任务处理接口的开发方式,跟普通的接口开发没有明显区别。

    例子:

    下面是对顾客进行群发短信的接口定义,实现部分这里忽略掉。

    <?php

    //短信群发模块

    Interface IBroadcastSMS {

    /**

    * @param int $adminId - 商户id

    * @param int $content - 短信内容

    * @param array $params - 群发客户筛选条件, 具体参数说明本例忽略...

    * @return bool 成功返回true, 失败返回false

    */

    public function send($adminId, $content, array $params = []);

    }

    短信群发模块的类型定义为:

    SrvType::CRM_MARKETING_BROADCAST_SMS

    2.2配置worker

    主要涉及两个配置文件的配置,分别是:

    Ps: 配置文件均在config目录

     

    • constants/workerTypes.php
      worker类型定义文件
    • worker.php
      worker任务相关配置

    worker类型定义:

    <?php

    /**

     * worker type 定义

     */

    class WorkerTypes

    {

        /**

         * 群发短信

     * 常量定义格式:

    * [类型名] = '类型值必须唯一,且按名字空间按点分割,最后接上接口名';

         */

    const BROADCAST_SMS = 'crm.marketing.broadcastsms.send';

    }

     

    Worker任务配置:

    worker.php

    蓝色部分代码为任务配置,每一个异步任务都需要添加一个类似的配置。

     

    <?php

    /**

     * worker配置

     */

    return [

        "pid" => PROJECT_PATH .'/runtime/workerServer.pid',

        //php命令路径

        "php" => "",

        //进程运行角色

        "user"   => 'www',

        //当队列为空时候,获取消息时等待多少秒,范围限制:0-30秒

        "pollingWaitSeconds" => 30,

        "workers" => [

         WorkerTypes::BROADCAST_SMS  => [

             //任务名, 任务名相同则共用同一个消息队列,关于任务名,文档另行说明

         "jobName" => "defaultJob",

                //worker获取消息后,多长时间内其他worker不能消费同一条消息,单位秒,最长12小时内

                "visibilityTimeout" => 300,

                //true则预先消费消息,worker获取消息后立即删除消息, false则任务执行返回为真才会删除消息

                "preConsume" => false,

                //当前任务并发执行的worker数量

         "threadNum" => 10,

                //每个worker生存时间, 超时则重启

         "lifeTime" => 3600,

                //每个worker最大任务处理数,超过则重启

         "maxHandleNum" =>  10000,

                //任务处理器, 格式[service类型, '业务接口']

         "handler"    => [SrvType::CRM_MARKETING_BROADCAST_SMS, 'send'],

                //任务描述信息

                "desc"  => '群发短信',

         ],

        ]

    ];

     

    2.3发送异步任务消息

    发送异步任务消息需要MessageServer 类,这个类负责所有消息队列操作。

     

    对于发送worker消息,只需要MessageServer的一个接口

     

    /**

         * 把消息发送给指定的worker执行

         * @param string $workerType - worker type worker类型

         * @param array $params - 任务参数, 任务参数数组内容,按顺序对应service接口参数

         * @param int $delaySeconds - 延迟时间,单位秒 0-604800秒(7天)范围内某个整数值

         * @return bool 成功返回true, 失败返回false

         * @throws CDbException

         */

    public function dispatch($workerType, array $params = [], $delaySeconds = null);

     

     

    接上面的例子:

    //派发worker异步任务消息

    MessageServer::getInstance()->dispatch(

                WorkerTypes::BROADCAST_SMS,

                    [

                        1,    //对应adminId

                        "群发短信内容",  //对应content短信内容参数

                        [       ///对应params参数群发客户筛选条件

                            "is_member" => true

                        ]

                    ]);

     

    下面是worker会调用的service类型为SrvType::CRM_MARKETING_BROADCAST_SMS的send接口,留意service接口参数和dispatch派发任务参数的关系。

    /**

    * @param int $adminId - 商户id

    * @param int $content - 短信内容

    * @param array $params - 群发客户筛选条件, 具体参数说明本例忽略...

    * @return bool 成功返回true, 失败返回false

    */

    public function send($adminId, $content, array $params = [])

     

     

    Ps: dispatch任务参数数组按顺序对应service接口的参数,他们是一一对应关系.

    3.部署

    Ps: 为便于调试,开发阶段不需要部署worker server,  派发worker消息,会直接调用对应的service接口,没有经过消息队列。

     

    3.1环境安装

    Worker server依赖php扩展库:

    • Swoole v1.9.17
      PHP的异步、并行、高性能网络通信引擎.
    • Pcntl
      php源码内置的的进程管理库,因为需要处理阻塞式信号处理,不能使用swoole的进程管理。

     

    安装Swoole:

     

    #git clone https://git.oschina.net/swoole/swoole.git

    //切换到指定版本

    #git checkout v1.9.17

     

    #phpize

    #./configure

    #make && make install

     

    修改php.ini文件

    添加extension=swoole.so

     

    安装pcntl

    因为pcntl包含在php源码目录,因此需要php源码

     

    #cd php源码目录/ext/pcntl

    #phpize

    #./configure

    #make && make install

    修改php.ini文件

    添加extension=pcntl.so

     

    3.2启动/关闭

    切换至脚本目录

    #cd /www/scripts

    设置执行权限

    #chmod u+x workerServer.sh

     

    启动worker server

    #./workerServer.sh start

     

    关闭

    #./workerServer.sh stop

     

     declare:

    A tick is an event that occurs for every N low-level tickable statements executed by the parser within the declare block. The value for N is specified using ticks=N within the declare block's directive section.

    这是PHP中对 declare 中的 ticks 的定义

    中文翻译 Tick(时钟周期)是一个在 declare 代码段中解释器每执行 N 条可计时的低级语句就会发生的事件。N 的值是在 declare 中的 directive 部分用ticks=N 来指定的. 

    我个人理解的大意就是说, tick 这个周期上会绑定一个事件, 这个事件, 当Zend引擎执行到所设置的 tick (就是N) 行低级语句时, 就执行  register_tick_function()  定义的事件 

    关于什么是低级语句, http://my.oschina.net/Jacker/blog/32936 这篇文章说的很详细, 有时间的话, 可以研究一下的, 涉及到了PHP的底层 zend 引擎.

     

     

     

    <?php
    namespace aseworker;
    use aseConfig;
    use aseMessageServer;
    use aseWorkerApplication;
    use SwooleProcess;
    use SwooleTimer;
    /**
     * Worker server, 主要用于管理和维护worker进程
     */
    class WorkerServer
    {
        /**
         * 当前实例
         * @var WorkerServer
         */
        private static $_instance = null;
    
        /**
         * 整个worker服务的配置
         */
        private $_conf;
    
        /**
         * @var WorkerApplication
         */
        private $_app;
    
    
        /**
         * 正在运行的workers
         * 格式:
         *    'Worker type' => [pid1 => true, pid2 => true, pid3 => true]
         * @var array
         */
        private $_runningWorkers = [];
    
        /**
         * pid to worker type
         * 格式:
         *  pid => worker type
         * @var array
         */
        private $_pidMapToWorkerType = [];
    
        /**
         * 监控worker的Timer ID
         */
        private $_monitorTimerId;
    
        /**
         * 用于控制dev,test环境,每个队列只启动1个进程
         * @var array
         */
        private $_queueWorkers = [];
    
        private function __construct()
        {
            $this->_log("start worker server...");
            $this->_conf = Config::getInstance()->get('', 'worker');
            $this->_app = YII::app();
    
            //masker进程注册相关信号处理
            Process::signal(SIGCHLD, [$this, 'doSignal']);
            Process::signal(SIGTERM, [$this, 'doSignal']);
    
            //根据 -d 参数确认是否后台运行
            $options = getopt('d');
            if (isset($options['d'])) {
                Process::daemon();
                file_put_contents($this->_conf['pid'], posix_getpid());
            }
    
            //初始化worker队列
            $this->_initWorkerMessageQueue();
        }
    
        /**
         * 获取定时任务服务
         * @return WorkerServer
         */
        public static function getInstance()
        {
            if (self::$_instance == null) {
                self::$_instance = new WorkerServer();
            }
            return self::$_instance;
        }
    
        /**
         * 启动worker server
         */
        public function run()
        {
            $this->startWorker();
    
            //监控worker进程
            Timer::after(5*60*1000, function () {
                $this->_monitorTimerId = Timer::tick(1000, function () {
                    $this->startWorker();
                });
            });
        }
    
        /**
         * 启动worker, 允许重复执行
         */
        public function startWorker()
        {
            $workersConf = $this->_conf['workers'];
            if (empty($workersConf)) {
                return;
            }
    
            foreach ($workersConf as $workerType => $conf) {
                if (!isset($conf['jobName']) || !isset($conf['threadNum']) || !isset($conf['lifeTime']) || !isset($conf['maxHandleNum']) || !isset($conf['handler'])) {
                    $this->_log("worker config error. workerType={$workerType}");
                    continue;
                }
    
    
                //控制测试环境的进程数
                if (in_array(LWM_ENV, ['dev', 'test'])) {
                    $jobName = $conf['jobName'];
                    $jWorkers = 0;
                    if (isset($this->_queueWorkers[$jobName])) {
                        $jWorkers = $this->_queueWorkers[$jobName];
                    }
                    else {
                        $this->_queueWorkers[$jobName] = 1;
                    }
                    if ($jWorkers > 0) {
                        continue;
                    }
                    //默认启动一个进程用于测试
                    $conf['threadNum'] = 1;
                }
    
                $workers = $this->_getWorkers($workerType);
                if ($workers >= $conf['threadNum']) {
                    continue;
                }
    
                $hasWorkers = $conf['threadNum'] - $workers;
                //启动worker
                for ($i=0; $i < $hasWorkers; $i++) {
                    $workerProcess = new Process(function (Process $worker) use ($workerType) {
                        $this->_log("start worker, workerType={$workerType}, pid={$worker->pid}");
                        $cmd = $this->_conf['php'];
                        $worker->exec($cmd,  ['worker.php', '-t', $workerType]);
                    },false, false);
    
                    $pid = $workerProcess->start();
                    if ($pid === false) {
                        $this->_log("start worker failure. workerType={$workerType}");
                        continue;
                    }
                    //注册worker
                    $this->_addWorker($workerType, $pid);
                }
            }
        }
    
        /**
         * 处理进程信号
         * @param int $sig  - 信号类型
         */
        public function doSignal($sig) {
            switch ($sig) {
                case SIGCHLD:
                    //回收子进程资源
                    //必须为false,非阻塞模式
                    while($ret =  Process::wait(false)) {
                        $pid = $ret['pid'];
                        $this->_delWorkerByPid($pid);
                        $this->_log("回收进程资源, pid={$ret['pid']}");
                    }
    
                    if ($this->_getTotalWorkers() == 0) {
                        //当子进程都退出后,结束masker进程
                        @unlink($this->_conf['pid']);
                        exit(0);
                    }
                    break;
                case SIGTERM:
                    //进程退出处理
                    //关闭监控
                    Timer::clear($this->_monitorTimerId);
                    if (!empty($this->_pidMapToWorkerType)) {
                        $this->_log("worker server shutdown...");
                        foreach (array_keys($this->_pidMapToWorkerType) as $pid) {
                            Process::kill($pid, SIGTERM);
                        }
                    }
                    break;
            }
        }
    
        /**
         * 初始化消息队列
         */
        private function _initWorkerMessageQueue()
        {
            if (empty($this->_conf)) {
                return;
            }
    
            $messageServer = MessageServer::getInstance();
            foreach ($this->_conf['workers'] as $wokerType => $workerConfig) {
                $queueName = $messageServer->getQueueNameByWorkerType($wokerType);
                if (empty($queueName)) {
                    $this->_log("creare worker message queue failure, get queue name failure. workerType={$wokerType}");
                    continue;
                }
                $ret = $messageServer->createQueue($queueName);
                if (!$ret) {
                    $this->_log("creare worker message queue failure. workerType={$wokerType}");
                }
    
                if (isset($workerConfig['visibilityTimeout'])) {
                    //设置队列属性
                    $messageServer->setQueueAttributes($queueName, $workerConfig['visibilityTimeout']);
                }
            }
        }
    
    
        /**
         * 添加worker
         * @param string $workerType
         * @param  int $pid - 进程id
         */
        private function _addWorker($workerType, $pid)
        {
            if (!isset($this->_runningWorkers[$workerType])) {
                $this->_runningWorkers[$workerType] = [];
            }
            $this->_runningWorkers[$workerType][$pid] = true;
            $this->_pidMapToWorkerType[$pid] = $workerType;
        }
    
        /**
         * 根据worker type返回指定worker type目前正在允许的worker数量
         * @param $workerType
         * @return int
         */
        private function _getWorkers($workerType)
        {
            if (!isset($this->_runningWorkers[$workerType])) {
                return 0;
            }
            return count($this->_runningWorkers[$workerType]);
        }
    
        /**
         * 删除worker
         * @param int $pid      - 进程id
         * @return bool
         */
        private function _delWorkerByPid($pid) {
            if (!isset($this->_pidMapToWorkerType[$pid])) {
                return false;
            }
            $workerType = $this->_pidMapToWorkerType[$pid];
            unset($this->_pidMapToWorkerType[$pid]);
            if (isset($this->_runningWorkers[$workerType]) && isset($this->_runningWorkers[$workerType][$pid])) {
                unset($this->_runningWorkers[$workerType][$pid]);
            }
            return true;
        }
    
        /**
         * 返回workers总数
         * @return int
         */
        private function _getTotalWorkers()
        {
            if (empty($this->_runningWorkers)) {
                return 0;
            }
            $total = 0;
            foreach (array_keys($this->_runningWorkers) as $workerType) {
                $total += count($this->_runningWorkers[$workerType]);
            }
            return $total;
        }
    
        /**
         * 输出日志
         * @param $msg
         */
        private function _log($msg)
        {
            $dateStr = date("Y-m-d H:i:s");
            $pid = posix_getpid();
            echo "[{$dateStr}] [pid={$pid}] {$msg}
    ";
        }
    }
    <?php
    namespace ase;
    require 'BaseApplication.php';
    /**
     * 控制台应用 - 仅仅用于启动框架不做命令路由处理
     */
    class WorkerApplication extends ConsoleApplication
    {
    
        /**
         * Runs the application.
         * This method loads static application components. Derived classes usually overrides this
         * method to do more application-specific tasks.
         * Remember to call the parent implementation so that static application components are loaded.
         */
        public function run()
        {
            $this->_ended = false;
            $this->_preEnded = false;
            if($this->hasEventHandler('onBeginRequest'))
                $this->onBeginRequest(new CEvent($this));
            register_shutdown_function(array($this,'end'),0,false);
        }
    }
    <?php
    namespace ase;
    use AliyunMNSClient;
    use AliyunMNSExceptionMessageNotExistException;
    use AliyunMNSExceptionMnsException;
    use AliyunMNSExceptionQueueAlreadyExistException;
    use AliyunMNSModelQueueAttributes;
    use AliyunMNSRequestsCreateQueueRequest;
    use AliyunMNSRequestsSendMessageRequest;
    use AliyunMNSConfig as AlyConfig;
    use configconstantsWorkerTypes;
    use aseworkerWorkerMessage;
    use servicesServiceFactory;
    
    /**
     * 消息服务
     */
    class MessageServer
    {
        /**
         * @var MessageServer
         */
        private static $_instance;
    
        private $_conf;
    
        /**
         * @var Client
         */
        private $_client;
    
        private function __construct()
        {
            $this->_conf = Config::getInstance()->get('', 'mns');
            $aliConfig = new AlyConfig();
            $this->_client = new Client($this->_conf['endPoint'], $this->_conf['accessKeyId'], $this->_conf['accessSecret']);
        }
    
        /**
         * 获取消息服务
         * @return MessageServer
         */
        public static function getInstance()
        {
            if (self::$_instance == null) {
                self::$_instance = new MessageServer();
            }
            return self::$_instance;
        }
    
        /**
         * 创建队列
         * @param string $queueName     - 队列名
         *
         * @return bool 成功返回true, 失败返回false
         */
        public function createQueue($queueName)
        {
            if (empty($queueName)) {
                return false;
            }
            $request = new CreateQueueRequest($queueName);
            try {
                $res = $this->_client->createQueue($request);
                return true;
            }
            catch (QueueAlreadyExistException $e) {
                //队列已经存在
                return true;
            }
            catch (MnsException $e) {
                Logger::error("create message queue failure. queue={$queueName}, mns code={$e->getMnsErrorCode()}, msg={$e->getMessage()}");
            }
            return false;
        }
    
        /**
         * 设置队列属性
         * @param int $visibilityTimeout - worker获取消息后,多长时间内其他worker不能消费同一条消息,单位秒,最长12小时内
         * @return bool
         */
        public function setQueueAttributes($queueName, $visibilityTimeout)
        {
            if (empty($queueName)) {
                return false;
            }
    
            $attr = new QueueAttributes();
            $attr->setVisibilityTimeout($visibilityTimeout);
            $queue = $this->_client->getQueueRef($queueName);
            try {
                $queue->setAttribute($attr);
                return true;
            }
            catch (MnsException $e) {
                Logger::error("set message queue attr failure. queue={$queueName}, mns code={$e->getMnsErrorCode()}, msg={$e->getMessage()}");
            }
            return false;
        }
    
        /**
         * 发送消息
         * @param string $queueName     - 队列名
         * @param string $msgBody       - 消息内容
         * @param int $delaySeconds     - 延迟时间,单位秒 0-604800秒(7天)范围内某个整数值
         * @return bool
         * 成功返回消息id, 失败返回false
         */
        public function send($queueName, $msgBody, $delaySeconds=null)
        {
            if (empty($queueName)) {
                return false;
            }
    
            $queue = $this->_client->getQueueRef($queueName);
            $request = new SendMessageRequest($msgBody,$delaySeconds);
            try {
                $res = $queue->sendMessage($request);
                return $res->getMessageId();
            }
            catch (MnsException $e) {
                Logger::error("send message failure. queue={$queueName}, mns code={$e->getMnsErrorCode()}, msg={$e->getMessage()}");
            }
            return false;
        }
    
        /**
         * 获取消息
         * @param string $queueName     - 队列名
         * @param int $waitSeconds      - 队列消息为空时等待多长时间,非0表示这次receiveMessage是一次http long polling,如果queue内刚好没有message,那么这次request会在server端等到queue内有消息才返回。最长等待时间为waitSeconds的值,最大为30。
         * @return AliyunMNSResponsesReceiveMessageResponse
         * 成功返回消息对象,失败返回false
         */
        public function receive($queueName, $waitSeconds = 0)
        {
            $queue = $this->_client->getQueueRef($queueName);
            try {
                return $queue->receiveMessage($waitSeconds);
            }
            catch (MessageNotExistException $e) {
                //没有消息不抛异常
                return false;
            }
            catch (MnsException $e) {
                Logger::error("receive message failure. queue={$queueName}, mns code={$e->getMnsErrorCode()}, msg={$e->getMessage()}");
                return false;
            }
        }
    
        /**
         * 修改消息可见时间, 既从现在到下次可被用来消费的时间间隔
         * @param string $queueName     - 队列名
         * @param string $receiptHandle   - 消息句柄
         * @param int $visibilityTimeout  - 从现在到下次可被用来消费的时间间隔,单位为秒
         * @return bool 成功返回true, 失败false
         */
        public function changeMessageVisibility($queueName, $receiptHandle, $visibilityTimeout)
        {
            $queue = $this->_client->getQueueRef($queueName);
            try {
                $queue->changeMessageVisibility($receiptHandle,$visibilityTimeout);
                return true;
            }
            catch (MnsException $e) {
                Logger::error("change message visibility failure. queue={$queueName}, msg={$e->getMessage()}");
                return false;
            }
        }
    
        /**
         * 删除消息
         * @param string $queueName     - 队列名
         * @param mixed $receiptHandle  - 消息句柄
         * @return bool 删除成功返回true, 失败返回false
         */
        public function delete($queueName, $receiptHandle)
        {
            $queue = $this->_client->getQueueRef($queueName);
            try {
                $res = $queue->deleteMessage($receiptHandle);
                return true;
            }
            catch (MnsException $e)
            {
                Logger::error("delete message failure. queue={$queueName}, mns code={$e->getMnsErrorCode()}, msg={$e->getMessage()}");
                return false;
            }
        }
    
        /**
         * 把消息发送给指定的worker执行
         * @param string $workerType - worker type
         * @param array $params - 任务参数
         * @param int $delaySeconds - 延迟时间,单位秒 0-604800秒(7天)范围内某个整数值
         * @return bool 成功返回true, 失败返回false
         * @throws CDbException
         */
        public function dispatch($workerType, array $params = [], $delaySeconds = null)
        {
            //dev环境不走消息队列, 直接调用消息处理器
            if (LWM_ENV == 'dev') {
                $workerConfig = null;
                try {
                    $workerConfig = Config::getInstance()->get("workers.{$workerType}", 'worker');
                } catch (CDbException $e) {
                    throw new CDbException("worker config not found, workerType={$workerType}");
                }
    
                if (!isset($workerConfig['handler']) || empty($workerConfig['handler'])) {
                    throw new CDbException("worker config invalid, workerType={$workerType}");
                }
    
                $handler = $workerConfig['handler'];
                $srvObj = ServiceFactory::getService($handler[0]);
                $ret = call_user_func_array([$srvObj, $handler[1]], $params);
                if (!empty($ret)) {
                    return true;
                }
                return false;
            }
    
            $queueName = $this->getQueueNameByWorkerType($workerType);
            if (empty($queueName)) {
                return false;
            }
    
            $msg = new WorkerMessage();
            $msg->setWorkerType($workerType);
            $msg->setParams($params);
            $ret = $this->send($queueName, $msg->serialize(), $delaySeconds);
            if ($ret !== false) {
                return true;
            }
            return false;
        }
    
        /**
         * 根据worker type获取队列名
         * @param string $workerType        - worker type
         * @return bool|string 成功返回队列名, 失败返回false
         */
        public function getQueueNameByWorkerType($workerType)
        {
            if (empty($workerType)) {
                return false;
            }
            //获取worker配置
            $config = Config::getInstance()->get("workers.{$workerType}", 'worker');
            if (empty($config)) {
                Logger::error("worker config not found. workerType={$workerType}");
                return false;
            }
    
            $env = Config::getInstance()->get('env');
            $name = "{$env}-lwmWorker-{$config['jobName']}";
            return $name;
        }
    }
    <?php
    declare(ticks=1);
    namespace aseworker;
    use AliyunMNSExceptionMnsException;
    use aseConfig;
    use aseLogger;
    use aseMessageServer;
    use aseWorkerApplication;
    use servicesServiceFactory;
    
    /**
     * Worker 工作进程, 主要用于执行异步任务
     */
    class Worker
    {
        /**
         * 当前实例
         * @var Worker
         */
        private static $_instance = null;
    
    
        /**
         * worker任务配置
         */
        private $_conf;
    
        /**
         * @var WorkerApplication
         */
        private $_app;
    
        /**
         * 当前 worker队列名
         * @var string
         */
        private $_workerQueueName = '';
    
        /**
         * 初始化workerType, 任务执行过程,如果共享队列,同一个worker可以执行多个worker type 任务
         * @var string
         */
        private $_workerType = '';
    
        /**
         * 是否结束worker
         * @var bool
         */
        private $_flgWorkerExit = false;
    
        private function __construct($workerType)
        {
            $this->_app = YII::app();
            $this->_app->run();
            $this->_workerType = $workerType;
            //记录路由信息
            Logger::$route = $workerType;
            $this->_workerQueueName = MessageServer::getInstance()->getQueueNameByWorkerType($workerType);
            if (empty($this->_workerQueueName)) {
                Logger::error("worker get queue name failure, config invalid. workerType={$workerType}");
                throw new CDbException("worker get queue name failure, config invalid. workerType={$workerType}");
            }
    
            //获取worker配置
            $this->_conf = Config::getInstance()->get('', 'worker');
    
            //注册信号处理
            pcntl_signal(SIGTERM, [$this, 'doSignal']);
            pcntl_signal(SIGQUIT, [$this, 'doSignal']);
        }
    
        /**
         * 获取定时任务服务
         * @param string $workerType    - worker类型
         * @return Worker
         */
        public static function getInstance($workerType)
        {
            if (self::$_instance == null) {
                self::$_instance = new Worker($workerType);
            }
            return self::$_instance;
        }
    
        /**
         * worker进程入口
         */
        public function run()
        {
            set_time_limit(0);
    
            //设置用户组
            $userName = $this->_conf['user'];
            $userInfo = posix_getpwnam($userName);
            if (empty($userName)) {
                Logger::error("start worker failure, get userinfo failure. user={$userName}");
                return;
            }
            posix_setuid($userInfo['uid']);
            posix_setgid($userInfo['gid']);
            $config = $this->_conf['workers'][$this->_workerType];
            $progName = "lwm-worker: {$this->_workerQueueName}";
            swoole_set_process_name($progName);
    
            //启动时间
            $startTime = time();
            //当前worker处理任务数
            $currentExcutedTasks = 0;
            $this->_flgWorkerExit = false;
            while (!$this->_flgWorkerExit) {
                $currentTime = time();
    
                $this->_app->run();
                //处理任务
                $this->_doWorkerTask($this->_workerQueueName);
                $this->_app->end(0, false);
    
                $currentExcutedTasks++;
                if (($currentTime - $startTime) > $config['lifeTime']) {
                    //超出存活时间,自动退出
                    $this->_flgWorkerExit = true;
                    Logger::info("worker (workerType={$this->_workerType}) run time exceed lifetime, exit worker.");
                    break;
                }
    
                //超出最大任务处理次数, 自动退出
                if ($currentExcutedTasks > $config['maxHandleNum']) {
                    $this->_flgWorkerExit = true;
                    Logger::info("worker (workerType={$this->_workerType}) done tasks exceed maxHandleNum, exit worker.");
                    break;
                }
            }
        }
    
        /**
         * 处理进程信号
         * @param int $sig  - 信号类型
         */
        public function doSignal($sig) {
            switch ($sig) {
                case SIGTERM:
                    //进程退出处理
                    $this->_flgWorkerExit = true;
                    Logger::info("worker recv terminate signal. pid=" . posix_getpid());
                    break;
            }
        }
    
        /**
         * 处理worker任务
         * @param string $workerMsgQueueName - 队列名
         * @throws CDbException
         * @throws RedisException
         */
        private function _doWorkerTask($workerMsgQueueName)
        {
            $response = null;
            try {
                $waitSeconds = $this->_conf['pollingWaitSeconds'];
                $response = MessageServer::getInstance()->receive($workerMsgQueueName,$waitSeconds);
                if ($response === false) {
                    //没有消息休眠1秒
                    sleep(1);
                    return;
                }
    
                Logger::info("worker recv message msgId={$response->getMessageId()}, msg={$response->getMessageBody()}");
    
                $workerMsg = new WorkerMessage($response->getMessageBody());
                $workerType = $workerMsg->getWorkerType();
                if (!isset($this->_conf['workers'][$workerType])) {
                    Logger::error("invalid message, worker config not found. worker type={$workerType}");
                    MessageServer::getInstance()->delete($workerMsgQueueName, $response->getReceiptHandle());
                    return;
                }
    
                $config = $this->_conf['workers'][$workerType];
                if ($config['preConsume']) {
                    //预先删除消息
                    MessageServer::getInstance()->delete($workerMsgQueueName, $response->getReceiptHandle());
                    Logger::debug("pre delete message. msgId={$response->getMessageId()}");
                }
    
                $hander = $config['handler'];
                $srvObj = ServiceFactory::getService($hander[0]);
                Logger::debug("worker execute message handler=" . json_encode($hander) . ", msgId={$response->getMessageId()}");
                $ret = call_user_func_array([$srvObj, $hander[1]], $workerMsg->getParams());
                Logger::debug("worker execute message handler result=" . json_encode($ret) .", msgId={$response->getMessageId()}");
    
                if (!empty($ret) && !$config['preConsume']) {
                    //任务处理成功,删除消息
                    MessageServer::getInstance()->delete($workerMsgQueueName, $response->getReceiptHandle());
                    Logger::debug("finish task delete message. msgId={$response->getMessageId()}");
                }
    
            } catch (WorkerMessageInvalidException $e) {
                //消息格式不正确
                if ($response) {
                    //删除消息
                    MessageServer::getInstance()->delete($workerMsgQueueName, $response->getReceiptHandle());
                    Logger::error("worker error, error={$e->getMessage()}");
                }
            } catch (CDbException $e) {
                //如果出现数据库异常直接抛出异常退出worker
                throw $e;
            } catch (RedisException $e) {
                //redis异常直接退出
                throw $e;
            } catch (Exception $e) {
                Logger::error("worker error, msg={$e->getMessage()}");
                //异常休眠1秒
                sleep(1);
            }
        }
    }
    <?php
    namespace aseworker;
    
    /**
     * worker消息封装
     */
    class WorkerMessage
    {
        //worker类型
        private $_workerType = '';
        //worker参数
        private $_params = [];
    
        public function __construct($srcData = '')
        {
            if (!empty($srcData)) {
                $this->unSerialize($srcData);
            }
        }
    
        /**
         * @return mixed
         */
        public function getWorkerType()
        {
            return $this->_workerType;
        }
    
        /**
         * @param mixed $workerType
         */
        public function setWorkerType($workerType)
        {
            $this->_workerType = $workerType;
        }
    
        /**
         * @return mixed
         */
        public function getParams()
        {
            return $this->_params;
        }
    
        /**
         * @param mixed $params
         */
        public function setParams($params)
        {
            $this->_params = $params;
        }
    
        /**
         * 序列化
         * @return string
         */
        public function serialize()
        {
            $data = [
                'workerType' => $this->_workerType,
                'params' => $this->_params
            ];
            return json_encode($data);
        }
    
        /**
         * 返序列化
         * @param string $srcData      - 原始数据
         * @throws Exception
         */
        public function unSerialize($srcData)
        {
            if (empty($srcData)) {
                throw new WorkerMessageInvalidException("worker msg is empty");
            }
            $data = json_decode($srcData, true);
            if (empty($data)) {
                throw new WorkerMessageInvalidException("WorkerMessage invalid. data={$srcData}");
            }
            if (!isset($data['workerType']) || !isset($data['params'])) {
                throw new WorkerMessageInvalidException("WorkerMessage invalid. data={$srcData}");
            }
            $this->setWorkerType($data['workerType']);
            $this->setParams($data['params']);
        }
    }
    <?php
    use SwooleProcess;
    use SwooleTimer;
    
    /**
     * 定时任务服务
     */
    class Crond
    {
        /**
         * 当前实例
         * @var Crond
         */
        private static $_instance = null;
    
        /**
         * 定时任务配置
         */
        private $_conf;
    
        /**
         * @var WorkerApplication
         */
        private $_app;
    
        private $_runningTasks = [];
    
        /**
         * 退出状态
         * @var bool
         */
        private $_flgExit = false;
    
        private function __construct()
        {
            $this->_conf = Config::getInstance()->get("", 'cron');//取出配置信息
            $this->_app = YII::app();
            //注册子进程回收信号处理
            Process::signal(SIGCHLD, [$this, 'doSignal']);
            Process::signal(SIGTERM, [$this, 'doSignal']);
        }
    
        /**
         * 获取定时任务服务
         * @return Crond
         */
        public static function getInstance()
        {
            if (self::$_instance == null) {
                self::$_instance = new Crond();
            }
            return self::$_instance;
        }
    
        public function start()
        {
            $options = getopt('d');
            $this->_log("start cron server...");
            if (isset($options['d'])) {
                Process::daemon();
                file_put_contents($this->_conf['pid'], posix_getpid());
            }
    
            Timer::tick(1000, [$this, 'doTask']);
            //10s 加载一次配置
            Timer::tick(10000, function () {
                $this->_conf = Config::getInstance()->get("", 'cron', true);
            });
        }
    
        /**
         * 定时器每秒回调函数
         * @param int $timer_id     - 定时器的ID
         * @param mixed $params
         */
        public function doTask($timer_id, $params = null)
        {
            //开始任务
            $currentTime = time();
            if (isset($this->_conf['jobs']) && !empty($this->_conf['jobs'])) {
                foreach ($this->_conf['jobs'] as $jobId => $job) {
                    if (!isset($job['title']) || !isset($job['cron']) || !isset($job['command']) || !isset($job['id'])) {
                        $this->_log("crontab job config error");
                        continue;
                    }
    
                    if ($this->_isTimeByCron($currentTime, $job['cron'])) {
                        if (isset($this->_runningTasks[$job['id']])) {
                            $this->_log("last cron worker not exit. job id={$job['id']}");
                            continue;  
                        }
    
                        //启动任务
                        $cronWorker =  new Process(function (Process $worker) use($job) {
                            $this->doCronTask($worker, $job);
                        });
    
                        $pid = $cronWorker ->start();
                        if ($pid === false) {
                            $this->_log("start cron worker failure.");
                            continue;
                        }
                        $this->_runningTasks[$job['id']] = $pid;
                        $cronWorker->write(json_encode($job));
                    }
                }
            }
        }
    
        /**
         * do cron worker
         */
        public function doCronTask($worker, $job)
        {
            //clear log
            Yii::getLogger()->flush();
            $this->_log("cron worker running task={$job['title']}, jobId={$job['id']}");
            $command = PROJECT_PATH . '/scripts/yiic';
             set_time_limit(0);
             $cmdArgs = explode(' ',  $job['command']);
             $worker->exec($command,  $cmdArgs);
        }
    
        /**
         * 处理进程信号
         * @param int $sig  - 信号类型
         */
        public function doSignal($sig) {
            $pidToJobId = array_flip($this->_runningTasks);
            switch ($sig) {
                case SIGCHLD:
                    //必须为false,非阻塞模式
                    while($ret =  Process::wait(false)) {
    //                    echo "recycle child process PID={$ret['pid']}
    ";
                        $exitPid = $ret['pid'];
                        if (isset($pidToJobId[$exitPid])) {
                            $jobId = $pidToJobId[$exitPid];
                            unset($this->_runningTasks[$jobId]);
                        }
                    }
                    //当子进程都退出后,结束masker进程
                    if (empty($this->_runningTasks) && $this->_flgExit) {
                        @unlink($this->_conf['pid']);
                        exit(0);
                    }
    
                    break;
                case SIGTERM:
                    $this->_log("recv terminate signal, exit crond.");
                    $this->_flgExit = true;
                    break;
            }
        }
    
        /**
         * 根据定时任务时间配置,检测当前时间是否在指定时间内
         * @param int $time     - 当前时间
         * @param string $cron  - 定时任务配置
         * @return bool 不在指定时间内返回false, 否则返回true
         */
        private function _isTimeByCron($time, $cron)
        {
            $cronParts = explode(' ', $cron);
            if (count($cronParts) != 6) {
                return false;
            }
    
            list($sec, $min, $hour, $day, $mon, $week) = $cronParts;
    
            $checks = array('sec' => 's', 'min' => 'i', 'hour' => 'G', 'day' => 'j', 'mon' => 'n', 'week' => 'w');
    
            $ranges = array(
                'sec' => '0-59',
                'min' => '0-59',
                'hour' => '0-23',
                'day' => '1-31',
                'mon' => '1-12',
                'week' => '0-6',
            );
    
            foreach ($checks as $part => $c) {
                $val = $$part;
                $values = array();
    
                /*
                    For patters like 0-23/2
                */
                if (strpos($val, '/') !== false) {
                    //Get the range and step
                    list($range, $steps) = explode('/', $val);
    
                    //Now get the start and stop
                    if ($range == '*') {
                        $range = $ranges[$part];
                    }
                    list($start, $stop) = explode('-', $range);
    
                    for ($i = $start; $i <= $stop; $i = $i + $steps) {
                        $values[] = $i;
                    }
                } /*
                    For patters like :
                    2
                    2,5,8
                    2-23
                */
                else {
                    $k = explode(',', $val);
    
                    foreach ($k as $v) {
                        if (strpos($v, '-') !== false) {
                            list($start, $stop) = explode('-', $v);
    
                            for ($i = $start; $i <= $stop; $i++) {
                                $values[] = $i;
                            }
                        } else {
                            $values[] = $v;
                        }
                    }
                }
    
                if (!in_array(date($c, $time), $values) and (strval($val) != '*')) {
                    return false;
                }
            }
    
            return true;
        }
    
        /**
         * 输出日志
         * @param $msg
         */
        private function _log($msg)
        {
                $dateStr = date("Y-m-d H:i:s");
                echo "[{$dateStr}] {$msg}
    ";
        }
    }
  • 相关阅读:
    python+selenium之页面元素截图
    selenium八大定位
    http概述之URL与资源
    数组中只出现一次的数字
    数字在排序数组中出现的次数
    把数组排成最小的数
    数组中出现次数超过一半的数字
    调整数组顺序使得奇数位于偶数的前面
    旋转数组的最小值
    二维数组的查找
  • 原文地址:https://www.cnblogs.com/lyc94620/p/9485583.html
Copyright © 2011-2022 走看看