zoukankan      html  css  js  c++  java
  • Swoole从入门到入土(23)——多进程[进程池ProcessPool]

    Swoole提供的进程池为ProcessPool,基于 SwooleServer 的 Manager 管理进程模块实现。可管理多个工作进程。该模块的核心功能为进程管理,相比 Process 实现多进程,ProcessPool 更加简单,封装层次更高,开发者无需编写过多代码即可实现进程管理功能,配合 CoServer 可以创建纯协程风格的,能利用多核 CPU 的服务端程序。同样,我们同过一段代码引出本节的详细讨论:

    $workerNum = 10;
    $pool = new SwooleProcessPool($workerNum);
    
    $pool->on("WorkerStart", function ($pool, $workerId) {
        echo "Worker#{$workerId} is started
    ";
        $redis = new Redis();
        $redis->pconnect('127.0.0.1', 6379);
        $key = "key1";
        while (true) {
             $msg = $redis->brpop($key, 2);
             if ( $msg == null) continue;
             var_dump($msg);
         }
    });
    
    $pool->on("WorkerStop", function ($pool, $workerId) {
        echo "Worker#{$workerId} is stopped
    ";
    });
    
    $pool->start();

    现在让我们一起了解,这个类为我们提供了哪些成员。

    常量

    成员函数

    1) __construct():构造方法

    SwooleProcessPool::__construct(int $worker_num, int $ipc_type = 0, int $msgqueue_key = 0, bool $enable_coroutine = false);

    $worker_num:工作进程的数量(当一个进程退出后,Pool会及时拉取另一个进程进行补充)

    $ipc_type:进程间通信的模式【默认为 0 表示不使用任何进程间通信特性】,这个参数需要注意如下事项:

                       - 设置为 0 时必须设置 onWorkerStart 回调,并且必须在 onWorkerStart 中实现循环逻辑,当 onWorkerStart 函数退出时工作进程会立即退出,之后会由 Manager 进程重新拉起进程;

                       - 设置为 SWOOLE_IPC_MSGQUEUE 表示使用系统消息队列通信,可设置 $msgqueue_key 指定消息队列的 KEY,未设置消息队列 KEY,将申请私有队列;

                       - 设置为 SWOOLE_IPC_SOCKET 表示使用 Socket 进行通信,需要使用 listen 方法指定监听的地址和端口;

                       - 设置为 SWOOLE_IPC_UNIXSOCK 表示使用 unixSocket 进行通信,协程模式下使用,强烈推荐用此种方式进程间通讯

                       - 使用非 0 设置时,必须设置 onMessage 回调,onWorkerStart 变更为可选。

    $msgqueue_key:消息列队的key

    $enable_coroutine:是否开启协程支持【使用协程后将无法设置 onMessage 回调】

    示例1:

    $pool = new SwooleProcessPool(1, SWOOLE_IPC_NONE, 0, true);
    
    $pool->on('workerStart', function (SwooleProcessPool $pool, int $workerId) {
        while (true) {
            Co::sleep(0.5);
            echo "hello world
    ";
        }
    });
    
    $pool->start();

    示例2:开启协程后 Swoole 会禁止设置 onMessage 事件回调,需要进程间通讯的话需要将第二个设置为 SWOOLE_IPC_UNIXSOCK 表示使用 unixSocket 进行通信,然后使用 $pool->getProcess()->exportSocket() 导出 CoroutineSocket 对象,实现 Worker 进程间通信。

    $pool = new SwooleProcessPool(2, SWOOLE_IPC_UNIXSOCK, 0, true);
    
    $pool->on('workerStart', function (SwooleProcessPool $pool, int $workerId) {
       $process = $pool->getProcess(0);
       $socket = $process->exportSocket();
       if ($workerId == 0) {
           echo $socket->recv();
           $socket->send("hello proc1
    ");
           echo "proc0 stop
    ";
       } else {
           $socket->send("hello proc0
    ");
           echo $socket->recv();
           echo "proc1 stop
    ";
           $pool->shutdown();
       }
    });
    
    $pool->start();

    2) set():设置参数

    SwooleProcessPool->set(array $settings)

    可以使用 enable_coroutine 来控制是否启用协程,和构造函数的第四个参数作用一致。

    SwooleProcessPool->set(['enable_coroutine' => true]);

    3) on():设置进程池回调函数

    SwooleProcessPool->on(string $event, callable $function);

    $event:指定事件

    $function:回调函数

    事件说明:

    - onWorkerStart:子进程启动,回调函数格式:

    function onWorkerStart(SwooleProcessPool $pool, int $workerId) {
        echo "Worker#{$workerId} is started
    ";
    }

    - onWorkerStop:子进程结束,回调函数同onWorkerStart。

    - onMessage:消息接收,收到外部投递的消息。 一次连接只能投递一次消息,类似于 PHP-FPM 的短连接机制。回调函数格式如下:

    function onMessage(SwooleProcessPool $pool, string $data) {
        var_dump($data);
    }

    4) listen():监听 SOCKET,必须在 $ipc_mode = SWOOLE_IPC_SOCKET 时才能使用。

    SwooleProcessPool->listen(string $host, int $port = 0, int $backlog = 2048): bool

    $host:监听的地址【支持 TCP 和 unixSocket 两种类型。127.0.0.1 表示监听 TCP 地址,需要指定 $port。unix:/tmp/php.sock 监听 unixSocket 地址。】

    $port:监听的端口【在 TCP 模式下需要指定。】

    $backlog:监听的队列长度

    返回值:成功监听返回 true、监听失败返回 false,可调用 swoole_errno 获取错误码。监听失败后,调用 start 时会立即返回 false。

    注意:向监听端口发送数据时,客户端必须在请求前增加 4 字节、网络字节序的长度值。协议格式为:packet = htonl(strlen(data)) + data;

    示例:

    $pool->listen('127.0.0.1', 8089);
    $pool->listen('unix:/tmp/php.sock');

    5) write():向对端写入数据,必须在 $ipc_mode 为 SWOOLE_IPC_SOCKET 时才能使用。

    SwooleProcessPool->write(string $data): bool

    $data:写入的数据内容【可多次调用 write,底层会在 onMessage 函数退出后将数据全部写入 socket 中,并 close 连接】

    说明:此方法为内存操作,没有 IO 消耗,发送数据操作是同步阻塞 IO

    示例:

    /**** 服务端 ****/
    $pool = new SwooleProcessPool(2, SWOOLE_IPC_SOCKET);
    
    $pool->on("Message", function ($pool, $message) {
        echo "Message: {$message}
    ";
        $pool->write("hello ");
        $pool->write("world ");
        $pool->write("
    ");
    });
    
    $pool->listen('127.0.0.1', 8089);
    $pool->start();
    
    
    /**** 客户端 ****/
    $fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr
    ");
    $msg = json_encode(['data' => 'hello', 'uid' => 1991]);
    fwrite($fp, pack('N', strlen($msg)) . $msg);
    sleep(1);
    //将显示 hello world
    
    $data = fread($fp, 8192);
    var_dump(substr($data, 4, unpack('N', substr($data, 0, 4))[1]));
    fclose($fp);

    6) start():启动工作进程

    SwooleProcessPool->start(): bool

    说明1:

    启动成功,当前进程进入 wait 状态,管理工作进程;

    启动失败,返回 false,可使用 swoole_errno 获取错误码。

    说明2:关于进程管理:

    - 某个工作进程遇到致命错误、主动退出时管理器会进行回收,避免出现僵尸进程
    - 工作进程退出后,管理器会自动拉起、创建一个新的工作进程
    - 主进程收到 SIGTERM 信号时将停止 fork 新进程,并 kill 所有正在运行的工作进程
    - 主进程收到 SIGUSR1 信号时将将逐个 kill 正在运行的工作进程,并重新启动新的工作进程

    说明3:信号处理:

    底层仅设置了主进程(管理进程)的信号处理,并未对 Worker 工作进程设置信号,需要开发者自行实现信号的监听。

    - 工作进程为异步模式,请使用 SwooleProcess::signal 监听信号
    - 工作进程为同步模式,请使用 pcntl_signal 和 pcntl_signal_dispatch 监听信号

    在工作进程中应当监听 SIGTERM 信号,当主进程需要终止该进程时,会向此进程发送 SIGTERM 信号。如果工作进程未监听 SIGTERM 信号,底层会强行终止当前进程,造成部分逻辑丢失。

    示例:

    $pool->on("WorkerStart", function ($pool, $workerId) {
        $running = true;
        pcntl_signal(SIGTERM, function () use (&$running) {
            $running = false;
        });
        echo "Worker#{$workerId} is started
    ";
        $redis = new Redis();
        $redis->pconnect('127.0.0.1', 6379);
        $key = "key1";
        while ($running) {
             $msg = $redis->brpop($key);
             pcntl_signal_dispatch();
             if ( $msg == null) continue;
             var_dump($msg);
         }
    });

    7)  shutdown():终止工作进程

    SwooleProcessPool->shutdown(): bool

    8)  getProcess():获取当前工作进程对象。返回 SwooleProcess 对象。

    SwooleProcessPool->getProcess(int $worker_id): SwooleProcess;

    $worker_id:指定获取 worker 【可选参数,默认当前 worker】

    注意:

    必须在 start 之后,在工作进程的 onWorkerStart 或其他回调函数中调用;

    返回的 Process 对象是单例模式,在工作进程中重复调用 getProcess() 将返回同一个对象。

    示例:

    $workerNum = 10;
    $pool = new SwooleProcessPool($workerNum);
    
    $pool->on("WorkerStart", function ($pool, $workerId) {
        $process = $pool->getProcess();
        $process->exec("/bin/sh", ["ls", '-l']);
    });
    
    $pool->on("WorkerStop", function ($pool, $workerId) {
        echo "Worker#{$workerId} is stopped
    ";
    });
    
    $pool->start();

    Swoole提供的进程池成员简练,使用方便。以上就是进程池的所有内容,下一节我们将讨论进程管理器Manager:)

    ---------------------------  我是可爱的分割线  ----------------------------

    最后博主借地宣传一下,漳州编程小组招新了,这是一个面向漳州青少年信息学/软件设计的学习小组,有意向的同学点击链接,联系我吧。

  • 相关阅读:
    分类和预测
    机器学习&数据挖掘笔记_16(常见面试之机器学习算法思想简单梳理)
    字符串匹配的KMP算法
    灰度共生矩阵提取纹理特征源码
    redis永不过期,保留最新5条数据,StringUtils.join()等总结
    Session问题以及解决方案
    spring boot 配置 log4j2
    每日知识记载总结54
    spring cloud踩坑指南
    每日知识记载总结53
  • 原文地址:https://www.cnblogs.com/ddcoder/p/14243351.html
Copyright © 2011-2022 走看看