<?php namespace Console;

include_once(dirname(__FILE__) . "/../DAOs/DAOFactory.php");

// NOTE(review): these imports had lost their namespace separators (e.g. "HelpersHelper"),
// which left Helper/Config/TaskService/Log/CLogFileHandler unresolvable inside this namespace.
// The last separator of each is fixed by how the class is referenced below; the inner
// "Libs\Log" split is an assumption - confirm against the project's directory layout.
use Helpers\Helper;
use Config\Config;
use Services\TaskService;
use Libs\Log\Log;
use Libs\Log\CLogFileHandler;

/**
 * Console worker that moves queued tasks into diagnostic sessions.
 *
 * masterRun() is the long-running supervisor that forks one worker per slot;
 * each worker re-executes the console entry point with the "run" action,
 * claims a batch of tasks under a file lock and processes them one by one.
 */
class UctDataWorker
{
    /** DAO obtained from DAOFactory in the constructor. */
    protected $diagnosticSessionDAO = null;

    /** Result-code map loaded from Enums/DAOEnums.php. */
    protected $DAOResultEnum = null;

    /** Worker slot index => child pid, maintained by the master process. */
    protected $childs = array();

    /** Signal dispatcher created by getSignal(). */
    protected $signal = null;

    /** Pid of the process that constructed this object. */
    protected $master_pid = 0;

    public function __construct()
    {
        // The included file is expected to define $DAOResultEnum in this local scope.
        include(dirname(__FILE__) . "/../Enums/DAOEnums.php");
        $this->DAOResultEnum = $DAOResultEnum;
        // NOTE(review): DAOFactory is referenced unqualified and therefore resolves to
        // Console\DAOFactory - confirm that matches the class declared in DAOFactory.php.
        $this->diagnosticSessionDAO = DAOFactory::getFactory()->getDiagnosticSessionDAO();
        $this->signal = $this->getSignal();
        $this->master_pid = posix_getpid();
    }

    /**
     * Build the signal dispatcher bound to this worker.
     *
     * @return Signal
     */
    protected function getSignal()
    {
        return new Signal($this);
    }

    /**
     * Claim up to $task_num unstarted tasks while holding an exclusive file lock,
     * so that several child processes cannot pick up the same record concurrently.
     *
     * Note: flock() cannot be used on NFS or other network file systems, nor under
     * multithreaded server APIs.
     *
     * @param int $task_num Maximum number of tasks to claim.
     * @return mixed The claimed task list as returned by TaskService::getUnstartTask(),
     *               or null when claiming failed with an exception. The process exits
     *               when the lock cannot be obtained.
     */
    protected function lockProcess($task_num)
    {
        $lock_handle = Helper::lockFile();
        if ($lock_handle === false) {
            // Could not obtain the lock: end the current child process.
            Log::Init(new CLogFileHandler());
            Log::INFO(date('Y-m-d H:i:s', time())."---get_file_lock_fail---"." ");
            exit;
        }

        try {
            $task_list = TaskService::getUnstartTask($task_num);
            if ($task_list) {
                // Mark every claimed task as running before the lock is released.
                foreach ($task_list as $task) {
                    TaskService::updateTask($task['tas_ser_id'], $task['tas_id'], 'running');
                }
                sleep(1);
            }
            Helper::freeLock($lock_handle);
            return $task_list;
        } catch (\Exception $e) {
            // Fully qualified: an unqualified "Exception" would mean Console\Exception
            // here and this handler would never run.
            Log::Init(new CLogFileHandler());
            Log::INFO(date('Y-m-d H:i:s', time())."---exception---".$e->getMessage()." ");
            Helper::freeLock($lock_handle);
        }

        return null;
    }

    /**
     * Worker entry point: claim a batch of tasks and process each of them.
     *
     * @param array $argv Command-line arguments; index 3 is the number of tasks to claim.
     * @return void
     */
    public function run($argv)
    {
        $per_worker_task_num = isset($argv[3]) ? intval($argv[3]) : 0;
        $task_list = $this->lockProcess($per_worker_task_num);
        if (!$task_list) {
            return;
        }

        Log::Init(new CLogFileHandler());
        foreach ($task_list as $task) {
            if ($task) {
                $this->processTask($task);
            }
        }
    }

