zoukankan      html  css  js  c++  java
  • PHP如何实现daemon守护进程和master-woker模式进程

    一、PHP多进程及其实现
    每个进程都有一个父进程,子进程退出,父进程能得到子进程退出的状态。每个进程都属于一个进程组,每个进程组都有一个进程组号,该号等于该进程组组长的PID。

    场景一:

    日常任务中,有时需要通过php脚本执行一些日志分析,队列处理等任务,当数据量比较大时,可以使用多进程来处理。

    场景二:

    如果一个任务被分解成多个进程执行,就会减少整体的耗时。比如有一个比较大的数据文件要处理,这个文件由很多行组成。如果单进程执行要处理的任务,量很大时要耗时比较久。这时可以考虑多进程。多进程处理分解任务,每个进程处理文件的一部分,这样需要均分割一下这个大文件成多个小文件(进程数和小文件的个数等同就可以)。

    PHP不存在多线程,只有多进程,PHP多进程需要pcntl,posix扩展支持,可以通过 php - m 查看,没安装的话需要重新编译php,加上参数--enable-pcntl,posix一般默认会有。

    创建子进程的函数fork

    pcntl_fork — 在当前进程当前位置产生分支(子进程)。fork是创建了一个子进程,父进程和子进程 都从fork的位置开始向下继续执行,不同的是父进程执行过程中,得到的fork返回值为子进程号,而子进程得到的是0。

    <?php
    $pid = pcntl_fork();//父进程和子进程都会执行下面代码
    if ($pid == -1) {
    //错误处理:创建子进程失败时返回-1.
    die('could not fork');
    } else if ($pid) {
    //父进程会得到子进程号,所以这里是父进程执行的逻辑
    pcntl_wait($status);
    file_put_contents('/tmp/swh.log', "父进程执行".PHP_EOL, FILE_APPEND);
    //等待子进程中断,防止子进程成为僵尸进程。
    } else {
    //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
    file_put_contents('/tmp/swh.log', "子进程执行".PHP_EOL, FILE_APPEND);
    }
    file_put_contents('/tmp/swh.log', posix_getpid().PHP_EOL, FILE_APPEND);
    运行程序,得出输出:

    父进程执行
    12714
    子进程执行
    12715
    几个重要的和多线程编程相关函数

    posix_kill($pid, SIGTERM); //用来发信号给进程进行操作

    posix_getpid() //获取当前进程的进程id

    pcntl_wait($status); //wait函数刮起当前进程的执行直到一个子进程退出或接收到一个信号要求中断当前进程或调用一个信号处理函数。
     

    二、PHP如何实现守护进程
    守护进程(Daemon)是一种运行在后台的常驻进程服务, 很常见,例如 PHP-FPM, NGINX,REDIS,都需要一个父进程来支持整个服务。

    守护进程独立于终端,并在后台周期性的执行任务或等待处理某些发生的事件。(Daemon独立于终端是为了避免进程在执行过程中的信息在终端上显示或者因收到终端上所产生的终端信息而中断。在Linux中从该控制终端开始运行的进程会依附于该控制终端,当控制终端被关闭时,相应的进程都会自动关闭,所以守护进程必须脱离控制终端)

    守护进程一般在系统启动时开始运行,除非强行终止,否则直到系统关机才随之一起停止运行;

    守护进程一般都以root用户权限运行,因为要使用某些特殊的端口或者资源;

    它们由init进程启动,并且没有控制终端,是一种执行日常事务的进程。所有的提供服务的进程基本上都是守护进程,通常也可以称为服务

    它不需要用户输入就能够运行且提供某种服务,不是对整个系统就是对某个用户程序服务。

    查看守护进程

    #ps -efj
    守护进程基本上都是以超级用户启动,所以UID为0

    没有控制终端,所以TTY为?

    2.1 借助 nohup 和 &  配合使用
    在命令后面加上 & 符号, 可以让启动的进程转到后台运行,而不占用控制台,控制台还可以再运行其他命令。

    <?php
    while(true){
    echo time().PHP_EOL;
    sleep(3);
    }
    用 & 方式来启动该进程

    [dev@hua1-dev ~]$ ps -ef | grep 'test'
    dev 31095 1 0 11:37 ? 00:00:00 php test.php
    dev 31112 31012 0 11:37 pts/2 00:00:00 grep --color=auto test
    [dev@hua1-dev ~]$
    我们发现在后台运行,不影响正常运行别的命令, 但是有一个问题,我们关闭终端或退出后,脚本也就结束了。

    在命令之前加上 nohup ,启动的进程将会忽略linux的挂起信号 (SIGHUP)。

    那什么情况下会触发linux下SIGHUP信号呢,SIGHUP会在以下3种情况下被发送给相应的进程:

    1、终端关闭时,该信号被发送到session首进程以及作为job提交的进程(即用 & 符号提交的进程)
    2、session首进程退出时,该信号被发送到该session中的前台进程组中的每一个进程
    3、若父进程退出导致进程组成为孤儿进程组,且该进程组中有进程处于停止状态(收到SIGSTOP或SIGTSTP信号),该信号会被发送到该进程组中的每一个进程。

    [dev@hua1-dev ~]$ nohup php test.php &
    [1] 31626
    [dev@hua1-dev ~]$ nohup: 忽略输入并把输出追加到"nohup.out"

    [1]+ Exit 1 nohup php test.php
    [dev@hua1-dev ~]$
    可以看出,echo输出被重新定义到nohup.out中,终端关闭后,该进程还是存在的。

    [dev@hua1-dev tmp]$ ps -ef | grep 'test'
    dev 31817 1 0 11:50 ? 00:00:00 php test.php
    dev 32090 31744 0 11:54 pts/2 00:00:00 grep --color=auto test
    当我们组合 nohup 和 & 两种方式时,启动的进程不会占用控制台,也不依赖控制台,控制台关闭之后进程被1号进程收养,成为孤儿进程,这就和守护进程的机制非常类似了。

    2.2 根据守护进程的规则和特点通过代码来实现
    守护进程最大的特点就是脱离了用户终端和会话。

    创建守护进程步骤

    (1)创建子进程,退出父进程

    为了脱离终端,需要退出父进程,使得子进程可以在后台执行。在Linux中父进程先于子进程退出会导致子进程变为孤儿进程,而每当系统发现一个孤儿进程,就由有1号进程(init)收养它,这样,原先的子进程就会变成了init进程的子进程。

    (2)在子进程中创建新的会话

    先介绍三个概念:

    进程组:是一个或多个进程的集合。进程组由进程组ID来唯一标识。除了进程号PID之外,进程组ID也是一个进程的必备属性。每个进程组都有一个组长进程,其组长进程的进程号等于进程组ID,且该进程组ID不会因组长进程的退出而受到影响。

    会话:多个进程组组成一个会话。

    控制终端:每个会话可能会拥有一个控制终端(可以理解成我们常见的黑窗口,命令行),建立与控制终端连接的会话首进程叫做控制进程

    每个进程属于一个进程组,进程组号(GID)就是进程组长的进程号(PID)。登录会话可以包含多个进程组。这些进程组共享一个控制终端。由于在调用fork函数时,子进程拷贝了父进程的会话期、进程组、控制终端等,虽然父进程退出了,但会话期、进程组、控制终端等并没有改变,为了使子进程不再受到它们的影响,需要调用setsid()来创建一个新的会话。

    setsid() :用于创建一个新的会话,并且让调用该函数的进程成为该会话组的组长。对于子进程来说其主要有三个作用:

    让子进程摆脱原会话的控制。

    让子进程摆脱原进程组的控制。

    让子进程摆脱原控制终端的控制。

    (3)改变当前目录为根目录

    使用fork创建的子进程继承了父进程的当前的工作目录。由于在进程运行中,当前目录所在的文件系统(如“/mnt/usb”)是不能卸载的,这对以后的使用会造成诸多的麻烦(比如系统由于某种原因要进入单用户模式)。因此,通常的做法是让根目录”/”作为守护进程的当前工作目录。这样就可以避免上述的问题。如有特殊的需求,也可以把当前工作目录换成其他的路径。改变工作目录的方法是使用chdir函数。

    (4)重设文件权限掩码

    文件权限掩码:是指屏蔽掉文件权限中的对应位。

    例如,有个文件权限掩码是050,它就屏蔽了文件组拥有者的可读与可执行权限(对应二进制为,rwx, 101)。由于fork函数创建的子进程继承了父进程的文件权限掩码,这就给子进程使用文件带来了诸多的麻烦。因此,把文件权限掩码设置为0(即,不屏蔽任何权限),可以增强该守护进程的灵活性。设置文件权限掩码的函数是umask。通常的使用方法为umask(0)。

    (5)再fork()一个子进程并终止父进程

    现在,进程已经成为无终端的会话组长,但它可以重新申请打开一个控制终端,所以需要再次fork()一个子进程,并终止父进程。打开一个控制终端的前提条件是该进程必须为会话组组长,而我们通过第二次fork,确保了第二次fork出来的子进程不会是会话组组长。

    (6)关闭文件描述符

    用fork创建的子进程也会从父进程那里继承一些已经打开了的文件。这些被打开的文件可能永远不会被守护进程读写,但它们一样消耗系统资源,而且可能导致所在的文件系统无法卸载。在使用setsid调用之后,守护进程已经与所属的控制终端失去了联系,因此从终端输入的字符不可能达到守护进程,守护进程中用常规方法(如printf)输出的字符也不可能在终端上显示出来。所以,文件描述符为0、1、2(即,标准输入、标准输出、标准错误输出)的三个文件已经失去了存在的价值,也应该关闭。

    (7)守护进程退出处理

    当用户需要外部停止守护进程时,通常使用kill命令停止该守护进程。所以,守护进程中需要编码来实现kill发出的signal信号处理,达到进程正常退出。

    具体实现代码:

    <?php
    class Daemon{

    private $info_dir="/tmp";
    private $pid_file="";
    private $terminate=false; //是否中断
    private $workers_count=0;
    private $gc_enabled=null;
    private $workers_max=8; //最多运行8个进程

    public function __construct($is_sington=false,$user='nobody',$output="/dev/null"){

    $this->is_sington=$is_sington; //是否单例运行,单例运行会在tmp目录下建立一个唯一的PID
    $this->user=$user;//设置运行的用户 默认情况下nobody
    $this->output=$output; //设置输出的地方
    $this->checkPcntl();
    }
    //检查环境是否支持pcntl支持
    public function checkPcntl(){
    if ( ! function_exists('pcntl_signal_dispatch')) {
    // PHP < 5.3 uses ticks to handle signals instead of pcntl_signal_dispatch
    // call sighandler only every 10 ticks
    declare(ticks = 10);
    }

    // Make sure PHP has support for pcntl
    if ( ! function_exists('pcntl_signal')) {
    $message = 'PHP does not appear to be compiled with the PCNTL extension. This is neccesary for daemonization';
    $this->_log($message);
    throw new Exception($message);
    }
    //信号处理
    pcntl_signal(SIGTERM, array(__CLASS__, "signalHandler"),false);
    pcntl_signal(SIGINT, array(__CLASS__, "signalHandler"),false);
    pcntl_signal(SIGQUIT, array(__CLASS__, "signalHandler"),false);

    // Enable PHP 5.3 garbage collection
    if (function_exists('gc_enable'))
    {
    gc_enable();
    $this->gc_enabled = gc_enabled();
    }
    }

    // daemon化程序
    public function daemonize(){

    global $stdin, $stdout, $stderr;
    global $argv;

    set_time_limit(0);

    // 只允许在cli下面运行
    if (php_sapi_name() != "cli"){
    die("only run in command line mode ");
    }

    // 只能单例运行
    if ($this->is_sington==true){

    $this->pid_file = $this->info_dir . "/" .__CLASS__ . "_" . substr(basename($argv[0]), 0, -4) . ".pid";
    $this->checkPidfile();
    }

    umask(0); //把文件掩码清0

    if (pcntl_fork() != 0){ //是父进程,父进程退出
    exit();
    }

    posix_setsid();//设置新会话组长,脱离终端

    if (pcntl_fork() != 0){ //是第一子进程,结束第一子进程
    exit();
    }

    chdir("/"); //改变工作目录

    $this->setUser($this->user) or die("cannot change owner");

    //关闭打开的文件描述符
    fclose(STDIN);
    fclose(STDOUT);
    fclose(STDERR);

    $stdin = fopen($this->output, 'r');
    $stdout = fopen($this->output, 'a');
    $stderr = fopen($this->output, 'a');

    if ($this->is_sington==true){
    $this->createPidfile();
    }

    }
    //--检测pid是否已经存在
    public function checkPidfile(){

    if (!file_exists($this->pid_file)){
    return true;
    }
    $pid = file_get_contents($this->pid_file);
    $pid = intval($pid);
    if ($pid > 0 && posix_kill($pid, 0)){
    $this->_log("the daemon process is already started");
    }
    else {
    $this->_log("the daemon proces end abnormally, please check pidfile " . $this->pid_file);
    }
    exit(1);

    }
    //----创建pid
    public function createPidfile(){

    if (!is_dir($this->info_dir)){
    mkdir($this->info_dir);
    }
    $fp = fopen($this->pid_file, 'w') or die("cannot create pid file");
    fwrite($fp, posix_getpid());
    fclose($fp);
    $this->_log("create pid file " . $this->pid_file);
    }

    //设置运行的用户
    public function setUser($name){

    $result = false;
    if (empty($name)){
    return true;
    }
    $user = posix_getpwnam($name);
    if ($user) {
    $uid = $user['uid'];
    $gid = $user['gid'];
    $result = posix_setuid($uid);
    posix_setgid($gid);
    }
    return $result;

    }
    //信号处理函数
    public function signalHandler($signo){

    switch($signo){

    //用户自定义信号
    case SIGUSR1: //busy
    if ($this->workers_count < $this->workers_max){
    $pid = pcntl_fork();
    if ($pid > 0){
    $this->workers_count ++;
    }
    }
    break;
    //子进程结束信号
    case SIGCHLD:
    while(($pid=pcntl_waitpid(-1, $status, WNOHANG)) > 0){
    $this->workers_count --;
    }
    break;
    //中断进程
    case SIGTERM:
    case SIGHUP:
    case SIGQUIT:

    $this->terminate = true;
    break;
    default:
    return false;
    }

    }
    /**
    *开始开启进程
    *$count 准备开启的进程数
    */
    public function start($count=1){

    $this->_log("daemon process is running now");
    pcntl_signal(SIGCHLD, array(__CLASS__, "signalHandler"),false); // if worker die, minus children num
    while (true) {
    if (function_exists('pcntl_signal_dispatch')){

    pcntl_signal_dispatch();
    }

    if ($this->terminate){
    break;
    }
    $pid=-1;
    if($this->workers_count<$count){

    $pid=pcntl_fork();
    }

    if($pid>0){

    $this->workers_count++;

    }elseif($pid==0){

    // 这个符号表示恢复系统对信号的默认处理
    pcntl_signal(SIGTERM, SIG_DFL);
    pcntl_signal(SIGCHLD, SIG_DFL);
    if(!empty($this->jobs)){
    while($this->jobs['runtime']){
    if(empty($this->jobs['argv'])){
    call_user_func($this->jobs['function'],$this->jobs['argv']);
    }else{
    call_user_func($this->jobs['function']);
    }
    $this->jobs['runtime']--;
    sleep(2);
    }
    exit();

    }
    return;

    }else{

    sleep(2);
    }


    }

    $this->mainQuit();
    exit(0);

    }

    //整个进程退出
    public function mainQuit(){

    if (file_exists($this->pid_file)){
    unlink($this->pid_file);
    $this->_log("delete pid file " . $this->pid_file);
    }
    $this->_log("daemon process exit now");
    posix_kill(0, SIGKILL);
    exit(0);
    }

    // 添加工作实例,目前只支持单个job工作
    public function setJobs($jobs=array()){

    if(!isset($jobs['argv'])||empty($jobs['argv'])){

    $jobs['argv']="";

    }
    if(!isset($jobs['runtime'])||empty($jobs['runtime'])){

    $jobs['runtime']=1;

    }

    if(!isset($jobs['function'])||empty($jobs['function'])){

    $this->log("你必须添加运行的函数!");
    }

    $this->jobs=$jobs;

    }
    //日志处理
    private function _log($message){
    printf("%s %d %d %s ", date("c"), posix_getpid(), posix_getppid(), $message);
    }

    }
    调用方法

    //调用方法1
    $daemon=new Daemon(true);
    $daemon->daemonize();
    $daemon->start(2);//开启2个子进程工作
    work();




    //调用方法2
    $daemon=new Daemon(true);
    $daemon->daemonize();
    $daemon->addJobs(array('function'=>'work','argv'=>'','runtime'=>1000));//function 要运行的函数,argv运行函数的参数,runtime运行的次数
    $daemon->start(2);//开启2个子进程工作

    //具体功能的实现
    function work(){
    echo "测试1";
    }
     特殊说明

    // 获取进程ID

    var_dump(posix_getpid());

    // 获取进程组ID

    var_dump(posix_getpgid(posix_getpid()));

    // 获取进程会话ID

    var_dump(posix_getsid(posix_getpid()));
    三者结果相同,说明了该进程即使进程组的组长,也是会话首领。

    三、如何实现基于master-woker模式的守护进程
    <?php

    class Worker{

    public static $count = 2;

    public static function runAll(){
    static::runMaster();
    static::moniProcess();
    }

    //开启主进程
    public static function runMaster(){
    //确保进程有最大操作权限
    umask(0);
    $pid = pcntl_fork();
    if($pid > 0){
    echo "主进程进程 $pid ";
    exit;
    }else if($pid == 0){
    if(-1 === posix_setsid()){
    throw new Exception("setsid fail");
    }

    for ($i=0; $i < self::$count; $i++) {
    static::runWorker();
    }

    @cli_set_process_title("master_process");
    }else{
    throw new Exception("创建主进程失败");
    }
    }

    //开启子进程
    public static function runWorker(){
    umask(0);
    $pid = pcntl_fork();
    if($pid > 0){
    // echo "创建子进程 $pid ";
    }else if($pid == 0){
    if(-1 === posix_setsid()){
    throw new Exception("setsid fail");
    }
    @cli_set_process_title("worker_process");
    while(1){
    sleep(1);
    }
    }else{
    throw new Exception("创建子进程失败");
    }
    }
    //监控worker进程
    public static function moniProcess(){
    while( $pid = pcntl_wait($status)){
    if($pid == -1){
    break;
    }else{
    static::runWorker();
    }
    }
    }
    }

    Worker::runAll();

  • 相关阅读:
    Python super() 函数
    Python中的多继承
    sub eax, _PAGESIZE; decrease by PAGESIZE test dword ptr [eax],eax ; probe page
    ubuntu中的samba配置
    linux 相关命令
    Program Size: Code=x RO-data=x RW-data=x ZI-data=x 的含义
    结构之法,算法之道:程序员面试、算法研究、编程艺术、红黑树、数据挖掘5大系列集锦
    C++ 构造函数和析构函数
    C++ 友元函数总结
    C++ 动态存储空间的分配和释放 new与malloc的区别
  • 原文地址:https://www.cnblogs.com/liliuguang/p/12619029.html
Copyright © 2011-2022 走看看