zoukankan      html  css  js  c++  java
  • workman的学习总结

    我们知道php主要是用来做web应用的,而且平时使用的都是都是和其他的web服务器来结合使用,比如和apache,nginx和apache的时候,是作为apache的一个动态模块来加载,和nginx的时候主要是使用fpm的形式,现在其他的一些语言,比如python,nodejs,ruby,go都是单独作为http服务编程的,其实php也是可以的,php里面有一些扩展,我们平时做web应用的时候都是很少使用到的,比如fcntl,libevent,stream,posix等等,这些扩展主要是对unix的api的一些封装,使用它们其实和uinx的网络编程是一样的,只是更加的便捷和容易,还可以使用我们熟悉的php语言,而不是c。

    workman其实就是把php的网络模块做一个整合,为php用户提供一个更加便捷的方式来创建网络编程的条件,比如实现tcp,udp服务器,实现http的web服务器等等。

    主要的文件结构

    Worker.php 是workman的主要入口文件,这个定义了workman作为一个服务器的相关的操作,变量初始化,参数解析,进程管理,请求接受,事件注册处理等等。

    WebServer.php 继承了Worker.php主要还是使用Worker.php的逻辑,主要区别是使用Htpp的协议,以及实现对uri的简单路由分发。

    Events 是事件的相关类

    Lib 包含Timer.php 以及一些常数。

    Connection 是对连接的处理

    Protocols 是对协议的处理,主要是对Connection接收的数据的处理,比如http的时候添加header头等等。

    Worker.php的分析

    内部函数备注说明
    self::checkSapiEnv(); //监测环境是否正确,workman只能运行在cli模式下面
    self::init(); //环境初始化
    self::parseCommand(); //解析参数,主要对应用start,stop,restart,status, -d的判断
    self::daemonize(); //master进程编程daemo守护进程
    self::initWorkers(); //初始化work进程
    self::installSignal(); // 安装信号量
    self::saveMasterPid(); // 保存master的进程id,一文件的形式
    self::forkWorkers(); //fork生成子进程,在event上面注册请求到达的事件
    self::displayUI(); //显示ui,用来在控制台打印输出一些内容
    self::resetStd(); / /重新设置标准输出和输入
    self::monitorWorkers(); // 监控子进程

    selef::checkSapiEnv

    //根据php_sapi_name来做判断
    if (php_sapi_name() != "cli") {
                exit("only run in command line mode 
    ");
    }
    

    sapi接口是php为其他的应用提供的抽象层接口,不管是cli还是php-fpm,fastCgi都是调用这一层的接口。

    在php源代码中

    main/SAPI.h 定义了_sapi_module_struct,每一个sapi的应用都是要传递该结构startup用来设置使用php的相关参数。

    其中php_sapi_name();函数返回的就是char *name对应的值。

    struct _sapi_module_struct {
        char *name;
        char *pretty_name;
    
        int (*startup)(struct _sapi_module_struct *sapi_module);
        int (*shutdown)(struct _sapi_module_struct *sapi_module);
    
        int (*activate)(TSRMLS_D);
        int (*deactivate)(TSRMLS_D);
    
        int (*ub_write)(const char *str, unsigned int str_length TSRMLS_DC);
        void (*flush)(void *server_context);
        struct stat *(*get_stat)(TSRMLS_D);
        char *(*getenv)(char *name, size_t name_len TSRMLS_DC);
    
        void (*sapi_error)(int type, const char *error_msg, ...);
    ....
    };
    

    self::init();

    //更具debug_backtrace();返回最后一个调用该函数的文件,作为启动文件
    $backtrace        = debug_backtrace();
    self::$_startFile = $backtrace[count($backtrace) - 1]['file'];
    
    // 设置pid的文件路名
    if (empty(self::$pidFile)) {
      self::$pidFile = __DIR__ . "/../" . str_replace('/', '_', self::$_startFile) . ".pid";
    }
    
    // 设置workman的日志文件,并且创建该文件
    if (empty(self::$logFile)) {
      self::$logFile = __DIR__ . '/../workerman.log';
    }
    $log_file = (string)self::$logFile;
    touch($log_file);
    chmod($log_file, 0622);
    
    // 初始化运行状态
    self::$_status = self::STATUS_STARTING;
    
    // 为了统计运行状态
    self::$_globalStatistics['start_timestamp'] = time();
    self::$_statisticsFile                      = sys_get_temp_dir() . '/workerman.status';
    
    // 设置进程的标题
    self::setProcessTitle('WorkerMan: master process  start_file=' . self::$_startFile);
    
    // 初始化该进程的id,主要是初始化所有work进程的进程id为0
    self::initId();
    
    // Timer init.
    Timer::init();
    

    parseCommand

    这部分的逻辑比较简单,主要功能就是解析参数

    global $argv;
    // 监测参数,最起码要设置一个参数start,stop等
    $start_file = $argv[0];
    if (!isset($argv[1])) {
      exit("Usage: php yourfile.php {start|stop|restart|reload|status}
    ");
    }
    
    // 获取参数,第一个是服务器控制,第二个是是否是daemon进程
    $command  = trim($argv[1]);
    $command2 = isset($argv[2]) ? $argv[2] : '';
    
    // 启动的时候,判断是不是参数有-d的标志
    $mode = '';
    if ($command === 'start') {
      if ($command2 === '-d' || Worker::$daemonize) {
          $mode = 'in DAEMON mode';
      } else {
          $mode = 'in DEBUG mode';
      }
    }
    self::log("Workerman[$start_file] $command $mode");
    
    // 获取master进程ID,从本地文件里面获取
    $master_pid      = @file_get_contents(self::$pidFile);
    $master_is_alive = $master_pid && @posix_kill($master_pid, 0);
    //posix_kill发送0的信号,用来检测该进程是不是存在
    if ($master_is_alive) {
      if ($command === 'start' && posix_getpid() != $master_pid) {
          self::log("Workerman[$start_file] already running");
          exit;
      }
    } elseif ($command !== 'start' && $command !== 'restart') {
      self::log("Workerman[$start_file] not run");
      exit;
    }
    switch ($command) {
      case 'start':
          if ($command2 === '-d') {
              Worker::$daemonize = true;
          }
          break;
      case 'status':
          if (is_file(self::$_statisticsFile)) {
              @unlink(self::$_statisticsFile);
          }
          // 发送status的信号到所有子进程
          posix_kill($master_pid, SIGUSR2);
          // Waiting amoment.
          usleep(500000);
          // 显示数据收集的结果
          @readfile(self::$_statisticsFile);
          exit(0);
      case 'restart':
      case 'stop':
          self::log("Workerman[$start_file] is stoping ...");
          // Send stop signal to master process.
          $master_pid && posix_kill($master_pid, SIGINT);
          // Timeout.
          $timeout    = 5;
          $start_time = time();
          // Check master process is still alive?
          while (1) {
    $master_is_alive = $master_pid && posix_kill($master_pid, 0);
              if ($master_is_alive) {
                  // Timeout?
                  if (time() - $start_time >= $timeout) {
                      self::log("Workerman[$start_file] stop fail");
                      exit;
                  }
                  // Waiting amoment.
                  usleep(10000);
                  continue;
              }
              // Stop success.
              self::log("Workerman[$start_file] stop success");
              if ($command === 'stop') {
                  exit(0);
              }
              if ($command2 === '-d') {
                  Worker::$daemonize = true;
              }
              break;
          }
          break;
    case 'reload':
          posix_kill($master_pid, SIGUSR1);
          self::log("Workerman[$start_file] reload");
          exit;
      default :
          exit("Usage: php yourfile.php {start|stop|restart|reload|status}
    ");
    }
    

    daemonize

    进程变为守护进程,主要是通过两次fork子进程(第一次主要是设置sid和原来的终端分离,第二主要失去重新获取控制终端的能力),关闭驻进程的方式。

    if (!self::$daemonize) {
      return;
    }
    umask(0);
    $pid = pcntl_fork();
    if (-1 === $pid) {
      throw new Exception('fork fail');
    } elseif ($pid > 0) {
      exit(0);//退出主进程
    }
    //setsid()调用成功后,进程成为新的会话组长和新的进程组长,并与原来的登录会话和进程组脱离 但是他还是可以申请重新打开一个控制终端,因为他是进程的组长
    if (-1 === posix_setsid()) {
      throw new Exception("setsid fail");
    }
    // 可以通过使进程不再成为会话组长来禁止进程重新打开控制终端
    $pid = pcntl_fork();
    if (-1 === $pid) {
      throw new Exception("fork fail");
    } elseif (0 !== $pid) {
      exit(0);
    }
    

    initWorkers

    初始化worker进程

    主要就是worker进程开始监听端口,

    listen函数的内容如下:

    $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
        if (!$this->_mainSocket) {
            throw new Exception($errmsg);
        }    
    // 保持长连接,防止延迟
       if (function_exists('socket_import_stream') && $this->transport === 'tcp') {
           $socket = socket_import_stream($this->_mainSocket);
           @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
           @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
       }    
    
    // 设置为非阻塞.
    stream_set_blocking($this->_mainSocket, 0);
    

    installSignal

    注册信号事件

    pcntl_signal(SIGINT, array('WorkermanWorker', 'signalHandler'), false);
    //reload 重新加载
    pcntl_signal(SIGUSR1, array('WorkermanWorker', 'signalHandler'), false);
    //打印当前子进程状态
    pcntl_signal(SIGUSR2, array('WorkermanWorker', 'signalHandler'), false);
    // 管道关闭
    pcntl_signal(SIGPIPE, SIG_IGN, false);
    

    saveMasterPid

    把当前的子进程保存在文件中

    self::$_masterPid = posix_getpid();
    if (false === @file_put_contents(self::$pidFile, self::$_masterPid)) {
      throw new Exception('can not save pid to ' . self::$pidFile);
    }
    

    forkWorkers

    主要是就是循环self::_workers,根据里面的count数量生成对应的子进程。

    主要使用的方法是:

    forkOneWorker

    ///获取空的pid的位置
    $id = self::getId($worker->workerId, 0);
    if ($id === false) {
      return;
    }
    //fork子进程
    $pid = pcntl_fork();
    // For mast
    if ($pid > 0) {
      self::$_pidMap[$worker->workerId][$pid] = $pid;
      self::$_idMap[$worker->workerId][$id]   = $pid;
    } // For child processes.
    elseif (0 === $pid) {
      if ($worker->reusePort) {
          $worker->listen();
      }
      if (self::$_status === self::STATUS_STARTING) {
     self::resetStd();
    }
    self::$_pidMap  = array();
    self::$_workers = array($worker->workerId => $worker);
    Timer::delAll();
    self::setProcessTitle('WorkerMan: worker process  ' . $worker->name . ' ' . $worker->getSocketName());
    $worker->setUserAndGroup();
    $worker->id = $id;
    //work开始运行,接收请求
    $worker->run();
    exit(250);
    } else {
    throw new Exception("forkOneWorker fail");
    }
    

    run函数

    //注册进程结束时候的函数
    register_shutdown_function(array("\Workerman\Worker", 'checkErrors'));
    Autoloader::setRootPath($this->_autoloadRootPath);
    
    // 创建event.根据eventLoopName来实例话event
    if (!self::$globalEvent) {
      self::log("create globalEvent");
      $eventLoopClass    = "\Workerman\Events\" . ucfirst(self::getEventLoopName());
      self::$globalEvent = new $eventLoopClass;
      // 注册一个监听事件,当socket的连接是可读的时候.
      if ($this->_socketName) {
          if ($this->transport !== 'udp') {
              self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
                  array($this, 'acceptConnection'));
          } else {
              self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
                  array($this, 'acceptUdpConnection'));
          }
      }
    }
    

    acceptConnection函数

    接收请求
    $new_socket = @stream_socket_accept($socket, 0, $remote_address);
    if (!$new_socket) {
      return;
    }
    
    // 实例化TcpConnection
    $connection                         = new TcpConnection($new_socket, $remote_address);
    $this->connections[$connection->id] = $connection;
    $connection->worker                 = $this;
    $connection->protocol               = $this->protocol;
    //connection的onMessage指向当前类的onMessage方法
    $connection->onMessage              = $this->onMessage;
    $connection->onClose                = $this->onClose;
    $connection->onError                = $this->onError;                                                                  
    $connection->onBufferDrain          = $this->onBufferDrain;
    $connection->onBufferFull           = $this->onBufferFull;
  • 相关阅读:
    浅谈莫比乌斯反演/杜教筛/狄利克雷卷积
    bzoj3944:Sum
    bzoj3994:[SDOI2015]约数个数和
    bzoj2820:YY的GCD
    bzoj5323:[Jxoi2018]游戏
    bzoj5324:[Jxoi2018]守卫
    斐波那契和矩阵快速幂
    CF1278C-Berry Jam-(前缀和)
    CF1278B-A and B-(简单数学)
    CF92B-Binary Number-(思维)
  • 原文地址:https://www.cnblogs.com/flzs/p/10594548.html
Copyright © 2011-2022 走看看