    /**
     * Process one claimed task: append its data to the diagnostic session, record the
     * outcome through TaskService and remove the task directory when appropriate.
     *
     * @param array $task Task row; reads tas_ser_id, tas_id, tas_data and tas_path.
     * @return void
     */
    protected function processTask($task)
    {
        // Set up everything the exception handler needs before entering the try block.
        $tas_ser_id = $task['tas_ser_id'];
        $tas_id = $task['tas_id'];
        $start_time = date('Y-m-d H:i:s', time());
        $tasid_flag = 'tas_id:' . $tas_id . '---tas_ser_id:' . $tas_ser_id . "---";

        // By-reference outputs of the DAO call. Reset for every task so that a value left
        // over from the previous task can never be mistaken for this task's result.
        $patientId = null;
        $invalidFiles = null;
        $dateTime = null;

        try {
            // NOTE(review): unserialize() can instantiate arbitrary classes; this is only
            // safe as long as tas_data is written exclusively by trusted code.
            $tas_data = unserialize($task['tas_data']);
            if (!is_array($tas_data) && !($tas_data instanceof \ArrayAccess)) {
                throw new \RuntimeException('tas_data could not be unserialized');
            }
            $diagnosticSessionId = $tas_data['diagnosticSessionId'];
            $dataPath = $tas_data['dataPath'];
            $cevDataArray = $tas_data['cevDataArray'];
            $submitForAnalysis = $tas_data['submitForAnalysis'];
            $user = $tas_data['user'];

            Log::INFO(date('Y-m-d H:i:s', time())."---start_run---tas_id---" . $tas_id . " ");

            $daoResult = $this->diagnosticSessionDAO->AppendToDiagnosticSession(
                $diagnosticSessionId,
                $dataPath,
                $cevDataArray,
                $submitForAnalysis,
                $patientId,    // for return
                $invalidFiles, // for return
                $dateTime,     // for return
                $user,
                false
            );

            $daoSucceeded = ($daoResult === $this->DAOResultEnum["DAO_SUCCESS"]);
            $patientValid = ($patientId && $patientId->isValid());

            if ($daoSucceeded && $patientValid && $dateTime) {
                // Insert into DB succeeded.
                $returnObj = new \stdClass;
                $returnObj->diagnosticSessionId = $diagnosticSessionId->toString();
                $returnObj->patientId = $patientId->toString();
                // "\T" emits a literal "T"; a bare "T" would print the timezone abbreviation.
                $returnObj->confirmationCode = $diagnosticSessionId->toString() . '; ' . $dateTime->format('Y-m-d\TH:i:s');
                $returnObj->invalidFiles = $invalidFiles;

                Log::INFO(" ");
                Log::INFO($tasid_flag);
                Log::INFO(var_export($returnObj, true));
                Log::INFO(date('Y-m-d H:i:s', time()) . '---success' . " ");

                $arr = array(
                    'tas_start_date_time' => $start_time,
                    'tas_end_date_time' => date('Y-m-d H:i:s', time())
                );
                $result = TaskService::updateTask($tas_ser_id, $tas_id, 'success', $arr);
                if ($result === true) {
                    $this->removeTaskDir($tasid_flag, $task['tas_path']);
                }
                Log::INFO(" ");
                return;
            }

            // Insert into DB failed, or nothing new was added.
            $tip = '';
            $arr = array(
                'tas_start_date_time' => $start_time,
                'tas_end_date_time' => date('Y-m-d H:i:s', time())
            );
            if ($daoSucceeded && $patientValid && $dateTime === null) {
                $tip = '---no new data added!---';
                // Remove the files that are already stored in the database.
                $this->removeTaskDir($tasid_flag, $task['tas_path']);
                TaskService::updateTask($tas_ser_id, $tas_id, 'nonewadd', $arr);
            } else {
                TaskService::updateTask($tas_ser_id, $tas_id, 'fail', $arr);
            }

            Log::INFO(" ".$tasid_flag.date('Y-m-d H:i:s', time()) . ':fail'.$tip." ");
            $result = array(
                'log_time' => date('Y-m-d H:i:s', time()),
                'tas_ser_id' => $tas_ser_id,
                'tas_id' => $tas_id,
                'diagnosticSessionId' => $diagnosticSessionId,
                'dataPath' => $dataPath,
                'daoResult' => $daoResult,
                'patientId' => $patientId,
                'dateTime' => $dateTime,
            );
            Log::INFO(" " . var_export($result, true) . " ");
        } catch (\Exception $e) {
            Log::Init(new CLogFileHandler());
            Log::ERROR($e->getMessage());
            Log::INFO(" ".$tasid_flag.date('Y-m-d H:i:s', time()) . ':exception'." ");
            $arr = array(
                'tas_start_date_time' => $start_time,
                'tas_end_date_time' => date('Y-m-d H:i:s', time())
            );
            TaskService::updateTask($tas_ser_id, $tas_id, 'fail', $arr);
        }
    }

