zoukankan      html  css  js  c++  java
  • php使用event pcntl posix streams实现一个多进程框架

    2021年12月4日15:43:14

    Libev和libuv libevent 3个主流的高性能io网络库来说,目前libuv 的影响力是多大的

    https://github.com/libevent/libevent            watch 480  star 8.2k                 更新中

    https://github.com/libuv/libuv                      watch 705 star 18k                   更新中

    https://github.com/enki/libev                       watch 71  star 1.2k                   6年未更新了

    其实已经有类似的组件wokerman,那么为什么还要在写一个,其实主要是通过c++的视角,去实现一遍php的 libevent+多进程

    1,创建守护进程

    创建守护进程步骤
    首先我们要了解一些基本概念:
    进程组 :
    每个进程也属于一个进程组
    每个进程主都有一个进程组号,该号等于该进程组组长的PID号 .
    一个进程只能为它自己或子进程设置进程组ID号
    会话期:
    会话期(session)是一个或多个进程组的集合。
    setsid()函数可以建立一个对话期:
     如果,调用setsid的进程不是一个进程组的组长,此函数创建一个新的会话期。
    (1)此进程变成该对话期的首进程
    (2)此进程变成一个新进程组的组长进程。
    (3)此进程没有控制终端,如果在调用setsid前,该进程有控制终端,那么与该终端的联系被解除。 如果该进程是一个进程组的组长,此函数返回错误。
    (4)为了保证这一点,我们先调用fork()然后exit(),此时只有子进程在运行
    现在我们来给出创建守护进程所需步骤:
    编写守护进程的一般步骤步骤:
    (1)在父进程中执行fork并exit推出;
    (2)在子进程中调用setsid函数创建新的会话;
    (3)在子进程中调用chdir函数,让根目录 ”/” 成为子进程的工作目录;
    (4)在子进程中调用umask函数,设置进程的umask为0;
    (5)在子进程中关闭任何不需要的文件描述符
    说明:
    1. 在后台运行。
    为避免挂起控制终端将Daemon放入后台执行。方法是在进程中调用fork使父进程终止,让Daemon在子进程中后台执行。
    if(pid=fork())
    exit(0);//是父进程,结束父进程,子进程继续
    2. 脱离控制终端,登录会话和进程组
    有必要先介绍一下Linux中的进程与控制终端,登录会话和进程组之间的关系:进程属于一个进程组,进程组号(GID)就是进程组长的进程号(PID)。登录会话可以包含多个进程组。这些进程组共享一个控制终端。这个控制终端通常是创建进程的登录终端。
    控制终端,登录会话和进程组通常是从父进程继承下来的。我们的目的就是要摆脱它们,使之不受它们的影响。方法是在第1点的基础上,调用setsid()使进程成为会话组长:
    setsid();
    说明:当进程是会话组长时setsid()调用失败。但第一点已经保证进程不是会话组长。setsid()调用成功后,进程成为新的会话组长和新的进程组长,并与原来的登录会话和进程组脱离。由于会话过程对控制终端的独占性,进程同时与控制终端脱离。
    3. 禁止进程重新打开控制终端
    现在,进程已经成为无终端的会话组长。但它可以重新申请打开一个控制终端。可以通过使进程不再成为会话组长来禁止进程重新打开控制终端:
    if(pid=fork())
    exit(0);//结束第一子进程,第二子进程继续(第二子进程不再是会话组长)
    4. 关闭打开的文件描述符
    进程从创建它的父进程那里继承了打开的文件描述符。如不关闭,将会浪费系统资源,造成进程所在的文件系统无法卸下以及引起无法预料的错误。按如下方法关闭它们:
    for(i=0;i 关闭打开的文件描述符close(i);>
    5. 改变当前工作目录
    进程活动时,其工作目录所在的文件系统不能卸下。一般需要将工作目录改变到根目录。对于需要转储核心,写运行日志的进程将工作目录改变到特定目录如/tmpchdir("/")
    6. 重设文件创建掩模
    进程从创建它的父进程那里继承了文件创建掩模。它可能修改守护进程所创建的文件的存取位。为防止这一点,将文件创建掩模清除:umask(0);
    7. 处理SIGCHLD信号
    处理SIGCHLD信号并不是必须的。但对于某些进程,特别是服务器进程往往在请求到来时生成子进程处理请求。如果父进程不等待子进程结束,子进程将成为僵尸进程(zombie)从而占用系统资源。如果父进程等待子进程结束,将增加父进程的负担,影响服务器进程的并发性能。在Linux下可以简单地将SIGCHLD信号的操作设为SIG_IGN。
    signal(SIGCHLD,SIG_IGN);
    这样,内核在子进程结束时不会产生僵尸进程。这一点与BSD4不同,BSD4下必须显式等待子进程结束才能释放僵尸进程
    View Code

    2,进程信号量

    [root@localhost ~]# kill -l
     1) SIGHUP       2) SIGINT       3) SIGQUIT      4) SIGILL       5) SIGTRAP
     6) SIGABRT      7) SIGBUS       8) SIGFPE       9) SIGKILL     10) SIGUSR1
    11) SIGSEGV     12) SIGUSR2     13) SIGPIPE     14) SIGALRM     15) SIGTERM
    16) SIGSTKFLT   17) SIGCHLD     18) SIGCONT     19) SIGSTOP     20) SIGTSTP
    21) SIGTTIN     22) SIGTTOU     23) SIGURG      24) SIGXCPU     25) SIGXFSZ
    26) SIGVTALRM   27) SIGPROF     28) SIGWINCH    29) SIGIO       30) SIGPWR
    31) SIGSYS      34) SIGRTMIN    35) SIGRTMIN+1  36) SIGRTMIN+2  37) SIGRTMIN+3
    38) SIGRTMIN+4  39) SIGRTMIN+5  40) SIGRTMIN+6  41) SIGRTMIN+7  42) SIGRTMIN+8
    43) SIGRTMIN+9  44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13
    48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12
    53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9  56) SIGRTMAX-8  57) SIGRTMAX-7
    58) SIGRTMAX-6  59) SIGRTMAX-5  60) SIGRTMAX-4  61) SIGRTMAX-3  62) SIGRTMAX-2
    63) SIGRTMAX-1  64) SIGRTMAX
    View Code

    https://man7.org/linux/man-pages/man7/signal.7.html

    这个是所有的信号

     Signal        x86/ARM     Alpha/   MIPS   PARISC   Notes
                       most others   SPARC
           ─────────────────────────────────────────────────────────────────
           SIGHUP           1           1       1       1
           SIGINT           2           2       2       2
           SIGQUIT          3           3       3       3
           SIGILL           4           4       4       4
           SIGTRAP          5           5       5       5
           SIGABRT          6           6       6       6
           SIGIOT           6           6       6       6
           SIGBUS           7          10      10      10
           SIGEMT           -           7       7      -
           SIGFPE           8           8       8       8
           SIGKILL          9           9       9       9
           SIGUSR1         10          30      16      16
           SIGSEGV         11          11      11      11
           SIGUSR2         12          31      17      17
           SIGPIPE         13          13      13      13
           SIGALRM         14          14      14      14
           SIGTERM         15          15      15      15
           SIGSTKFLT       16          -       -        7
           SIGCHLD         17          20      18      18
           SIGCLD           -          -       18      -
           SIGCONT         18          19      25      26
           SIGSTOP         19          17      23      24
           SIGTSTP         20          18      24      25
           SIGTTIN         21          21      26      27
           SIGTTOU         22          22      27      28
           SIGURG          23          16      21      29
           SIGXCPU         24          24      30      12
           SIGXFSZ         25          25      31      30
           SIGVTALRM       26          26      28      20
           SIGPROF         27          27      29      21
           SIGWINCH        28          28      20      23
           SIGIO           29          23      22      22
           SIGPOLL                                            Same as SIGIO
           SIGPWR          30         29/-     19      19
           SIGINFO          -         29/-     -       -
           SIGLOST          -         -/29     -       -
           SIGSYS          31          12      12      31
           SIGUNUSED       31          -       -       31
    View Code

    https://www-uxsup.csx.cam.ac.uk/courses/moved.Building/signals.pdf

    A list of signals and what they mean

    •      These were all recorded from a Linux i386 system. Numbers may vary between platforms.

    •      Linux uses signals 34-64 for its real-time system which we are not interested in.

    •      “man 7 signal” gives the official manual page on signals.

    •      This is a fairly exhaustive list of signals. Only some of them will arise in the context of the make program.

    No.

    Short name

    What it means

    1

    SIGHUP

    If a process is being run from terminal and that terminal suddenly goes   away then the process receives this signal. “HUP” is short for “hang up” and refers to hanging up the telephone in the days of telephone modems.

    2

    SIGINT

    The process was “interrupted” . This happens when you press Control+C on the controlling terminal.

    3

    SIGQUIT

    4

    SIGILL

    Illegal instruction. The program contained some machine code the CPU can't understand.

    5

    SIGTRAP

    This signal is used mainly from within debuggers and program tracers.

    6

    SIGABRT

    The program called the abort() function. This is an emergency stop.

    7

    SIGBUS

    An attempt was made to access memory incorrectly. This can be caused by alignment errors in memory access etc.

    8

    SIGFPE

    floating point exception happened in the program.

    9

    SIGKILL

    The process was explicitly killed by somebody wielding the kill

    program.

    10

    SIGUSR1

    Left for the programmers to do whatever they want.

    11

    SIGSEGV

    An attempt was made to access memory not allocated to the process. This is often caused by reading off the end of arrays etc.

    12

    SIGUSR2

    Left for the programmers to do whatever they want.

    13

    SIGPIPE

    If a process is producing output that is being fed into another process that consume it via a pipe (“producer   |  consumer”) and the consumer  dies then the producer is sent this signal.

    14

    SIGALRM

    A process can request a “wake up call” from the operating system at some  time in the future by calling the alarm() function. When that time comes round the wake up call consists of this signal.

    15

    SIGTERM

    The process was explicitly killed by somebody wielding the kill

    program.

    16

    unused

    17

    SIGCHLD

    The process had previously created one or more child processes with the fork() function. One or more of these processes has since died.

    18

    SIGCONT

    (To be read in conjunction with SIGSTOP.)

    19

    SIGSTOP

    (To be read in conjunction with SIGCONT.)

    No.

    Short name

    What it means

    state is preserved ready for it to be restarted (by SIGCONT) but it doesn't get any more CPU cycles until then.

    20

    SIGTSTP

    Essentially the same as SIGSTOP. This is the signal sent when the user hits Control+Z on the terminal. (SIGTSTP is short for “terminal stop”) The       only difference between SIGTSTP and SIGSTOP is that pausing is

    only the default action for SIGTSTP but is the required action for              SIGSTOP. The process can opt to handle SIGTSTP differently but gets no choice regarding SIGSTOP.

    21

    SIGTTIN

    The operating system sends this signal to a backgrounded process when it   tries to read input from its terminal. The typical response is to pause (as per SIGSTOP and SIFTSTP) and wait for the SIGCONT that arrives when the  process is brought back to the foreground.

    22

    SIGTTOU

    The operating system sends this signal to a backgrounded process when it tries to write output to its terminal. The typical response is as per

    SIGTTIN.

    23

    SIGURG

    The operating system sends this signal to a process using a network connection when “urgent” out of band data is sent to it.

    24

    SIGXCPU

    The operating system sends this signal to a process that has exceeded its    CPU limit. You can cancel any CPU limit with the shell command             “ulimit  -t  unlimited” prior to running make though it is more     likely that something has gone wrong ifyou reach the CPU limit in make.

    25

    SIGXFSZ

    The operating system sends this signal to a process that has tried to create a file above the file size limit. You can cancel any file size limit with the        shell command “ulimit -f unlimited” prior to running make though it is      more likely that something has gone wrong ifyou reach the file size limit    in make.

    26

    SIGVTALRM

    This is very similar to SIGALRM, but while SIGALRM is sent after a        certain amount of real time has passed, SIGVTALRM is sent after a certain amount of time has been spent running the process.

    27

    SIGPROF

    This is also very similar to SIGALRM and SIGVTALRM, but while           SIGALRM is sent after a certain amount of real time has passed, SIGPROF is sent after a certain amount of time has been spent running the process      and running system code on behalf of the process.

    28

    SIGWINCH

    (Mostly unused these days.) A process used to be sent this signal when one of its windows was resized.

    29

    SIGIO

    (Also known as SIGPOLL.) A process can arrange to have this signal sent to it when there is some input ready for it to process or an output channel  has become ready for writing.

    30

    SIGPWR

    A signal sent to processes by a power management service to indicate that power has switched to a short term emergency power supply. The process (especially long-running daemons) may care to shut down cleanlt before   the emergency power fails.

    31

    SIGSYS

    Unused.

     另一个版本 https://www.cnblogs.com/xiao0913/p/9337258.html

    列表中,编号为1 ~ 31的信号为传统UNIX支持的信号,是不可靠信号(非实时的),编号为32 ~ 63的信号是后来扩充的,称做可靠信号(实时信号)。不可靠信号和可靠信号的区别在于前者不支持排队,可能会造成信号丢失,而后者不会。
    下面我们对编号小于SIGRTMIN的信号进行讨论。
    1) SIGHUP
    本信号在用户终端连接(正常或非正常)结束时发出, 通常是在终端的控制进程结束时, 通知同一session内的各个作业, 这时它们与控制终端不再关联。
    登录Linux时,系统会分配给登录用户一个终端(Session)。在这个终端运行的所有程序,包括前台进程组和后台进程组,一般都属于这个Session。当用户退出Linux登录时,前台进程组和后台有对终端输出的进程将会收到SIGHUP信号。这个信号的默认操作为终止进程,因此前台进 程组和后台有终端输出的进程就会中止。不过可以捕获这个信号,比如wget能捕获SIGHUP信号,并忽略它,这样就算退出了Linux登录,wget也 能继续下载。
    此外,对于与终端脱离关系的守护进程,这个信号用于通知它重新读取配置文件。
    2) SIGINT
    程序终止(interrupt)信号, 在用户键入INTR字符(通常是Ctrl-C)时发出,用于通知前台进程组终止进程。
    3) SIGQUIT
    和SIGINT类似, 但由QUIT字符(通常是Ctrl-\)来控制. 进程在因收到SIGQUIT退出时会产生core文件, 在这个意义上类似于一个程序错误信号。
    4) SIGILL
    执行了非法指令. 通常是因为可执行文件本身出现错误, 或者试图执行数据段. 堆栈溢出时也有可能产生这个信号。
    5) SIGTRAP
    由断点指令或其它trap指令产生. 由debugger使用。
    6) SIGABRT
    调用abort函数生成的信号。
    7) SIGBUS
    非法地址, 包括内存地址对齐(alignment)出错。比如访问一个四个字长的整数, 但其地址不是4的倍数。它与SIGSEGV的区别在于后者是由于对合法存储地址的非法访问触发的(如访问不属于自己存储空间或只读存储空间)。
    8) SIGFPE
    在发生致命的算术运算错误时发出. 不仅包括浮点运算错误, 还包括溢出及除数为0等其它所有的算术的错误。
    9) SIGKILL
    用来立即结束程序的运行. 本信号不能被阻塞、处理和忽略。如果管理员发现某个进程终止不了,可尝试发送这个信号。
    10) SIGUSR1
    留给用户使用
    11) SIGSEGV
    试图访问未分配给自己的内存, 或试图往没有写权限的内存地址写数据.
    12) SIGUSR2
    留给用户使用
    13) SIGPIPE
    管道破裂。这个信号通常在进程间通信产生,比如采用FIFO(管道)通信的两个进程,读管道没打开或者意外终止就往管道写,写进程会收到SIGPIPE信号。此外用Socket通信的两个进程,写进程在写Socket的时候,读进程已经终止。
    14) SIGALRM
    时钟定时信号, 计算的是实际的时间或时钟时间. alarm函数使用该信号.
    15) SIGTERM
    程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和处理。通常用来要求程序自己正常退出,shell命令kill缺省产生这个信号。如果进程终止不了,我们才会尝试SIGKILL。
    17) SIGCHLD
    子进程结束时, 父进程会收到这个信号。
    如果父进程没有处理这个信号,也没有等待(wait)子进程,子进程虽然终止,但是还会在内核进程表中占有表项,这时的子进程称为僵尸进程。这种情 况我们应该避免(父进程或者忽略SIGCHILD信号,或者捕捉它,或者wait它派生的子进程,或者父进程先终止,这时子进程的终止自动由init进程来接管)。
    18) SIGCONT
    让一个停止(stopped)的进程继续执行. 本信号不能被阻塞. 可以用一个handler来让程序在由stopped状态变为继续执行时完成特定的工作. 例如, 重新显示提示符...
    19) SIGSTOP
    停止(stopped)进程的执行. 注意它和terminate以及interrupt的区别:该进程还未结束, 只是暂停执行. 本信号不能被阻塞, 处理或忽略.
    20) SIGTSTP
    停止进程的运行, 但该信号可以被处理和忽略. 用户键入SUSP字符时(通常是Ctrl-Z)发出这个信号
    21) SIGTTIN
    当后台作业要从用户终端读数据时, 该作业中的所有进程会收到SIGTTIN信号. 缺省时这些进程会停止执行.
    22) SIGTTOU
    类似于SIGTTIN, 但在写终端(或修改终端模式)时收到.
    23) SIGURG
    有"紧急"数据或out-of-band数据到达socket时产生.
    24) SIGXCPU
    超过CPU时间资源限制. 这个限制可以由getrlimit/setrlimit来读取/改变。
    25) SIGXFSZ
    当进程企图扩大文件以至于超过文件大小资源限制。
    26) SIGVTALRM
    虚拟时钟信号. 类似于SIGALRM, 但是计算的是该进程占用的CPU时间.
    27) SIGPROF
    类似于SIGALRM/SIGVTALRM, 但包括该进程用的CPU时间以及系统调用的时间.
    28) SIGWINCH
    窗口大小改变时发出.
    29) SIGIO
    文件描述符准备就绪, 可以开始进行输入/输出操作.
    30) SIGPWR
    Power failure
    31) SIGSYS
    非法的系统调用。
    在以上列出的信号中,程序不可捕获、阻塞或忽略的信号有:SIGKILL,SIGSTOP
    不能恢复至默认动作的信号有:SIGILL,SIGTRAP
    默认会导致进程流产的信号有:SIGABRT,SIGBUS,SIGFPE,SIGILL,SIGIOT,SIGQUIT,SIGSEGV,SIGTRAP,SIGXCPU,SIGXFSZ
    默认会导致进程退出的信号有:SIGALRM,SIGHUP,SIGINT,SIGKILL,SIGPIPE,SIGPOLL,SIGPROF,SIGSYS,SIGTERM,SIGUSR1,SIGUSR2,SIGVTALRM
    默认会导致进程停止的信号有:SIGSTOP,SIGTSTP,SIGTTIN,SIGTTOU
    默认进程忽略的信号有:SIGCHLD,SIGPWR,SIGURG,SIGWINCH
    此外,SIGIO在SVR4是退出,在4.3BSD中是忽略;SIGCONT在进程挂起时是继续,否则是忽略,不能被阻塞
    
    1. 自定义信号最好从 SIGRTMIN 开始,但最好保留前几个。比如SIGRTMIN+102. 用kill(0,signo)发送自定义信号时,本进程和所有子进程(通过exec等方式启动的)都必须有对应的处理函数,否则所有进程都会退出
    View Code

     3,php Streams Socket的区别

    https://www.php.net/manual/en/intro.sockets.php

    https://www.php.net/manual/en/intro.stream.php

    Socket比较接近c的直接封装,可控制的细节更多,其他的基本没什么区别

    但是如果你要使用socket的话,建议使用Streams ,因为这个是属于php的核心代码

    也支持很多流的操作,更加方便

    4,工作模式选择

    ①主进程监听,数据读写交给worker 

    ②主进程只负责任务分发,由多个worker去监听

    第一种就是nginx 的模式,第二种就是workerman的模式,目前我选用的第一种,性能区别需要后期经过测试才知道

    5,libevent使用

    只使用streams实现单进程的 demo1

    <?php
    
    ini_set('display_errors', 'on');
    // Reporting all.
    error_reporting(E_ALL);
    // JIT is not stable, temporarily disabled.
    ini_set('pcre.jit', 0);
    
    $opts = array(
        'socket' => array(
            'backlog' => 102400, //成功建立socket连接的等待个数
        ),
    );
    $context = stream_context_create($opts);
    
    //开启多端口监听,并且实现负载均衡
    stream_context_set_option($context, 'socket', 'so_reuseport', 1);
    stream_context_set_option($context, 'socket', 'so_reuseaddr', 1);
    $socket_base = stream_socket_server('tcp://127.0.0.1:9800', $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $context);
    
    $socket = socket_import_stream($socket_base);
    socket_set_option($socket, \SOL_SOCKET, \SO_KEEPALIVE, 1);
    socket_set_option($socket, \SOL_TCP, \TCP_NODELAY, 1);
    
    stream_set_blocking($socket_base, false);//非阻塞
    
    //stream_socket_accept 本身就是阻塞的
    $clientSocket = stream_socket_accept($socket_base, -1);
    
    while (is_resource($clientSocket) && !feof($clientSocket)) {
    
        stream_set_blocking($clientSocket, false);
    
        $buffer = fread($clientSocket, 1000);
    
            print_r($buffer);
            if (empty($buffer)) {
                if (is_resource($clientSocket) && feof($clientSocket)) {
                    //触发关闭事件
                        fclose($clientSocket);
                }
            }else{
                $http_resonse = "HTTP/1.1 200 OK\r\n";
                $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
                $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
                $http_resonse .= "Server: php socket server\r\n";
                $http_resonse .= "Content-length: " . strlen($buffer) . "\r\n\r\n";
                $http_resonse .= $buffer;
                fwrite($clientSocket, $http_resonse);
            }
    }
    
    fclose($socket_base);
    View Code

    libevent和streams复用io demo2

    <?php
    ini_set('display_errors', 'on');
    // Reporting all.
    error_reporting(E_ALL);
    // JIT is not stable, temporarily disabled.
    ini_set('pcre.jit', 0);
    
    $opts = array(
        'socket' => array(
            'backlog' => 102400, //成功建立socket连接的等待个数
        ),
    );
    $context = stream_context_create($opts);
    
    //开启多端口监听,并且实现负载均衡
    stream_context_set_option($context, 'socket', 'so_reuseport', 1);
    stream_context_set_option($context, 'socket', 'so_reuseaddr', 1);
    
    $socket_base = stream_socket_server('tcp://127.0.0.1:9800', $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $context);
    
    $socket = socket_import_stream($socket_base);
    socket_set_option($socket, \SOL_SOCKET, \SO_KEEPALIVE, 1);
    socket_set_option($socket, \SOL_TCP, \TCP_NODELAY, 1);
    
    stream_set_blocking($socket_base, false);//非阻塞
    
    function rw($fd,$event){
        stream_set_blocking($fd, false);
        $buffer = fread($fd, 1000);
        print_r($buffer);
        if(empty($buffer)){
            // fclose($fd);
        }else{
            $http_resonse = "HTTP/1.1 200 OK\r\n";
                $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
                $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
                $http_resonse .= "Server: php socket \r\n";
                $http_resonse .= "Content-length: " . strlen($buffer) . "\r\n\r\n";
                $http_resonse .= $buffer;
                fwrite($fd, $http_resonse);
        }
    }
    
    function accept($fd,$event){
        //stream_socket_accept 本身就是阻塞的
        $clientSocket = stream_socket_accept($fd, 0);
        stream_set_blocking($clientSocket, false);
    
        $base = new EventBase();
        $event = new Event($base, $clientSocket, Event::READ | Event::WRITE | Event::PERSIST, 'rw');
        $event->add();
        $base->loop();
    }
    
    $base = new EventBase();
    $event = new Event($base, $socket_base, Event::READ | Event::PERSIST, 'accept');
    $event->add();
    
    $base->loop();
    
    fclose($base);
    View Code

    封装多进程 复用io,socket服务 demo3

    <?php
    
    class Worker
    {
        //监听socket
        protected $socket = NULL;
        //连接事件回调
        public $onConnect = NULL;
        public $reusePort = 1;
        //接收消息事件回调
        public $onMessage = NULL;
        public $workerNum = 10; //子进程个数
        public $allSocket; //存放所有socket
        public $addr;
        protected $worker_pid; //子进程pid
        protected $master_pid;//主进程id
    
        public function __construct($socket_address)
        {
            //监听地址+端口
            $this->addr = $socket_address;
            $this->master_pid = posix_getpid();
        }
    
        public function start()
        {
            //获取配置文件
            $this->fork($this->workerNum);
            $this->monitorWorkers(); //监视程序,捕获信号,监视worker进程
        }
    
    
        public function signalHandler($sigo)
        {
            switch ($sigo) {
                case SIGUSR1:
                    $this->reload();
                    echo "收到重启信号";
                    break;
            }
    
        }
    
        public function fork($worker_num)
        {
            for ($i = 1; $i < $worker_num; $i++) {
                $pid = pcntl_fork(); //创建成功会返回子进程id
                if ($pid < 0) {
                    exit('创建失败');
                } else if ($pid > 0) {
                    //父进程空间,返回子进程id
                    $this->worker_pid[] = $pid;
                } else { //返回为0子进程空间
                    $this->accept();//子进程负责接收客户端请求
                    exit;
                }
            }
            //放在父进程空间,结束的子进程信息,阻塞状态
    
        }
    
        /**
         * 捕获信号
         * 监视worker进程.拉起进程
         */
        public function monitorWorkers()
        {
            //注册信号事件回调,是不会自动执行的
            // reload
            pcntl_signal(SIGUSR1, array($this, 'signalHandler'), false); //重启woker进程信号
    
            while (1) {
                // 当发现信号队列,一旦发现有信号就会触发进程绑定事件回调
                \pcntl_signal_dispatch();
                $status = 0;
                $pid = pcntl_wait($status, \WUNTRACED); //当信号到达之后就会被中断
                \pcntl_signal_dispatch();
                //进程重启的过程当中会有新的信号过来,如果没有调用pcntl_signal_dispatch,信号不会被处理
    
            }
        }
    
        public function accept()
        {
            $opts = array(
                'socket' => array(
                    'backlog' => 10240, //成功建立socket连接的等待个数
                ),
            );
            $context = stream_context_create($opts);
            //开启多端口监听,并且实现负载均衡
            stream_context_set_option($context, 'socket', 'so_reuseport', 1);
            stream_context_set_option($context, 'socket', 'so_reuseaddr', 1);
            $this->socket = stream_socket_server($this->addr, $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN, $context);
    
            stream_set_blocking($this->socket, false);//非阻塞
            $base = new EventBase();
            $ss = $this->socket;
            //监听服务端的socket
            $event = new  Event($base, $ss, Event::PERSIST | Event::READ | Event::WRITE, function ($ss) {
    
                $clientSocket = stream_socket_accept($ss);
                //触发事件的连接的回调
                if (!empty($clientSocket) && is_callable($this->onConnect)) {
                    call_user_func($this->onConnect, $clientSocket);
                }
                $base = new EventBase();
                //监听客户端socket
                $event = new  Event($base, $clientSocket, Event::PERSIST | Event::READ | Event::WRITE, function ($clientSocket) {
                    $buffer = fread($clientSocket, 65535);
                    if (empty($buffer)) {
                        if (!is_resource($clientSocket) || feof($clientSocket)) {
                            //触发关闭事件
                            fclose($clientSocket);
                        }
                    }
                    //正常读取到数据,触发消息接收事件,响应内容
                    if (!empty($buffer) && is_callable($this->onMessage)) {
                        call_user_func($this->onMessage, $clientSocket, $buffer);
                    }
                });
                $event->add(); //加入事件监听
                $base->loop();
            });
            $event->add(); //加入事件监听
            $base->loop(); //调度挂起事件监听
        }
    
    
        /**
         * 重启worker进程
         */
        public function reload()
        {
            foreach ($this->worker_pid as $index => $pid) {
                posix_kill($pid, SIGKILL); //结束进程
                var_dump("杀掉的子进程", $pid);
                unset($this->worker_pid[$index]);
                $this->fork(1); //重新拉起worker
            }
    
        }
    
        //捕获信号之后重启worker进程
    }
    
    $worker = new Worker('tcp://0.0.0.0:9800');
    
    //开启多进程的端口监听
    $worker->reusePort = true;
    
    //连接事件
    $worker->onConnect = function ($fd) {
        //echo '连接事件触发',(int)$fd,PHP_EOL;
    };
    
    $worker->onTask = function ($fd) {
        //echo '连接事件触发',(int)$fd,PHP_EOL;
    };
    
    //消息接收
    $worker->onMessage = function ($conn, $message) {
        //事件回调当中写业务逻辑
        //var_dump($conn,$message);
        $content = $message;
        $http_resonse = "HTTP/1.1 200 OK\r\n";
        $http_resonse .= "Content-Type: text/html;charset=UTF-8\r\n";
        $http_resonse .= "Connection: keep-alive\r\n"; //连接保持
        $http_resonse .= "Server: php socket server\r\n";
        $http_resonse .= "Content-length: " . strlen($content) . "\r\n\r\n";
        $http_resonse .= $content;
        fwrite($conn, $http_resonse);
    
        if (empty($message)) {
            if (!is_resource($conn) || feof($message)) {
                //触发关闭事件
                fclose($conn);
            }
        }
    };
    
    $worker->start(); //启动
    View Code

    参考文档:

    https://www.cnblogs.com/mickole/category/496206.html

    QQ一群 247823727
    QQ二群 166427999
    博客文件如果不能下载请进群下载
    如果公司项目有技术瓶颈问题,如有需要,技术服务QQ: 903464207
  • 相关阅读:
    计算机基础
    程序的控制结构
    day 04作业
    数据类型-数据类型
    第二次作业
    第二次博客
    第一篇博客
    原生js与ajax应用
    ajax原理及其优缺点
    jQuery动画
  • 原文地址:https://www.cnblogs.com/zx-admin/p/15642269.html
Copyright © 2011-2022 走看看