zoukankan      html  css  js  c++  java
  • Workerman学习笔记(二)Worker类

    本篇主要对Worker类进行中文注释方式学习Workerman

    启动文件:http_test.php

    <?php
    use WorkermanWorker;
    require_once __DIR__ . '/wk/Autoloader.php';
    
    // 创建一个Worker监听2345端口,使用http协议通讯
    $http_worker = new Worker("http://0.0.0.0:2345");
    
    // 启动4个进程对外提供服务
    $http_worker->count = 4;
    
    // 接收到浏览器发送的数据时回复hello world给浏览器
    $http_worker->onMessage = function($connection, $data)
    {
    // 向浏览器发送hello world
    $connection->send('hello world');
    };
    
    // 运行worker
    Worker::runAll();

    Worker类:

    初始化 __construct()
    /**
         * Construct.
         *
         * @param string $socket_name
         * @param array  $context_option
         */
        public function __construct($socket_name = '', array $context_option = array())
        {
            // Save all worker instances.
            //返回当前对象的hash id
            $this->workerId                    = spl_object_hash($this);
            //把当前对象加入到$__workers属性
            static::$_workers[$this->workerId] = $this;
            //设置PID数组,用于区分所在的对象
            static::$_pidMap[$this->workerId]  = array();
    
            // Get autoload root path.
            /*产生一条回溯跟踪
            Array
            (
                [0] => Array
                    (
                        [file] => /root/test_wk/http_test.php
                        [line] => 5
                        [function] => __construct
                        [class] => WorkermanWorker
                        [object] => WorkermanWorker Object
                            (
                                [id] => 0
                                [name] => none
                                [count] => 1
                                [user] =>
                                [group] =>
                                [reloadable] => 1
                                [reusePort] =>
                                [onWorkerStart] =>
                                [onConnect] =>
                                [onMessage] =>
                                [onClose] =>
                                [onError] =>
                                [onBufferFull] =>
                                [onBufferDrain] =>
                                [onWorkerStop] =>
                                [onWorkerReload] =>
                                [transport] => tcp
                                [connections] => Array
                                    (
                                    )
    
                                [protocol] =>
                                [_autoloadRootPath:protected] =>
                                [_pauseAccept:protected] => 1
                                [stopping] =>
                                [_mainSocket:protected] =>
                                [_socketName:protected] =>
                                [_localSocket:protected] =>
                                [_context:protected] =>
                                [workerId] => 0000000016887ec6000000004cc8efd4
                            )
    
                        [type] => ->
                        [args] => Array
                            (
                                [0] => http://0.0.0.0:2345
                            )
    
                    )
    
            )
            */
            $backtrace               = debug_backtrace();
            //设置自动加载根目录 本机:/root/test_wk
            $this->_autoloadRootPath = dirname($backtrace[0]['file']);
            Autoloader::setRootPath($this->_autoloadRootPath);
    
            // Context for socket.
            if ($socket_name) {
                $this->_socketName = $socket_name;
                //设置内核接受缓存器队列的大小
                if (!isset($context_option['socket']['backlog'])) {
                    $context_option['socket']['backlog'] = static::DEFAULT_BACKLOG;
                }
                //创建一个资源流,流的概念还在理解当中,等理解透彻了再补上^_^
                $this->_context = stream_context_create($context_option);
            }
    
            // Turn reusePort on.
            //开启reusePort,开启的目的是为了避免惊群效应,提升多进程短连接应用的性能
            //不了解的可以参考这篇文章,说的很清楚 https://www.jianshu.com/p/97cc8c52d47a
            if (static::$_OS === OS_TYPE_LINUX  // if linux
                && version_compare(PHP_VERSION,'7.0.0', 'ge') // if php >= 7.0.0
                && strtolower(php_uname('s')) !== 'darwin' // if not Mac OS
                && $this->transport !== 'unix') { // if not unix socket
    
                $this->reusePort = true;
            }
        }
    runAll() 初始化worker并运行环境服务
        /**
         * Run all worker instances.
         *
         * @return void
         */
        public static function runAll()
        {
            static::checkSapiEnv();//判断脚本启动方式
            static::init();//初始化
            static::lock();//锁定当前文件
            static::parseCommand();//cli终端命令解析
            static::daemonize();//以守护进程方式运行
            static::initWorkers();//初始化worker
            static::installSignal();//安装各类信号
            static::saveMasterPid();//将当前进程ID写入文件
            static::unlock();//释放当前文件锁
            static::displayUI();//启动后终端显示内容
            static::forkWorkers();//创建当前进程的子进程
            static::resetStd();//重定向输入和输出标准
            static::monitorWorkers();//监听子进程状态
        }
    checkSapiEnv()  判断脚本启动方式
    protected static function checkSapiEnv()
        {
            // Only for cli.
            if (PHP_SAPI !== 'cli') {
                exit("Only run in command line mode 
    ");
            }
            if (DIRECTORY_SEPARATOR === '\') {
                self::$_OS = OS_TYPE_WINDOWS;
            }
        }
    init()
    protected static function init()
        {
            //设置用户自定义的函数来处理脚本中出现的错误
            set_error_handler(function($code, $msg, $file, $line){
                Worker::safeEcho("$msg in file $file on line $line
    ");
            });
    
            // Start file.
            /*产生一条回溯跟踪,打印结果如下
                Array
                (
                    [0] => Array
                        (
                            [file] => /root/test_wk/Workerman/Worker.php
                            [line] => 537
                            [function] => init
                            [class] => WorkermanWorker
                            [type] => ::
                            [args] => Array
                                (
                                )
    
                        )
                    [1] => Array
                        (
                            [file] => /root/test_wk/http_test.php
                            [line] => 14
                            [function] => runAll
                            [class] => WorkermanWorker
                            [type] => ::
                            [args] => Array
                                (
                                )
    
                        )
                )
            */
            $backtrace        = debug_backtrace();
            //获取启动文件的绝对路径 本机:/root/test_wk/http_test.php
            static::$_startFile = $backtrace[count($backtrace) - 1]['file'];
    
            //启动文件/替换为_ 本机:_root_test_wk_http_test.php
            $unique_prefix = str_replace('/', '_', static::$_startFile);
    
            // Pid file.
            //设置PID路径,本机:/root/test_wk/Workerman/../_root_test_wk_http_test.php.pid
            if (empty(static::$pidFile)) {
                static::$pidFile = __DIR__ . "/../$unique_prefix.pid";
            }
    
            // Log file.
            //设置log路径并创建文件 本机:/root/test_wk/Workerman/../workerman.log
            if (empty(static::$logFile)) {
                static::$logFile = __DIR__ . '/../workerman.log';
            }
            $log_file = (string)static::$logFile;
            if (!is_file($log_file)) {
                touch($log_file);
                chmod($log_file, 0622);
            }
    
            // State.
            //设置默认状态值
            static::$_status = static::STATUS_STARTING;
    
            // For statistics.
            //获取服务启动时间
            static::$_globalStatistics['start_timestamp'] = time();
            //设置启动状态的文件 本机:/tmp/_root_test_wk_http_test.php.status
            static::$_statisticsFile                      = sys_get_temp_dir() . "/$unique_prefix.status";
    
            // Process title.
            //设置进程名称 本机:WorkerMan: master process  start_file=/root/test_wk/http_test.php
            static::setProcessTitle(static::$processTitle . ': master process  start_file=' . static::$_startFile);
    
            // Init data for worker id.
            static::initId();
    
            // Timer init.
            Timer::init();
        }
    setProcessTitle
    protected static function setProcessTitle($title)
        {
            //设置用户自定义的函数来处理脚本中出现的错误
            set_error_handler(function(){});
            // >=php 5.5
            if (function_exists('cli_set_process_title')) {
                //在cli模式下设置进程名称
                cli_set_process_title($title);
            } // Need proctitle when php<=5.5 .
            elseif (extension_loaded('proctitle') && function_exists('setproctitle')) {
                setproctitle($title);
            }
            restore_error_handler();
        }
    initId() 初始化进程数量
    protected static function initId()
        {
            /*
            初始化进程数量
            Array
            (
                [0] => 0
                [1] => 0
                [2] => 0
                [3] => 0
            )
            */
            foreach (static::$_workers as $worker_id => $worker) {
                $new_id_map = array();
                $worker->count = $worker->count < 1 ? 1 : $worker->count;
                for($key = 0; $key < $worker->count; $key++) {
                    $new_id_map[$key] = isset(static::$_idMap[$worker_id][$key]) ? static::$_idMap[$worker_id][$key] : 0;
                }
                static::$_idMap[$worker_id] = $new_id_map;
            }
        }
    lock();//锁定当前文件
    protected static function lock()
        {
            //用读方式打开当前启动文件
            $fd = fopen(static::$_startFile, 'r');
            //锁定当前文件
            if ($fd && !flock($fd, LOCK_EX)) {
                static::log('Workerman['.static::$_startFile.'] already running.');
                exit;
            }
        }
    parseCommand();//cli终端命令解析
    protected static function parseCommand()
        {
            //判断当前系统是否是linux,不是的话退出
            if (static::$_OS !== OS_TYPE_LINUX) {
                return;
            }
            //获取命令行参数
            global $argv;
            // Check argv;
            $start_file = $argv[0];//执行文件 本机:http_test.php
            $available_commands = array(
                'start',
                'stop',
                'restart',
                'reload',
                'status',
                'connections',
            );
            $usage = "Usage: php yourfile <command> [mode]
    Commands: 
    start		Start worker in DEBUG mode.
    		Use mode -d to start in DAEMON mode.
    stop		Stop worker.
    		Use mode -g to stop gracefully.
    restart		Restart workers.
    		Use mode -d to start in DAEMON mode.
    		Use mode -g to stop gracefully.
    reload		Reload codes.
    		Use mode -g to reload gracefully.
    status		Get worker status.
    		Use mode -d to show live status.
    connections	Get worker connections.
    ";
            //验证执行文件后跟的命令参数是否正确
            if (!isset($argv[1]) || !in_array($argv[1], $available_commands)) {
                if (isset($argv[1])) {
                    static::safeEcho('Unknown command: ' . $argv[1] . "
    ");
                }
                exit($usage);
            }
    
            // Get command.
            $command  = trim($argv[1]);
            $command2 = isset($argv[2]) ? $argv[2] : '';
    
            // Start command.
            //如果是守护进程方式启动,写入日志
            $mode = '';
            if ($command === 'start') {
                if ($command2 === '-d' || static::$daemonize) {
                    $mode = 'in DAEMON mode';
                } else {
                    $mode = 'in DEBUG mode';
                }
            }
            static::log("Workerman[$start_file] $command $mode");
    
            // Get master process PID.
            //获取当前主进程,第一次启动时获取不到
            $master_pid      = is_file(static::$pidFile) ? file_get_contents(static::$pidFile) : 0;
            //posix_kill($master_pid, 0)是检查进程是否存在
            //当前进程和主进程进行比对,如果一致则不能重新start,如果不一致只能使用start和restart命令
            $master_is_alive = $master_pid && posix_kill($master_pid, 0) && posix_getpid() !== $master_pid;
            // Master is still alive?
            if ($master_is_alive) {
                if ($command === 'start') {
                    static::log("Workerman[$start_file] already running");
                    exit;
                }
            } elseif ($command !== 'start' && $command !== 'restart') {
                static::log("Workerman[$start_file] not run");
                exit;
            }
    
            // execute command.
            switch ($command) {
                case 'start':
                    if ($command2 === '-d') {    //设置要使用守护进程启动
                        static::$daemonize = true;
                    }
                    break;
                case 'status':
                    while (1) {
                        if (is_file(static::$_statisticsFile)) {
                            @unlink(static::$_statisticsFile);
                        }
                        // Master process will send SIGUSR2 signal to all child processes.
                        posix_kill($master_pid, SIGUSR2);
                        // Sleep 1 second.
                        sleep(1);
                        // Clear terminal.
                        if ($command2 === '-d') {
                            static::safeEcho("33[H33[2J33(B33[m", true);
                        }
                        // Echo status data.
                        static::safeEcho(static::formatStatusData());
                        if ($command2 !== '-d') {
                            exit(0);
                        }
                        static::safeEcho("
    Press Ctrl+C to quit.
    
    ");
                    }
                    exit(0);
                case 'connections':
                    if (is_file(static::$_statisticsFile) && is_writable(static::$_statisticsFile)) {
                        unlink(static::$_statisticsFile);
                    }
                    // Master process will send SIGIO signal to all child processes.
                    posix_kill($master_pid, SIGIO);
                    // Waiting amoment.
                    usleep(500000);
                    // Display statisitcs data from a disk file.
                    if(is_readable(static::$_statisticsFile)) {
                        readfile(static::$_statisticsFile);
                    }
                    exit(0);
                case 'restart':
                case 'stop':
                    if ($command2 === '-g') {
                        static::$_gracefulStop = true;
                        $sig = SIGTERM;
                        static::log("Workerman[$start_file] is gracefully stopping ...");
                    } else {
                        static::$_gracefulStop = false;
                        $sig = SIGINT;
                        static::log("Workerman[$start_file] is stopping ...");
                    }
                    // Send stop signal to master process.
                    $master_pid && posix_kill($master_pid, $sig);
                    // Timeout.
                    $timeout    = 5;
                    $start_time = time();
                    // Check master process is still alive?
                    while (1) {
                        $master_is_alive = $master_pid && posix_kill($master_pid, 0);
                        if ($master_is_alive) {
                            // Timeout?
                            if (!static::$_gracefulStop && time() - $start_time >= $timeout) {
                                static::log("Workerman[$start_file] stop fail");
                                exit;
                            }
                            // Waiting amoment.
                            usleep(10000);
                            continue;
                        }
                        // Stop success.
                        static::log("Workerman[$start_file] stop success");
                        if ($command === 'stop') {
                            exit(0);
                        }
                        if ($command2 === '-d') {
                            static::$daemonize = true;
                        }
                        break;
                    }
                    break;
                case 'reload':
                    if($command2 === '-g'){
                        $sig = SIGQUIT;
                    }else{
                        $sig = SIGUSR1;
                    }
                    posix_kill($master_pid, $sig);
                    exit;
                default :
                    if (isset($command)) {
                        static::safeEcho('Unknown command: ' . $command . "
    ");
                    }
                    exit($usage);
            }
        }
    daemonize();//以守护进程方式运行
    protected static function daemonize()
        {
            //如果设置不是守护进程或不是linux系统,则退出
            if (!static::$daemonize || static::$_OS !== OS_TYPE_LINUX) {
                return;
            }
            //umask的作用是在创建新文件或目录时 屏蔽掉新文件或目录不应有的访问允许权限
            //umask(0)是将默认权限掩码修改为0,意味着即将要创建的文件的权限都是777
            umask(0);
            //创建一个子进程
            $pid = pcntl_fork();
            //-1代表创建失败
            if (-1 === $pid) {
                throw new Exception('Fork fail');
            //子进程创建成功,当前进程退出
            //假如当前进程是1,子进程是2000,那么退出的是当前进程:1
            } elseif ($pid > 0) {
                exit(0);
            }
            //以下内容是从其他地方摘抄
            //创建一个会话领导者,彻底脱离控制终端,当前进程为会话首领进程  
            /**
            1、此进程【不可以是组长进程】将是新会话期的会话期首进程【session leader】,会话期首进程是创建该会话期的进程,此进程是新会话期的唯一进程。
            2、此进程将是新进程组的组长进程,新进程组的id是此调用进程的进程id. 
            3、此进程没有控制终端。  在调用setsid之前即使有控制终端,调用后它们的联系将解除
            4、一个会话是多个进程组的集合,并且只能有一个前台进程组。  
            一般来说先调用fork创建两个进程,让父进程退出,子进程来创建会话,因为子进程是继承了父进程的进程组id,其进程是新分配的,不会等于进程组id,保证了创建会话的不是进程组组长进程。
            **/
            if (-1 === posix_setsid()) {
                throw new Exception("Setsid fail");
            }
            // Fork again avoid SVR4 system regain the control of terminal.
            //此进程已经脱离了原来的会话终端,不在继承原来的会话了
            //再次创建一个子进程为孙子进程
            //该孙子进程负责继续运行后面的代码
            $pid = pcntl_fork();
            if (-1 === $pid) {
                throw new Exception("Fork fail");
            } elseif (0 !== $pid) {
                exit(0);//非孙子进程全部退出
            }
        }
    initWorkers();//初始化worker
    protected static function initWorkers()
        {
            //当前非linux系统退出
            if (static::$_OS !== OS_TYPE_LINUX) {
                return;
            }
            foreach (static::$_workers as $worker) {
                // Worker name.
                //设置worker实例的名字
                if (empty($worker->name)) {
                    $worker->name = 'none';
                }
    
                // Get unix user of the worker process.
                //获取当前用户名
                if (empty($worker->user)) {
                    $worker->user = static::getCurrentUser();
                } else {
                    if (posix_getuid() !== 0 && $worker->user !== static::getCurrentUser()) {
                        static::log('Warning: You must have the root privileges to change uid and gid.');
                    }
                }
    
                // Socket name.
                $worker->socket = $worker->getSocketName();
    
                // Status name.
                $worker->status = '<g> [OK] </g>';
    
                // Get column mapping for UI
                foreach(static::getUiColumns() as $column_name => $prop){
                    !isset($worker->{$prop}) && $worker->{$prop} = 'NNNN';
                    $prop_length = strlen($worker->{$prop});
                    $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength';
                    static::$$key = max(static::$$key, $prop_length);
                }
    
                // Listen.
                //reusePort属性已开启,不在当前进程统一监听,而是到各个子进程下进行监听
                if (!$worker->reusePort) {
                    $worker->listen();
                }
            }
        }
    protected static function getCurrentUser()
        {
            $user_info = posix_getpwuid(posix_getuid());
            return $user_info['name'];
        }
    installSignal();//安装各类信号
     protected static function installSignal()
        {
            if (static::$_OS !== OS_TYPE_LINUX) {
                return;
            }
            $signalHandler = 'WorkermanWorker::signalHandler';
            // stop
            pcntl_signal(SIGINT, $signalHandler, false);//前台进程结束,比如ctrl+c
            // graceful stop
            pcntl_signal(SIGTERM, $signalHandler, false);//停止进程
            // reload
            pcntl_signal(SIGUSR1, $signalHandler, false);//
            // graceful reload
            pcntl_signal(SIGQUIT, $signalHandler, false);
            // status
            pcntl_signal(SIGUSR2, $signalHandler, false);
            // connection status
            pcntl_signal(SIGIO, $signalHandler, false);
            // ignore
            pcntl_signal(SIGPIPE, SIG_IGN, false);
        }
    saveMasterPid();//将当前进程ID写入文件
    protected static function saveMasterPid()
        {
            //判断终端系统
            if (static::$_OS !== OS_TYPE_LINUX) {
                return;
            }
            //获取当前进程ID
            static::$_masterPid = posix_getpid();
            //将进程ID写入文件
            if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
                throw new Exception('can not save pid to ' . static::$pidFile);
            }
        }
    unlock();//释放当前文件锁
    protected static function unlock()
        {
            $fd = fopen(static::$_startFile, 'r');
            $fd && flock($fd, LOCK_UN);
        }
    forkWorkers();//创建当前进程的子进程
    protected static function forkWorkersForLinux()
        {
    
            foreach (static::$_workers as $worker) {
                //设置worker的名字和长度
                if (static::$_status === static::STATUS_STARTING) {
                    if (empty($worker->name)) {
                        $worker->name = $worker->getSocketName();
                    }
                    $worker_name_length = strlen($worker->name);
                    if (static::$_maxWorkerNameLength < $worker_name_length) {
                        static::$_maxWorkerNameLength = $worker_name_length;
                    }
                }
                //循环创建子进程
                while (count(static::$_pidMap[$worker->workerId]) < $worker->count) {
                    static::forkOneWorkerForLinux($worker);
                }
            }
        }
    protected static function forkOneWorkerForLinux(self $worker)
        {
            // Get available worker id.
            //获取子进程数组里的pid,没有创建时都是0
            //static::$_idMap[$worker_id]
            $id = static::getId($worker->workerId, 0);
            if ($id === false) {
                return;
            }
            //创建子进程
            $pid = pcntl_fork();
            // For master process.
            //创建子进程成功,在父进程中将子进程pid加入到_pidMap和_idMap
            if ($pid > 0) {
                static::$_pidMap[$worker->workerId][$pid] = $pid;
                static::$_idMap[$worker->workerId][$id]   = $pid;
            } // For child processes.
            //创建子进程成功,在子进程中...
            elseif (0 === $pid) {
                srand();
                mt_srand();
                //在子进程中创建socket监听,这是reusePort的优势
                if ($worker->reusePort) {
                    $worker->listen();
                }
                if (static::$_status === static::STATUS_STARTING) {
                    static::resetStd();//重定向输入和输出标准
                }
                static::$_pidMap  = array();
                // Remove other listener.
                //删除其他无用监听
                foreach(static::$_workers as $key => $one_worker) {
                    if ($one_worker->workerId !== $worker->workerId) {
                        $one_worker->unlisten();
                        unset(static::$_workers[$key]);
                    }
                }
                //停止闹钟信号
                Timer::delAll();
                //设置子进程名称
                static::setProcessTitle(self::$processTitle . ': worker process  ' . $worker->name . ' ' . $worker->getSocketName());
                //设置当前进程的UID和GID
                $worker->setUserAndGroup();
                $worker->id = $id;
                //停止运行,成为僵尸进程,否则会在while循环中一直运行
                //参考https://www.iminho.me/wiki/blog-15.html
                $worker->run();
                if (strpos(static::$eventLoopClass, 'WorkermanEventsSwoole') !== false) {
                    exit(0);
                }
                $err = new Exception('event-loop exited');
                static::log($err);
                exit(250);
            } else {
                throw new Exception("forkOneWorker fail");
            }
        }
    protected static function getId($worker_id, $pid)
        {
            return array_search($pid, static::$_idMap[$worker_id]);
        }
    listen()
    public function listen()
        {
            if (!$this->_socketName) {
                return;
            }
    
            // Autoload.
            Autoloader::setRootPath($this->_autoloadRootPath);
    
            if (!$this->_mainSocket) {
                //获取socket和address
                $local_socket = $this->parseSocketAddress();
    
                // Flag.
                $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
                $errno = 0;
                $errmsg = '';
                // SO_REUSEPORT.
                if ($this->reusePort) {
                    //设置流端口复用
                    stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
                }
    
                // Create an Internet or Unix domain server socket.
                //创建一个Internet或Unix域服务器套接字
                //使用stream_socket_accept()开始接受连接,这个在后面
                $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
                if (!$this->_mainSocket) {
                    throw new Exception($errmsg);
                }
                //暂时忽略设置ssl
                if ($this->transport === 'ssl') {
                    stream_socket_enable_crypto($this->_mainSocket, false);
                } elseif ($this->transport === 'unix') {
                    $socket_file = substr($local_socket, 7);
                    if ($this->user) {
                        chown($socket_file, $this->user);
                    }
                    if ($this->group) {
                        chgrp($socket_file, $this->group);
                    }
                }
    
                // Try to open keepalive for tcp and disable Nagle algorithm.
                if (function_exists('socket_import_stream') && static::$_builtinTransports[$this->transport] === 'tcp') {
                    set_error_handler(function(){});
                    //是tcp传输层协议的话导入创建好的流构建socket
                    $socket = socket_import_stream($this->_mainSocket);
                    //设置tcp选项,选项还可以通过linux进行修改配置
                    //心跳设置
                    socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
                    socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
                    restore_error_handler();
                }
    
                // Non blocking.
                //设置为非阻塞模式
                stream_set_blocking($this->_mainSocket, false);
            }
    
            $this->resumeAccept();
        }
    protected function parseSocketAddress() {
            if (!$this->_socketName) {
                return;
            }
            // Get the application layer communication protocol and listening address.
            list($scheme, $address) = explode(':', $this->_socketName, 2);
            // Check application layer protocol class.
            //设置协议类
            if (!isset(static::$_builtinTransports[$scheme])) {
                $scheme         = ucfirst($scheme);
                $this->protocol = substr($scheme,0,1)==='\' ? $scheme : 'Protocols\' . $scheme;
                if (!class_exists($this->protocol)) {
                    $this->protocol = "Workerman\Protocols\$scheme";
                    if (!class_exists($this->protocol)) {
                        throw new Exception("class \Protocols\$scheme not exist");
                    }
                }
    
                if (!isset(static::$_builtinTransports[$this->transport])) {
                    throw new Exception('Bad worker->transport ' . var_export($this->transport, true));
                }
            } else {
                $this->transport = $scheme;
            }
            //local socket
            //返回socket和address
            return static::$_builtinTransports[$this->transport] . ":" . $address;
        }
    public function setUserAndGroup()
        {
            // Get uid.
            /*
            根据用户名获取用户信息
            当前用户名是root
            array(7) {
              ["name"]=>
              string(4) "root"
              ["passwd"]=>
              string(1) "x"
              ["uid"]=>
              int(0)
              ["gid"]=>
              int(0)
              ["gecos"]=>
              string(4) "root"
              ["dir"]=>
              string(5) "/root"
              ["shell"]=>
              string(9) "/bin/bash"
            }
            */
            $user_info = posix_getpwnam($this->user);
            if (!$user_info) {
                static::log("Warning: User {$this->user} not exsits");
                return;
            }
            $uid = $user_info['uid'];
            // Get gid.
            //获取用户组ID
            if ($this->group) {
                $group_info = posix_getgrnam($this->group);
                if (!$group_info) {
                    static::log("Warning: Group {$this->group} not exsits");
                    return;
                }
                $gid = $group_info['gid'];
            } else {
                $gid = $user_info['gid'];
            }
    
            // Set uid and gid.
            //设置当前进程的组ID和UID,并将用户名加入到组访问列表中
            if ($uid !== posix_getuid() || $gid !== posix_getgid()) {
                if (!posix_setgid($gid) || !posix_initgroups($user_info['name'], $gid) || !posix_setuid($uid)) {
                    static::log("Warning: change gid or uid fail.");
                }
            }
        }
    protected static function monitorWorkersForLinux()
        {
            static::$_status = static::STATUS_RUNNING;
            while (1) {
                // Calls signal handlers for pending signals.
                //调用等待信号的处理器
                //检查是否有未处理的信号
                pcntl_signal_dispatch();
                // Suspends execution of the current process until a child has exited, or until a signal is delivered
                $status = 0;
                //等待或返回fork的子进程状态
                $pid    = pcntl_wait($status, WUNTRACED);
                // Calls signal handlers for pending signals again.
                pcntl_signal_dispatch();
                // If a child has already exited.
                if ($pid > 0) {
                    // Find out which worker process exited.
                    foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
                        if (isset($worker_pid_array[$pid])) {
                            $worker = static::$_workers[$worker_id];
                            // Exit status.
                            if ($status !== 0) {
                                static::log("worker[" . $worker->name . ":$pid] exit with status $status");
                            }
    
                            // For Statistics.
                            if (!isset(static::$_globalStatistics['worker_exit_info'][$worker_id][$status])) {
                                static::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
                            }
                            ++static::$_globalStatistics['worker_exit_info'][$worker_id][$status];
    
                            // Clear process data.
                            unset(static::$_pidMap[$worker_id][$pid]);
    
                            // Mark id is available.
                            $id                              = static::getId($worker_id, $pid);
                            static::$_idMap[$worker_id][$id] = 0;
    
                            break;
                        }
                    }
                    // Is still running state then fork a new worker process.
                    if (static::$_status !== static::STATUS_SHUTDOWN) {
                        static::forkWorkers();
                        // If reloading continue.
                        if (isset(static::$_pidsToRestart[$pid])) {
                            unset(static::$_pidsToRestart[$pid]);
                            static::reload();
                        }
                    }
                }
    
                // If shutdown state and all child processes exited then master process exit.
                if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
                    static::exitAndClearAll();
                }
            }
        }
  • 相关阅读:
    你知道RAID的初始化过程吗?
    Dynamic Disk Pool技术解析
    Ubuntu-16.04.6 安装 oracle 11.2.0.4 数据库database软件
    Ubuntu-16.04.6 安装 oracle 12.2.0.1 数据库database软件
    IDEA 快捷键
    Redis 常用操作命令
    甘特图
    PERT图
    现金贷
    线上信贷提供解决方案
  • 原文地址:https://www.cnblogs.com/shier-dong/p/13445929.html
Copyright © 2011-2022 走看看