    /**
     * Delete a task's working directory and log whether Helper::delDir() reported success.
     *
     * @param string $tasid_flag Log prefix identifying the task.
     * @param string $tas_path   Directory to remove.
     * @return void
     */
    protected function removeTaskDir($tasid_flag, $tas_path)
    {
        $rm_result = Helper::delDir($tas_path);
        $outcome = ($rm_result === true) ? '---success' : '---fail';
        Log::INFO($tasid_flag . 'rm dir---' . $tas_path . $outcome . " ");
    }

    // Manual command to start the master:
    // nohup /usr/bin/php /var/www/html/ECGVuService_Dev/Console/index.php UctDataWorker masterRun >> /data/log/uctWorker.log 2>&1 &
    // Runs resident in memory from the command line and rescans after every
    // Config::$master_sleep_time seconds (10 seconds according to the original note).
    // The master process must be restarted after every code change for it to take effect.
    /**
     * Supervisor loop: records the master pid, then keeps Config::$uct_insert_worker_num
     * worker slots populated, re-forking a slot whenever its child is gone.
     *
     * @return void Never returns under normal operation.
     */
    public function masterRun()
    {
        $worker_num = Config::$uct_insert_worker_num;
        $master_sleep_time = Config::$master_sleep_time;
        $master_pid = $this->master_pid;
        $master_pid_log = Config::$master_pid_log;
        file_put_contents($master_pid_log, $master_pid);

        while (true) {
            $this->signal->dispatch();
            for ($i = 1; $i <= $worker_num; $i++) {
                if (array_key_exists($i, $this->childs)) {
                    $pid = $this->childs[$i];
                    // Non-blocking wait: lets the kernel release the zombie if the child ended.
                    $child_pid_status = Helper::waitpid($pid);
                    if ($child_pid_status == -1 || $child_pid_status > 0) {
                        // Child errored (-1) or exited (>0): start a replacement.
                        sleep(2);
                        $this->forkWorker($i);
                    }
                } else {
                    sleep(2);
                    $this->forkWorker($i);
                }
            }
            sleep($master_sleep_time); // keeps the loop from pinning the CPU at 100%
        }
    }

    /**
     * Start a child process for one worker slot.
     *
     * The child replaces itself with a fresh PHP process running the "run" action;
     * the parent records the child's pid under $index.
     *
     * @param int $index Worker slot index.
     * @return void
     */
    protected function forkWorker($index)
    {
        $path = dirname(__FILE__) . '/index.php';
        $php_path = Config::$php_path;

        // Fork.
        $pid = pcntl_fork();
        if ($pid == -1) {
            $log = sprintf("Fork worker failed! \nIndex: %d ", $index);
            Log::Init(new CLogFileHandler(), true);
            Log::INFO($log);
            // Stop here: -1 is truthy, so falling through would register -1 as a child
            // pid and report success; the master would then wait on pid -1 (any child).
            return;
        }

        if ($pid) {
            // Parent process.
            $this->childs[$index] = $pid;
            $log = sprintf("Fork worker success! pid: %s! ", $pid);
            Log::Init(new CLogFileHandler(), true);
            Log::INFO($log);
        } else {
            // Child process.
            $param = array("$path", "UctDataWorker", "run", "1");
            pcntl_exec("$php_path", $param);
            exit(); // only reached when pcntl_exec() failed; end the child here
        }

        $log = sprintf("Finish fork! Index: %d ", $index);
        Log::Init(new CLogFileHandler(), true);
        Log::INFO($log);
    }

