zoukankan      html  css  js  c++  java
  • PHP多进程系列笔记(五)

    前面几节都是讲解pcntl扩展实现的多进程程序。本节给大家介绍swoole扩展的swoole_process模块。

    swoole多进程

    swoole_process 是swoole提供的进程管理模块,用来替代PHP的pcntl扩展。

    首先,确保安装的swoole版本大于1.7.2:

    $ php --ri swoole
    
    swoole
    
    swoole support => enabled
    Version => 1.10.1
    

    注意:swoole_process在最新的1.8.0版本已经禁止在Web环境中使用了,所以也只能支持命令行。

    swoole提供的多进程扩展基本功能和pcntl提供的一样,但swoole更易简单上手,并且提供了:

    • 默认基于unixsock的进程间通信;
    • 支持消息队列作为进程间通信;
    • 基于signalfd和eventloop处理信号,几乎没有任何额外消耗;
    • 高精度微秒定时器;
    • 配合swoole_event模块,创建的PHP子进程可以异步的事件驱动模式

    swoole_process 模块提供的方法(Method)主要分为四部分:

    • 基础方法
    swoole_process::__construct
    swoole_process->start
    swoole_process->name
    swoole_process->exec
    swoole_process->close
    swoole_process->exit
    swoole_process::kill
    swoole_process::wait
    swoole_process::daemon
    swoole_process::setAffinity
    
    • 管道通信
    swoole_process->write
    swoole_process->read
    swoole_process->setTimeout
    swoole_process->setBlocking
    
    • 消息队列通信
    swoole_process->useQueue
    swoole_process->statQueue
    swoole_process->freeQueue
    swoole_process->push
    swoole_process->pop
    
    • 信号与定时器
    swoole_process::signal
    swoole_process::alarm
    

    基础应用

    本例实现的是tcp server,特性:

    • 多进程处理客户端连接
    • 子进程退出,Master进程会重新创建一个
    • 支持事件回调
    • 主进程退出,子进程在干完手头活后退出
    <?php 
    
    class TcpServer{
        const MAX_PROCESS = 3;//最大进程数
        private $pids = []; //存储子进程pid
        private $socket;
        private $mpid;
    
        public function run(){
            $process = new swoole_process(function(){
                $this->mpid = $id = getmypid();   
                echo time()." Master process, pid {$id}
    "; 
    
                //创建tcp server
                $this->socket = stream_socket_server("tcp://0.0.0.0:9201", $errno, $errstr);
                if(!$this->socket) exit("start server err: $errstr --- $errno");
    
                for($i=0; $i<self::MAX_PROCESS;$i++){
                    $this->start_worker_process();
                }
        
                echo "waiting client...
    ";
        
                //Master进程等待子进程退出,必须是死循环
                while(1){
                    foreach($this->pids as $k=>$pid){
                        if($pid){
                            $res = swoole_process::wait(false);
                            if ( $res ){
                                echo time()." Worker process $pid exit, will start new... 
    ";
                                $this->start_worker_process();
                                unset($this->pids[$k]);
                            }
                        }
                    }
                    sleep(1);//让出1s时间给CPU
                }
            }, false, false); //不启用管道通信
            swoole_process::daemon(); //守护进程
            $process->start();//注意:start之后的变量子进程里面是获取不到的
        }
    
        /**
         * 创建worker进程,接受客户端连接
         */
        private function start_worker_process(){
            $process = new swoole_process(function(swoole_process $worker){
                $this->acceptClient($worker);
            }, false, false);
            $pid = $process->start();
            $this->pids[] = $pid;
        }
    
        private function acceptClient(&$worker)
        {
            //子进程一直等待客户端连接,不能退出
            while(1){
                
                $conn = stream_socket_accept($this->socket, -1);
                if($this->onConnect) call_user_func($this->onConnect, $conn); //回调连接事件
    
                //开始循环读取消息
                $recv = ''; //实际收到消息
                $buffer = ''; //缓冲消息
                while(1){
                    $this->checkMpid($worker);
                    
                    $buffer = fread($conn, 20);
    
                    //没有收到正常消息
                    if($buffer === false || $buffer === ''){
                        if($this->onClose) call_user_func($this->onClose, $conn); //回调断开连接事件
                        break;//结束读取消息,等待下一个客户端连接
                    }
    
                    $pos = strpos($buffer, "
    "); //消息结束符
                    if($pos === false){
                        $recv .= $buffer;                            
                    }else{
                        $recv .= trim(substr($buffer, 0, $pos+1));
    
                        if($this->onMessage) call_user_func($this->onMessage, $conn, $recv); //回调收到消息事件
    
                        //客户端强制关闭连接
                        if($recv == "quit"){
                            echo "client close conn
    ";
                            fclose($conn);
                            break;
                        }
    
                        $recv = ''; //清空消息,准备下一次接收
                    }
                }
            }
        }
    
        //检查主进程是否存在,若不存在子进程在干完手头活后退出
        public function checkMpid(&$worker){
            if(!swoole_process::kill($this->mpid,0)){
                $worker->exit();
                // 这句提示,实际是看不到的.需要写到日志中
                echo "Master process exited, I [{$worker['pid']}] also quit
    ";
            }
        }
    
        function __destruct() {
            @fclose($this->socket);
        }
    }
    
    $server =  new TcpServer();
    
    $server->onConnect = function($conn){
        echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "
    ";
        fwrite($conn,"conn success
    ");
    };
    
    $server->onMessage = function($conn,$msg){
        echo "onMessage --" . $msg . "
    ";
        fwrite($conn,"received ".$msg."
    ");
    };
    
    $server->onClose = function($conn){
        echo "onClose --" . stream_socket_get_name($conn,true) . "
    ";
        fwrite($conn,"onClose "."
    ");
    };
    
    $server->run();
    

    运行后可以使用telnet连接:

    telnet 127.0.0.1 9201
    

    由于设置了最大三个子进程,最多只能接受3个客户端连接。

    进程间通信

    前面讲解的例子里,主进程和子进程直接是没有直接的数据交互的。如果主进程需要得到的来自子进程的反馈,或者子进程接受来自主进程的数据,那么就需要进程间通信了。

    swoole内置了管道通信和消息队列通信。

    管道通信

    管道通信主要是数据传输:一个进程需要将数据发送给另外一个进程。

    这个swoole封装后,使用非常简单:

    <?php 
    
    $workers = [];
    
    for ($i=0; $i<3; $i++) {
        $process = new swoole_process(function(swoole_process $worker){
            //子进程逻辑
            $cmd = $worker->read();
    
            ob_start();
            passthru($cmd);//执行外部程序并且显示未经处理的、原始输出,会直接打印输出。
            $return = ob_get_clean() ? : ' ';
            $return = trim($return).". worker pid:".$worker->pid."
    ";
            
            // $worker->write($return);//写入数据到管道
            echo $return;//写入数据到管道。注意:子进程里echo也是写入到管道
        }, true); //第二个参数为true,启用管道通信
        $pid = $process->start();
        $workers[$pid] = $process;  
    }
    
    foreach($workers as $pid=>$worker){
        $worker->write('whoami'); //通过管道发数据到子进程。管道是单向的:发出的数据必须由另一端读取。不能读取自己发出去的
        $recv = $worker->read();//同步阻塞读取管道数据
        echo "recv result: $recv";
    }
    
    //回收子进程
    while(count($workers)){
        // echo time(). "
    ";
        foreach($workers as $pid=>$worker){
            $ret = swoole_process::wait(false);
            if($ret){
                echo "worker exit: $pid
    ";
                unset($workers[$pid]);
            }
        }
    }
    
    
    

    运行:

    $ php swoole_process_pipe.php 
    recv result: Linux
    recv result: 2018年 06月 24日 星期日 16:18:01 CST
    recv result: yjc
    worker exit: 14519
    worker exit: 14522
    worker exit: 14525
    

    注意点:
    1、管道数据读取是同步阻塞的;上面的例子里如果子进程里再加一句$worker->read(),会一直阻塞。可以使用swoole_event_add将管道加入到事件循环中,变为异步模式。
    2、子进程里的输出(例如echo)与write效果相同。
    3、通过管道发数据到子进程。管道是单向的:发出的数据必须由另一端读取。不能读取自己发出去的。

    这里额外讲解一下swoole_process::wait()
    1、swoole_process::wait()默认是阻塞的, swoole_process::wait(false)则是非阻塞的;
    2、swoole_process::wait()阻塞模式调用一次仅能回收一个子进程,非阻塞模式调用一次不一定能当前就能回收子进程;
    3、如果不加swoole_process::wait(),主进程又是死循环,主进程退出后会变成僵尸进程。

    ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'可以查询僵尸进程。


    防盗版声明:本文系原创文章,发布于公众号飞鸿影的博客(fhyblog)及博客园,转载需作者同意。


    消息队列通信

    消息队列与管道有些不一样:消息队列是全局的,所有进程都可以发送、读取。你可以把它看做redis list结构。

    消息队列更常见的用途是主进程分配任务,子进程消费执行。

    <?php 
    
    $workers = [];
    
    for ($i=0; $i<3; $i++) {
        $process = new swoole_process(function(swoole_process $worker){
            //子进程逻辑
            sleep(1); //防止父进程还未往消息队列中加入内容直接退出
            while($cmd = $worker->pop()){
                // echo "recv from master: $cmd
    ";
    
                ob_start();
                passthru($cmd);//执行外部程序并且显示未经处理的、原始输出,会直接打印输出。
                $return = ob_get_clean() ? : ' ';
                $return = "res: ".trim($return).". worker pid: ".$worker->pid."
    ";
                
                echo $return;
                // sleep(1);
            }
    
            $worker->exit(0);
        }, false, false); //不创建管道
    
        $process->useQueue(1, 2 | swoole_process::IPC_NOWAIT); //使用消息队列
        $pid = $process->start();
        $workers[$pid] = $process;
    }
    
    //由于所有进程是共享使用一个消息队列,所以只需向一个子进程发送消息即可
    $worker = current($workers);
    for ($i=0; $i<3; $i++) {
        $worker->push('whoami'); //发送消息
    }
    
    
    //回收子进程
    while(count($workers)){
        foreach($workers as $pid=>$worker){
            $ret = swoole_process::wait();
            if($ret){
                echo "worker exit: $pid
    ";
                unset($workers[$pid]);
            }
        }
    }
    

    运行结果:

    $ php swoole_process_quene.php 
    res: yjc. worker pid: 15885
    res: yjc. worker pid: 15886
    res: yjc. worker pid: 15887
    worker exit: 15885
    worker exit: 15886
    worker exit: 15887
    

    注意点:
    1、所有进程共享使用一个消息队列;
    2、消息队列的读取操作是阻塞的,可以在useQueue的时候第2个参数mode改为2 | swoole_process::IPC_NOWAIT,则是异步的。mode仅仅设置为2是阻塞的,示例里去掉swoole_process::IPC_NOWAIT后读取消息的while会死循环。
    3、子进程前面加了个sleep(1);,这是为了防止父进程还未往消息队列中加入内容直接退出。
    4、子进程末尾也加了sleep,这是为了防止一个进程把所有消息都消费完了,实际应用需要去掉。

    信号与定时器

    swoole_process::alarm支持微秒定时器:

    <?php 
    
    function ev_timer(){
        static $i = 0;
        echo "#{$i}	alarm
    ";
        $i++;
        if ($i > 5) {
            //清除定时器
            swoole_process::alarm(-1);
    
            //退出进程
            swoole_process::kill(getmypid());
            
        }
    }
    
    //安装信号
    swoole_process::signal(SIGALRM, 'ev_timer');
    
    //触发定时器信号:单位为微秒。如果为负数表示清除定时器
    swoole_process::alarm(100 * 1000);//100ms
    
    echo getmypid()."
    "; //该句会顺序执行,后续无需使用while循环防止进程直接退出
    
    

    运行:

    $ php swoole_process_alarm.php 
    13660
    #0	alarm
    #1	alarm
    #2	alarm
    #3	alarm
    #4	alarm
    #5	alarm
    

    注:alarm不能和SwooleTimer同时使用。

    参考

    1、Process-Swoole-Swoole文档中心
    https://wiki.swoole.com/wiki/page/p-process.html

  • 相关阅读:
    关于TCP/IP协议栈
    关于java socket
    批处理的高吞吐率和高延迟的解释
    关于Xmanager使用问题的总结
    关于Storm Stream grouping
    django url 传递多个参数
    多线程 python threading 信号量/递归锁
    多线程 python threading 简单锁/互斥锁
    django 1.9 wsgi + nginx
    django models ForeignKey Many-to-ManyField 操作
  • 原文地址:https://www.cnblogs.com/52fhy/p/9227023.html
Copyright © 2011-2022 走看看