一:select 与epoll
select:监听并等待多个文件描述符的属性变化(可读、可写、错误异常),调用后select会阻塞,直到有描述符就绪,函数才返回,通过便利fdset,来找到就绪的描述符,并且最大描述符不能超过1024
epoll:相对select来说,更灵活,没有描述符限制,无需轮训,epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个时间表中(当连接有io流事件产生的时候,epoll就会告诉进程有io流事件产生,然后进程就去处理)
代码:(使用swoole中的event事件)
<?php class Worker { //监听socket protected $socket = NULL; //连接事件回调 public $onConnect = NULL; //接收消息事件回调 public $onMessage = NULL; public $workerNum = 10; public function __construct($socket_address) { $this->socket = stream_socket_server($socket_address); } //创建子进程 public function fork() { $this->accept(); } public function accept() { swoole_event_add($this->socket,function ($fd){ //服务端接收客户端请求 $clientSocket = stream_socket_accept($this->socket); if (!empty($clientSocket) && is_callable($this->onConnect)) { call_user_func($this->onConnect, $clientSocket); } swoole_event_add($clientSocket,function ($fd){ $buffer = fread($fd, 65535); //如果数据为空,或者为false,不是资源类型 if(empty($buffer)){ if(feof($fd) || !is_resource($fd)){ //触发关闭事件 fclose($fd); } } if (!empty($buffer) && is_callable($this->onMessage)) { call_user_func($this->onMessage, $fd, $buffer); } }); }); echo '非阻塞'; } public function start() { $this->fork(); } } $worker = new Worker('tcp://0.0.0.0:9801'); $worker->onConnect = function ($args) { echo "新的连接来了.{$args}.PHP_EOL"; }; $worker->onMessage = function ($conn, $message) { // var_dump($conn, $message); $content = "hello word qwe"; $http_resonse = "HTTP/1.1 200 OK "; $http_resonse .= "Content-Type: text/html;charset=UTF-8 "; $http_resonse .= "Connection: keep-alive "; $http_resonse .= "Server: php socket server "; $http_resonse .= "Content-length: " . strlen($content) . " "; $http_resonse .= $content; fwrite($conn, $http_resonse); }; $worker->start();
二:多进程master-worker模型
流程设计:
- master进程,负责处理配置文件读取,启动,终止和维护worker进程数,当worker进程推出后,会自动重新启动新的worker
- worker进程的主要任务就是完成具体的逻辑(监听端口、接受请求、使用epoll绝收请求,执行业务逻辑关闭连接)
代码:
<?php class Worker { //监听socket protected $socket = NULL; //连接事件回调 public $onConnect = NULL; //接收消息事件回调 public $onMessage = NULL; public $workerNum = 4; public $addr; public function __construct($socket_address) { $this->addr=$socket_address; } //创建子进程 public function fork() { for ($i = 0; $i < $this->workerNum; $i++) { $pid = pcntl_fork(); if ($pid < 0) { exit('创建失败'); } else if ($pid > 0) { } else { $this->accept(); exit(); } } $status = 0; //回收子进程 for ($i = 0; $i < $this->workerNum; $i++) { $pid = pcntl_wait($status); } } public function accept() { $opts = array( 'socket' => array( 'backlog' => '10240', ), ); $context = stream_context_create($opts); stream_context_set_option($context,'socket','so_reuseport',1); $this->socket = stream_socket_server($this->addr,$error,$errstr, STREAM_SERVER_BIND|STREAM_SERVER_LISTEN,$context); swoole_event_add($this->socket, function ($fd) { //服务端接收客户端请求 $clientSocket = stream_socket_accept($this->socket); if (!empty($clientSocket) && is_callable($this->onConnect)) { call_user_func($this->onConnect, $clientSocket); } swoole_event_add($clientSocket, function ($fd) { $buffer = fread($fd, 65535); //如果数据为空,或者为false,不是资源类型 if (empty($buffer)) { if (feof($fd) || !is_resource($fd)) { //触发关闭事件 fclose($fd); } } if (!empty($buffer) && is_callable($this->onMessage)) { call_user_func($this->onMessage, $fd, $buffer); } }); }); echo '非阻塞'; } public function start() { $this->fork(); } } $worker = new Worker('tcp://0.0.0.0:9801'); $worker->onConnect = function ($args) { echo "新的连接来了.{$args}.PHP_EOL"; }; $worker->onMessage = function ($conn, $message) { // var_dump($conn, $message); $content = "hello word qwe"; $http_resonse = "HTTP/1.1 200 OK "; $http_resonse .= "Content-Type: text/html;charset=UTF-8 "; $http_resonse .= "Connection: keep-alive "; $http_resonse .= "Server: php socket server "; $http_resonse .= "Content-length: " . strlen($content) . " "; $http_resonse .= $content; fwrite($conn, $http_resonse); }; $worker->start();