    /* In testing, this method never managed to stop the master process, so the
       force-stop method below is used instead.
    public function stopMaster($argv=array()) {
        $pid = intval($argv[3]);
        if($pid<=0) {
            $master_pid_log = Config::$master_pid_log;
            $pid = trim(file_get_contents($master_pid_log));
        }
        echo "Wait stop: [$pid] ";
        posix_kill($pid, SIGTERM);
        $n = 0;
        while(++$n < 100) {
            pcntl_waitpid($pid, $status);
            $priority = @ pcntl_getpriority($pid);
            if(false === $priority) {
                break;
            }
            echo '.';
            usleep(1000*200);
        }
        echo "Stopped. ";
    }
    */

    /*
     * /usr/bin/php /var/www/html/ECGVuService_Dev/Console/index.php UctDataWorker forceStopMaster
     */
    /**
     * Kill the master process with signal 9.
     *
     * @param array $argv Command-line arguments; index 3 may carry the pid. When it is
     *                    missing or not positive, the pid is read from Config::$master_pid_log.
     * @return void
     */
    public function forceStopMaster($argv = array())
    {
        $pid = isset($argv[3]) ? intval($argv[3]) : 0;
        if ($pid <= 0) {
            $master_pid_log = Config::$master_pid_log;
            $contents = file_get_contents($master_pid_log);
            $pid = ($contents === false) ? 0 : intval(trim($contents));
        }

        // Only a positive integer may reach the shell: kill treats 0 and negative values
        // as process groups, and raw file content must never be concatenated into a command.
        if ($pid <= 0) {
            echo "No valid master pid found. ";
            return;
        }

        echo "Wait stop: [$pid] ";
        $cmd = "kill -s 9 ".$pid;
        shell_exec($cmd);
        echo "Stopped. ";
    }

    /**
     * Reload callback; only reports that it was invoked.
     *
     * @return void
     */
    public function reload()
    {
        echo 'receive signal reload and call function reload---pid is '.posix_getpid()." ";
    }

    /**
     * Child-exit callback: reap one finished child without blocking and report its exit code.
     *
     * @return void
     */
    public function childExit()
    {
        $status = 0;
        $pid = pcntl_waitpid(-1, $status, WNOHANG);
        // 0 means no child has exited yet and -1 means an error; $status is only
        // meaningful when a child was actually reaped.
        if ($pid > 0 && pcntl_wifexited($status)) {
            echo "The child ".$pid." exit with code ".pcntl_wexitstatus($status)." ";
        }
    }

    /**
     * Stop callback; only reports that it was invoked.
     *
     * @return void
     */
    public function stop()
    {
        echo 'receive signal stop and call function stop---pid is '.posix_getpid()." ";
    }

    /**
     * Intended to be run periodically by a scheduled job (as root, per the original note):
     * checks whether a master process exists and starts one if it does not, so that tasks
     * are not interrupted after the master crashes unexpectedly.
     *
     * @return void
     */
    public function checkMaster()
    {
        $cmd = 'ps axu|grep "UctDataWorker masterRun"|grep -v "grep"|wc -l';
        // wc -l terminates its output with a newline; trim all surrounding whitespace,
        // otherwise the comparison with "0" can never succeed.
        $ret = trim((string) shell_exec($cmd));
        if ($ret === "0") {
            $path = dirname(__FILE__) . '/index.php';
            $php_path = Config::$php_path;
            $worker_run_log = Config::$worker_run_log;
            $start_master_cmd = "nohup ".$php_path." ".$path." UctDataWorker masterRun >> ".$worker_run_log." 2>&1 &";
            exec($start_master_cmd, $result);
        }
    }
}
Helper.php
<?php namespace Helpers;

set_time_limit(0);

// Short alias for DIRECTORY_SEPARATOR; guarded so a second definition cannot raise a notice.
if (!defined('DS')) {
    define('DS', DIRECTORY_SEPARATOR);
}

