Swoole的process通信的方式
管道pipe
管道用于进程之间的数据交互,Linux系统本身提供了pipe
函数用于创建一个半双工通信管道。半双工的通信方式中数据只能单向流动(一端只读一端只写),只能在具有亲缘关系(父子进程)的进程之间使用。
管道是进程间通信IPC
中最基础的方式,管道有两种类型分别是命名管道、匿名管道。
- 匿名管道:专门用于具有血缘关系的进程之间,完成数据传递。
- 命名管道:可以用在任何两个进程之间,Swoole中的管道都是匿名管道。
在Swoole中利用eventfd
和UnixSock
封装了两种管道,使得进程之间的通信更加灵活。
Swoole的Process模块内置了管道的方式用于进程间通信,在构建Process实例时只要开启了$pipe_type
选项,Swoole底层会自动创建一个管道,这里需要说明的时,虽然名字上叫做管道,但实际上在新版Swoole中底层通信是通过UnixSock实现的,所以并不是真正意义上的Linux Pipe。
创建进程
swoole_process::__construct( callable $function, bool $redirect_stdin_stdout = false, int $pipe_type = SOCK_DGRAM, bool $enable_coroutine = false );
管道类型$pipe_type
可分为三种:
0
表示不创建管道1
表示创建SOCK_STREAM
类型的管道2
表示创建SOCK_DGRAM
类型的管道
当启用$redirect_stdin_stdout
后,$pipe_type
选项将忽略用户参数,强制为1。
管道描述符
当进程被fork
出来后,父进程和子进程中的Process对象会被设置上一个名为pipe
的成员变量,存放着底层UnixSocket的描述符,父进程和子进程可以通过这个管道描述符来发送数据,也可以直接调用Process提供的read/write
接口来收发数据。
object(SwooleProcess)#1 (6) { ["pipe"]=>int(4) ["callback"]=>NULL ["msgQueueId"]=>NULL ["msgQueueKey"]=>NULL ["pid"]=>int(287) ["id"]=>NULL }
创建进程时若指定参数$pipe_type
为非0的数值则表示使用管道的方式创建进行,创建成功后子进程会生成一个管道编号pipe
。
每创建一个进程就会随着创建一个管道,主进程若要和目标进程通信,可以向目标进程的管道写入或读取数据。
管道读写
swoole_process->write(string $data)
向进程的管道中写入数据swoole_process->read(int $buffer_size = 8192)
从进程的管道中读取数据
案例:关闭重定向子进程的标准输入输出
$ vim test.php
首先将所有子进程句柄保存到主进程的一个数组中,数组的下标为PID。当主进程向要和某个子进程通讯时,使用子进程的句柄向对应的管道中读写数据,即可实现进程间的管道通信。
<?php //进程创建成功后回调处理 function handle(swoole_process $worker){ //从进程管道中读取数据 $data = $worker->read(); echo PHP_EOL."from master: {$data}"; //向进程管道中写入数据 $pipe = $worker->pipe;//子进程的管道编号 $pid = $worker->pid;//子进程的PID $worker->write("hello master, this pipe is {$pipe}, pid is {$pid}"); sleep(2); $worker->exit(0); } //进程数量 $worker_num = 2; //重定向输入输出 $redirect_stdin_stdout = false; //存放进程的数组 $workers = []; //循环创建多进程 for($i=0; $i<$worker_num; $i++){ //创建进程 $process = new swoole_process("handle", $redirect_stdin_stdout); //启动进程 $pid = $process->start(); //保存进程句柄 $workers[$pid] = $process; } //主进程 foreach($workers as $pid=>$process){ //子进程句柄向自己的管道中写入数据 $process->write("hello worker, this pid is {$pid}"); //子进程句柄从自己的管道中读取数据 $data = $process->read(); echo PHP_EOL."from worker: {$data}".PHP_EOL; }
$ php test.php from master: hello worker, this pid is 347 from worker: hello master, this pipe is 4, pid is 347 from master: hello worker, this pid is 348 from worker: hello master, this pipe is 6, pid is 348
案例:开启重定向子进程的标准输入输出
创建进程时设置$redirect_stdin_stdout
为true
启用后,在进程内使用echo
将不再打印到屏幕,而会写入到管道,读取键盘输入将变为从管道中读取数据,默认为阻塞读取。
<?php //进程创建成功后回调处理 function handle(swoole_process $worker){ //从进程管道中读取数据 $data = $worker->read(); echo PHP_EOL."from master: {$data}"; //向进程管道中写入数据 $pipe = $worker->pipe;//子进程的管道编号 $pid = $worker->pid;//子进程的PID $worker->write("hello master, this pipe is {$pipe}, pid is {$pid}"); sleep(2); $worker->exit(0); } //进程数量 $worker_num = 2; //重定向输入输出 $redirect_stdin_stdout = true; //存放进程的数组 $workers = []; //循环创建多进程 for($i=0; $i<$worker_num; $i++){ //创建进程 $process = new swoole_process("handle", $redirect_stdin_stdout); //启动进程 $pid = $process->start(); //保存进程句柄 $workers[$pid] = $process; } //主进程 foreach($workers as $pid=>$process){ //子进程句柄向自己的管道中写入数据 $process->write("hello worker, this pid is {$pid}"); //子进程句柄从自己的管道中读取数据 $data = $process->read(); echo PHP_EOL."from worker: {$data}".PHP_EOL; }
$ php test.php from worker: from master: hello worker, this pid is 350 from worker: from master: hello worker, this pid is 351
消息队列message queue
Linux的消息队列是一系列保存在内核中的消息链表,消息队列中有一个msgKey
,可以通过它访问不同的消息队列。消息队列有数据大小限制,默认为8192,可以通过内核进行修改。
Swoole中使用消息队列
- 通信模式:默认采用争抢模式,无法将消息投递给指定的子进程。
- 新建消息队列后,主进程就可以使用。
- 消息队列不可以和管道一起使用,也无法使用
swoole event loop
。 - 主进程中要调用
wait()
函数,否则子进程中调用pop()
或push()
会报错。
Swoole消息队列函数
swoole_process->useQueue()
swoole_process->push(string $data)
swoole_process->pop(int $max_size = 8192)
案例
<?php //进程创建成功后回调处理 function handle(swoole_process $worker){ $recv = $worker->pop(); echo PHP_EOL."from master: {$recv}"; sleep(2); $worker->exit(0); } //进程数量 $worker_num = 2; //重定向输入输出 $redirect_stdin_stdout = false; //存放进程的数组 $workers = []; //循环创建多进程 for($i=0; $i<$worker_num; $i++){ //创建进程 $process = new swoole_process("handle", $redirect_stdin_stdout); //使用消息队列 $process->useQueue(); //启动进程 $pid = $process->start(); //保存进程句柄 $workers[$pid] = $process; } //主进程 foreach($workers as $pid=>$process){ $process->push("hello worker, this pid is {$pid}"); } for($i=0; $i<$worker_num; $i++){ $ret = swoole_process::wait(); $pid = $ret["pid"]; unset($workers[$pid]); echo PHP_EOL."worker pid is {$pid} exit"; }
$ php test.php from master: hello worker, this pid is 368 from master: hello worker, this pid is 369 worker pid is 368 exit worker pid is 369 exit