zoukankan      html  css  js  c++  java
  • php多进程编程实例

    <?php
    
    namespace Console;
    
    include_once(dirname(__FILE__) . "/../DAOs/DAOFactory.php");
    
    use HelpersHelper;
    use ConfigConfig;
    use ServicesTaskService;
    use LibsLogLog;
    use LibsLogCLogFileHandler;
    
    class UctDataWorker
    {
    
        protected $diagnosticSessionDAO = null;
        protected $DAOResultEnum = null;
        protected $childs = array();
        protected $signal = null;
        protected $master_pid = 0;
    
        public function __construct()
        {
            include(dirname(__FILE__) . "/../Enums/DAOEnums.php");
            $this->DAOResultEnum = $DAOResultEnum;
    
            $this->diagnosticSessionDAO = DAOFactory::getFactory()->getDiagnosticSessionDAO();
            $this->signal = $this->getSignal();
            $this->master_pid = posix_getpid();
        }
    
        protected function getSignal()
        {
            return new Signal($this);
        }
    
        //通过文件独占锁防止多个子进程并发读取同一条数据
        //但注意:其函数flock无法再NFS或其他网络文件系统中使用也无法在多线程服务器API中使用。
    protected function lockProcess($task_num) { $lock_handle = Helper::lockFile(); if($lock_handle !== false) { try{ $task_list = TaskService::getUnstartTask($task_num); if($task_list) { foreach ($task_list as $task) { $tas_ser_id = $task['tas_ser_id']; $tas_id = $task['tas_id']; TaskService::updateTask($tas_ser_id, $tas_id, 'running'); } sleep(1); } Helper::freeLock($lock_handle); return $task_list; } catch(Exception $e) { Log::Init(new CLogFileHandler()); Log::INFO(date('Y-m-d H:i:s', time())."---exception---".$e->getMessage()." "); Helper::freeLock($lock_handle); } } else { //获取锁失败,结束当前子进程 Log::Init(new CLogFileHandler()); Log::INFO(date('Y-m-d H:i:s', time())."---get_file_lock_fail---"." "); exit; } } public function run($argv) { $per_worker_task_num = intval($argv[3]); $task_list = $this->lockProcess($per_worker_task_num); if($task_list) { Log::Init(new CLogFileHandler()); foreach ($task_list as $task) { if ($task) { try { $tas_ser_id = $task['tas_ser_id']; $tas_id = $task['tas_id']; $tas_data = unserialize($task['tas_data']); $diagnosticSessionId = $tas_data['diagnosticSessionId']; $dataPath = $tas_data['dataPath']; $cevDataArray = $tas_data['cevDataArray']; $submitForAnalysis = $tas_data['submitForAnalysis']; $user = $tas_data['user']; $start_time = date('Y-m-d H:i:s', time()); $tasid_flag = 'tas_id:' . $tas_id . '---tas_ser_id:' . $tas_ser_id . "---"; Log::INFO(date('Y-m-d H:i:s', time())."---start_run---tas_id---" . $tas_id . " "); $daoResult = $this->diagnosticSessionDAO->AppendToDiagnosticSession($diagnosticSessionId, $dataPath, $cevDataArray, $submitForAnalysis, $patientId, //for return $invalidFiles, //for return $dateTime, //for return $user, false ); //insert db success if ($daoResult === $this->DAOResultEnum["DAO_SUCCESS"] && $patientId && $patientId->isValid() && $dateTime) { $returnObj = new stdClass; $returnObj->diagnosticSessionId = $diagnosticSessionId->toString(); $returnObj->patientId = $patientId->toString(); $returnObj->confirmationCode = $diagnosticSessionId->toString() . '; ' . $dateTime->format('Y-m-dTH:i:s'); $returnObj->invalidFiles = $invalidFiles; Log::INFO(" "); Log::INFO($tasid_flag); Log::INFO(var_export($returnObj, true)); Log::INFO(date('Y-m-d H:i:s', time()) . '---success' . " "); $arr = array( 'tas_start_date_time' => $start_time, 'tas_end_date_time' => date('Y-m-d H:i:s', time()) ); $result = TaskService::updateTask($tas_ser_id, $tas_id, 'success', $arr); if ($result === true) { $rm_result = Helper::delDir($task['tas_path']); if ($rm_result === true) { Log::INFO($tasid_flag . 'rm dir---' . $task['tas_path'] . '---success' . " "); } else { Log::INFO($tasid_flag . 'rm dir---' . $task['tas_path'] . '---fail' . " "); } } Log::INFO(" "); } else { //insert db fail $tip = ''; $arr = array( 'tas_start_date_time' => $start_time, 'tas_end_date_time' => date('Y-m-d H:i:s', time()) ); if ($daoResult === $this->DAOResultEnum["DAO_SUCCESS"] && $patientId && $patientId->isValid() && $dateTime === null) { $tip = '---no new data added!---'; $rm_result = Helper::delDir($task['tas_path']); //把已经存到数据库里的文件删除 if ($rm_result === true) { Log::INFO($tasid_flag . 'rm dir---' . $task['tas_path'] . '---success' . " "); } else { Log::INFO($tasid_flag . 'rm dir---' . $task['tas_path'] . '---fail' . " "); } TaskService::updateTask($tas_ser_id, $tas_id, 'nonewadd', $arr); } else { TaskService::updateTask($tas_ser_id, $tas_id, 'fail', $arr); } Log::INFO(" ".$tasid_flag.date('Y-m-d H:i:s', time()) . ':fail'.$tip." "); $result = array( 'log_time' => date('Y-m-d H:i:s', time()), 'tas_ser_id' => $tas_ser_id, 'tas_id' => $tas_id, 'diagnosticSessionId' => $diagnosticSessionId, 'dataPath' => $dataPath, 'daoResult' => $daoResult, 'patientId' => $patientId, 'dateTime' => $dateTime, ); Log::INFO(" " . var_export($result, true) . " "); } } catch (Exception $e) { Log::Init(new CLogFileHandler()); Log::ERROR($e->getMessage()); Log::INFO(" ".$tasid_flag.date('Y-m-d H:i:s', time()) . ':exception'." "); $arr = array( 'tas_start_date_time' => $start_time, 'tas_end_date_time' => date('Y-m-d H:i:s', time()) ); TaskService::updateTask($tas_ser_id, $tas_id, 'fail', $arr); } } } } } //手动启动master命令: //nohup /usr/bin/php /var/www/html/ECGVuService_Dev/Console/index.php UctDataWorker masterRun >> /data/log/uctWorker.log 2>&1 & //命令行常驻内存运行方式,每10秒扫描一次(配置文件里可以修改该配置) //每次修改代码后需要重启主进程才能生效 public function masterRun() { $worker_num = Config::$uct_insert_worker_num; $master_sleep_time = Config::$master_sleep_time; $master_pid = $this->master_pid; $master_pid_log = Config::$master_pid_log; file_put_contents($master_pid_log, $master_pid); while(true){ $this->signal->dispatch(); for($i=1; $i<=$worker_num; $i++) { if(array_key_exists($i, $this->childs)) { $pid = $this->childs[$i]; $child_pid_status = Helper::waitpid($pid); //通知内核释放僵尸进程、非阻塞方式 if($child_pid_status==-1 || $child_pid_status>0) { //子进程出错(-1)或退出(>0)再启动子进程 sleep(2); $this->forkWorker($i); } } else { sleep(2); $this->forkWorker($i); } } sleep($master_sleep_time); //防止CPU 100% } } /** * 开启子进程 * @params Int $index 子进程参数 */ protected function forkWorker($index) { $path = dirname(__FILE__) . '/index.php'; $php_path = Config::$php_path; // 开启分支 $pid = pcntl_fork(); if($pid == -1) { $log = sprintf("Fork worker failed! Index: %d ", $index); Log::Init(new CLogFileHandler(), true); Log::INFO($log); } if($pid) { //父进程处理逻辑 $this->childs[$index] = $pid; $log = sprintf("Fork worker success! pid: %s! ", $pid); Log::Init(new CLogFileHandler(), true); Log::INFO($log); } else { //子进程处理逻辑 $param = array("$path", "UctDataWorker", "run", "1"); pcntl_exec("$php_path", $param); exit(); //此处中断子进程 } $log = sprintf("Finish fork! Index: %d ", $index); Log::Init(new CLogFileHandler(), true); Log::INFO($log); } /*这里测试该方法始终无法关闭master进程,所以用下面的强制关闭的方法了 public function stopMaster($argv=array()) { $pid = intval($argv[3]); if($pid<=0) { $master_pid_log = Config::$master_pid_log; $pid = trim(file_get_contents($master_pid_log)); } echo "Wait stop: [$pid] "; posix_kill($pid, SIGTERM); $n = 0; while(++$n < 100) { pcntl_waitpid($pid, $status); $priority = @ pcntl_getpriority($pid); if(false === $priority) { break; } echo '.'; usleep(1000*200); } echo "Stopped. "; } */ /* * /usr/bin/php /var/www/html/ECGVuService_Dev/Console/index.php UctDataWorker forceStopMaster */ public function forceStopMaster($argv=array()) { $pid = intval($argv[3]); if($pid<=0) { $master_pid_log = Config::$master_pid_log; $pid = trim(file_get_contents($master_pid_log)); } echo "Wait stop: [$pid] "; $cmd = "kill -s 9 ".$pid; shell_exec("$cmd"); echo "Stopped. "; } public function reload() { echo 'receive signal reload and call function reload---pid is '.posix_getpid()." "; } public function childExit() { $pid = pcntl_waitpid(-1, $status, WNOHANG);
       if(pcntl_wifexited($status)) {
         echo "The child ".$pid." exit with code ".pcntl_wexitstatus($status)." ";
    } }
    public function stop() { echo 'receive signal stop and call function stop---pid is '.posix_getpid()." "; } //计划任务定时检测master进程是否存在,不存在则启动,防止master进程意外崩溃后任务中断,以root用户运行 public function checkMaster() { $cmd = 'ps axu|grep "UctDataWorker masterRun"|grep -v "grep"|wc -l'; $ret = shell_exec("$cmd"); $ret = rtrim($ret, " "); if($ret === "0") { $path = dirname(__FILE__) . '/index.php'; $php_path = Config::$php_path; $worker_run_log = Config::$worker_run_log; $start_master_cmd = "nohup ".$php_path." ".$path." UctDataWorker masterRun >> ".$worker_run_log." 2>&1 &"; exec("$start_master_cmd", $result); } } }

    Helper.php

    <?php
    
    namespace Helpers;
    
    set_time_limit(0);
    
    define('DS', DIRECTORY_SEPARATOR); // I always use this short form in my code.
    
    class Helper
    {
    
        public static function checkOs()
        {
            $os_name=PHP_OS;
            if(strpos($os_name,"Linux")!==false){
                $os_flag = 'Linux';
            }else if(strpos($os_name,"WIN")!==false){
                $os_flag = 'Win';
            }
    
            return $os_flag;
        }
    
        /**
         * @param $filename
         * @param int $n
         * @return bool|string
         */
        public static function getLastLines($filename, $n=1)
        {
            if(!$fp=fopen($filename,'r')){
                echo "打开文件失败,请检查文件路径是否正确,路径和文件名不要包含中文";
                return false;
            }
    
            $pos=-2;
            $eof="";
            $str="";
    
            while($n>0){
                while($eof!="
    "){
                    if(!fseek($fp,$pos,SEEK_END)){
                        $eof=fgetc($fp);
                        $pos--;
                    }else{
                        break;
                    }
                }
    
                $str.=fgets($fp);
                $eof="";
                $n--;
            }
    
            return $str;
        }
    
        public static function downloadFile($file_path)
        {
            $fileinfo = pathinfo($file_path);
            header('Content-type: application/x-'.$fileinfo['extension']);
            header('Content-Disposition: attachment; filename='.$fileinfo['basename']);
            header('Content-Length: '.filesize($file_path));
            readfile($file_path);
        }
    
        public static function arrIconv($input_charset='GBK', $output_charset='UTF-8', $arr)
        {
            $return = array();
    
            if(is_array($arr)) {
                foreach($arr as $k=>$item) {
                    if(is_array($item)) {
                        $return[$k] = self::arrIconv($input_charset, $output_charset, $item);
                    } else {
                        $return[$k] = iconv($input_charset, $output_charset, $item);
                    }
                }
            } else {
                $return = iconv($input_charset, $output_charset, $arr);
            }
    
            return $return;
        }
    
        public static function copyR($path, $dest) { // 原目录,复制到的目录
    
            if( is_dir($path) ) {
                @mkdir( $dest );
                $objects = scandir($path);
                if( sizeof($objects) > 0 ) {
                    foreach( $objects as $file ) {
                        if( $file == "." || $file == ".." ) {
                            continue;
                        }
    
                        // go on
                        if( is_dir( $path.DS.$file ) ) {
                            self::copyR( $path.DS.$file, $dest.DS.$file );
                        } else {
                            copy( $path.DS.$file, $dest.DS.$file );
                        }
                    }
                }
    
                return true;
    
            } elseif( is_file($path) ) {
                return copy($path, $dest);
            } else {
                return false;
            }
        }
    
        public static function getDirList($dir_path)
        {
            if(!is_dir($dir_path)) {
                return false;
            }
    
            $dir_list = array();
    
            $d = dir($dir_path);
            if ($d) {
                while (false !== ($entry = $d->read())) {
                    if ($entry != '.' && $entry != '..') {
                        $k = str_replace(array('-', '_'), array(''), $entry);
                        preg_match('/(d+)/', $k, $matches);
                        $dir_list[$matches[0]] = $entry;
                    }
                }
    
                $d->close();
            }
    
            ksort($dir_list);
    
            return $dir_list;
        }
    
        public static function random($length, $numeric = 0)
        {
            $seed = base_convert(md5(microtime().$_SERVER['DOCUMENT_ROOT']), 16, $numeric ? 10 : 35);
            $seed = $numeric ? (str_replace('0', '', $seed).'012340567890') : ($seed.'zZ'.strtoupper($seed));
    
            if($numeric) {
                $hash = '';
            } else {
                $hash = chr(rand(1, 26) + rand(0, 1) * 32 + 64);
                $length--;
            }
    
            $max = strlen($seed) - 1;
            for($i = 0; $i < $length; $i++) {
                $hash .= $seed{mt_rand(0, $max)};
            }
    
            return $hash;
        }
    
        public static function delDir($dir)
        {
            if(strtoupper(substr(PHP_OS, 0, 3)) == 'WIN') {
                $str = "rmdir /s/q " . $dir;
            } else {
                $str = "rm -Rf " . $dir;
            }
    
            exec($str, $result, $return_var);
            return true;
        }
    
        /**
         * @see pcntl_waitpid()
         */
        public static function waitpid($pid, &$status=0, $flag = WNOHANG) {
            return pcntl_waitpid($pid, $status, $flag);
        }
    
        /**
         * @see posix_kill()
         */
        public static function kill($pid) {
            return posix_kill($pid, SIGTERM);
        }
    
        public static function lockFile($file_path='/tmp/file_lock.lock')
        {
            $fp = fopen($file_path, "w+");
            if(flock($fp, LOCK_EX)) {
                fwrite($fp, date('Y-m-d H:i:s', time())."---pid[".posix_getpid()."]---lock suc!
    ");
                return $fp;
            } else {
           fclose($fp);
    return false; } } public static function freeLock($fp) { fflush($fp); flock($fp, LOCK_UN); fclose($fp); } }
  • 相关阅读:
    Kattis
    HackerRank
    HackerRank
    牛客小白月赛1 C 分元宵【快速幂】
    牛客小白月赛1 J おみやげをまらいました 【MAP】
    logback与Spring、SpringMVC结合使用教程
    解决maven项目pom报错
    sz与rz命令
    cassandra 测试数据库
    ubuntu 无法解析主机的解决方法
  • 原文地址:https://www.cnblogs.com/dongruiha/p/6400979.html
Copyright © 2011-2022 走看看