/**
 * Collection of static file-system, string and process helpers.
 */
class Helper
{
    /**
     * Classify the operating system PHP was built for.
     *
     * @return string|null 'Linux', 'Win', or null for any other PHP_OS value.
     */
    public static function checkOs()
    {
        $os_name = PHP_OS;
        $os_flag = null; // defined up front so other systems do not hit an undefined variable
        if (strpos($os_name, "Linux") !== false) {
            $os_flag = 'Linux';
        } else if (strpos($os_name, "WIN") !== false) {
            $os_flag = 'Win';
        }
        return $os_flag;
    }

    /**
     * Read the last $n lines of a file by scanning backwards from its end.
     *
     * The lines are concatenated newest first (last line, then the one before it, ...).
     * When the file holds fewer than $n lines, every line is returned.
     *
     * @param string $filename
     * @param int $n
     * @return bool|string false when the file cannot be opened.
     */
    public static function getLastLines($filename, $n = 1)
    {
        if (!$fp = fopen($filename, 'r')) {
            echo "打开文件失败,请检查文件路径是否正确,路径和文件名不要包含中文";
            return false;
        }

        $pos = -2; // start before the final byte so a trailing newline is skipped
        $str = "";
        $reachedStart = false;
        while ($n > 0 && !$reachedStart) {
            $char = "";
            // Walk backwards until the newline that precedes the wanted line.
            while ($char != "\n") {
                if (fseek($fp, $pos, SEEK_END) === 0) {
                    $char = fgetc($fp);
                    $pos--;
                } else {
                    // Ran past the beginning: the wanted line is the first line of the file.
                    rewind($fp);
                    $reachedStart = true;
                    break;
                }
            }
            $line = fgets($fp);
            if ($line !== false) {
                $str .= $line;
            }
            $n--;
        }

        fclose($fp);
        return $str;
    }

    /**
     * Send a file to the client as an attachment.
     *
     * @param string $file_path
     * @return bool false when $file_path is not a regular file, true after the file was sent.
     */
    public static function downloadFile($file_path)
    {
        if (!is_file($file_path)) {
            return false;
        }
        $fileinfo = pathinfo($file_path);
        // pathinfo() omits 'extension' for names without a dot.
        $content_type = isset($fileinfo['extension'])
            ? 'application/x-'.$fileinfo['extension']
            : 'application/octet-stream';
        header('Content-type: '.$content_type);
        header('Content-Disposition: attachment; filename='.$fileinfo['basename']);
        header('Content-Length: '.filesize($file_path));
        readfile($file_path);
        return true;
    }

    /**
     * Convert a string, or every string in a nested array, between character sets.
     *
     * $arr carries a default only so that no required parameter follows optional ones;
     * callers are expected to pass all three arguments.
     *
     * @param string $input_charset
     * @param string $output_charset
     * @param array|string $arr
     * @return array|string|false Same shape as $arr; iconv()'s result for each string.
     */
    public static function arrIconv($input_charset = 'GBK', $output_charset = 'UTF-8', $arr = array())
    {
        if (!is_array($arr)) {
            return iconv($input_charset, $output_charset, $arr);
        }

        $return = array();
        foreach ($arr as $k => $item) {
            $return[$k] = self::arrIconv($input_charset, $output_charset, $item);
        }
        return $return;
    }

    /**
     * Recursively copy a file or directory.
     *
     * Copying continues after an individual failure so as much as possible is transferred.
     *
     * @param string $path Source file or directory.
     * @param string $dest Destination file or directory.
     * @return bool true only when everything was copied.
     */
    public static function copyR($path, $dest)
    {
        if (is_dir($path)) {
            if (!is_dir($dest) && !mkdir($dest) && !is_dir($dest)) {
                return false;
            }
            $objects = scandir($path);
            if ($objects === false) {
                return false;
            }
            $ok = true;
            foreach ($objects as $file) {
                if ($file == "." || $file == "..") {
                    continue;
                }
                if (!self::copyR($path.DS.$file, $dest.DS.$file)) {
                    $ok = false;
                }
            }
            return $ok;
        } elseif (is_file($path)) {
            return copy($path, $dest);
        }
        return false;
    }

    /**
     * List the entries of a directory keyed by the first run of digits in their name
     * (after removing '-' and '_'), sorted by that key.
     *
     * Entries whose name contains no digit are skipped; entries that share the same
     * digit run overwrite one another.
     *
     * @param string $dir_path
     * @return array|bool false when $dir_path is not a directory.
     */
    public static function getDirList($dir_path)
    {
        if (!is_dir($dir_path)) {
            return false;
        }

        $dir_list = array();
        $d = dir($dir_path);
        if ($d) {
            while (false !== ($entry = $d->read())) {
                if ($entry == '.' || $entry == '..') {
                    continue;
                }
                $k = str_replace(array('-', '_'), '', $entry);
                if (preg_match('/(\d+)/', $k, $matches)) {
                    $dir_list[$matches[0]] = $entry;
                }
            }
            $d->close();
        }
        ksort($dir_list);
        return $dir_list;
    }

    /**
     * Generate a random string.
     *
     * Not cryptographically secure (built on rand()/mt_rand()); do not use for secrets.
     *
     * @param int $length  Number of characters.
     * @param int $numeric Non-zero for digits only; otherwise alphanumeric, starting with a letter.
     * @return string
     */
    public static function random($length, $numeric = 0)
    {
        // DOCUMENT_ROOT is not set when running from the command line.
        $doc_root = isset($_SERVER['DOCUMENT_ROOT']) ? $_SERVER['DOCUMENT_ROOT'] : '';
        $seed = base_convert(md5(microtime().$doc_root), 16, $numeric ? 10 : 35);
        $seed = $numeric ? (str_replace('0', '', $seed).'012340567890') : ($seed.'zZ'.strtoupper($seed));
        if ($numeric) {
            $hash = '';
        } else {
            $hash = chr(rand(1, 26) + rand(0, 1) * 32 + 64);
            $length--;
        }
        $max = strlen($seed) - 1;
        for ($i = 0; $i < $length; $i++) {
            // Square brackets: the curly-brace string offset syntax no longer exists in PHP 8.
            $hash .= $seed[mt_rand(0, $max)];
        }
        return $hash;
    }

    /**
     * Recursively delete a directory through the platform's shell command.
     *
     * @param string $dir Directory to remove; passed to the shell as a single escaped argument.
     * @return bool true when the command exited with status 0; false otherwise or for an empty path.
     */
    public static function delDir($dir)
    {
        if (!is_string($dir) || trim($dir) === '') {
            return false;
        }
        if (strtoupper(substr(PHP_OS, 0, 3)) == 'WIN') {
            $str = "rmdir /s/q " . escapeshellarg($dir);
        } else {
            // "--" keeps a path that starts with "-" from being parsed as an option.
            $str = "rm -Rf -- " . escapeshellarg($dir);
        }
        exec($str, $result, $return_var);
        return $return_var === 0;
    }

    /**
     * @see pcntl_waitpid()
     */
    public static function waitpid($pid, &$status = 0, $flag = WNOHANG)
    {
        return pcntl_waitpid($pid, $status, $flag);
    }

    /**
     * @see posix_kill()
     */
    public static function kill($pid)
    {
        return posix_kill($pid, SIGTERM);
    }

    /**
     * Open the lock file and take an exclusive lock on it.
     *
     * @param string $file_path
     * @return resource|bool The locked handle (release it with freeLock()), or false when
     *                       the file cannot be opened or locked.
     */
    public static function lockFile($file_path = '/tmp/file_lock.lock')
    {
        $fp = fopen($file_path, "w+");
        if ($fp === false) {
            return false;
        }
        if (flock($fp, LOCK_EX)) {
            fwrite($fp, date('Y-m-d H:i:s', time())."---pid[".posix_getpid()."]---lock suc! ");
            return $fp;
        }
        fclose($fp);
        return false;
    }

    /**
     * Release a lock obtained from lockFile() and close its handle.
     *
     * @param resource $fp
     * @return void
     */
    public static function freeLock($fp)
    {
        if (!is_resource($fp)) {
            return;
        }
        fflush($fp);
        flock($fp, LOCK_UN);
        fclose($fp);
    }
}