由来
环境:PHP7、Swoole、linux
对聊天室有点感兴趣,对于网络协议有一点一知半解,所以决定借助swoole实现个简单的聊天室,来简单剖析下原理,知道原理以后就可以考虑用其他语言或者自己造轮子写个,当然这是后话。
源码我放置github( https://github.com/WalkingSun/SwooleServer ),有兴趣可以借鉴借鉴。
系统设计
即时通讯的网络通信基于长连接,通信方式有TCP、UDP、socket、websocket等,本次实现是websocket,系统建立常驻内存的websocket服务,客户端即浏览器两者建立连接通信。通信过程如下:
关于客户端连接websocket服务,本文不做细述,websocket服务的建立借助swoole,需要在服务端的open、recieve、send、close建立回调处理,为了方便我将连接的客户端信息放入swoole_table(一个基于共享内存和锁实现的超高性能,并发数据结构)。
代码仅供参考:
websocket 服务类:
<?php /** * Created by PhpStorm. * User: WalkingSun * Date: 2018/10/28 * Time: 15:54 */ class WsServer { const host = '0.0.0.0'; const port = '9501'; public $swoole; public $config = ['gcSessionInterval' => 60000]; public $openCallback; //open回调 public $messageCallback; //message回调 public $runApp; //request回调 public $workStartCallback; //work回调 public $finishCallback; //finish回调 public $closeCallback; //close回调 public $taskCallback; //task回调 public function __construct( $host, $port, $mode, $socketType, $swooleConfig=[], $config=[]) { $host = $host?:self::host; $port = $port?:self::port; $this->swoole = new Swoole_websocket_server($host,$port,$mode,$socketType); $this->webRoot = $swooleConfig['document_root']; if( !empty($this->config) ) $this->config = array_merge($this->config, $config); $this->swoole->set($swooleConfig); $this->swoole->on('open',[$this,'onOpen']); $this->swoole->on('message',[$this,'onMessage']); $this->swoole->on('request',[$this,'onRequest']); $this->swoole->on('WorkerStart',[$this,'onWorkerStart']); //增加work进程 $this->swoole->on('task',[$this,'onTask']); //增加task任务进程 $this->swoole->on('finish',[$this,'onFinish']); $this->swoole->on('close',[$this,'onClose']); } public function run(){ $this->swoole->start(); } /** * 当WebSocket客户端与服务器建立连接并完成握手后会回调此函数 * @param $serv swoole_websocket_server 服务对象 * @param $request swoole_http_server 服务对象 */ public function onOpen( swoole_websocket_server $serv, $request){ call_user_func_array( $this->openCallback, [ $serv, $request ] ); //定时器(异步执行) // if($request->fd == 1){ // swoole_timer_tick(2000,function($timer_id){ // echo time().PHP_EOL; // }); // } } /** *当服务器收到来自客户端的数据帧时会回调此函数。 * @param $server swoole_websocket_server 服务对象 * @param $frame swoole_websocket_frame对象,包含了客户端发来的数据帧信息 * $frame->fd,客户端的socket id,使用$server->push推送数据时需要用到 $frame->data,数据内容,可以是文本内容也可以是二进制数据,可以通过opcode的值来判断 $frame->opcode,WebSocket的OpCode类型,可以参考WebSocket协议标准文档 $frame->finish, 表示数据帧是否完整,一个WebSocket请求可能会分成多个数据帧进行发送(底层已经实现了自动合并数据帧,现在不用担心接收到的数据帧不完整) */ public function onMessage(swoole_websocket_server $serv, swoole_websocket_frame $frame ){ call_user_func_array( $this->messageCallback, [ $serv, $frame ]); } /** * @param $serv swoole_websocket_server 服务对象 * @param $fd 连接的文件描述符 * @param $reactorId 来自那个reactor线程 * onClose回调函数如果发生了致命错误,会导致连接泄漏。通过netstat命令会看到大量CLOSE_WAIT状态的TCP连接 * 当服务器主动关闭连接时,底层会设置此参数为-1,可以通过判断$reactorId < 0来分辨关闭是由服务器端还是客户端发起的。 */ public function onClose( swoole_websocket_server $serv , $fd , $reactorId ){ call_user_func_array( $this->closeCallback ,[ $serv , $fd , $reactorId ]); } /** * 在task_worker进程内被调用。worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务。当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌,这时将不再接收新的Task,当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task。 * @param $serv swoole_websocket_server 服务对象 * @param $task_id int 任务id,由swoole扩展内自动生成,用于区分不同的任务。$task_id和$src_worker_id组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同 * @param $src_worker_id int 来自于哪个worker进程 * @param $data mixed 任务的内容 */ public function onTask(swoole_server $serv, $task_id, $src_worker_id, $data){ call_user_func_array( $this->taskCallback , [ $serv, $task_id, $src_worker_id, $data ]); // sleep(10); // onTask函数中 return字符串,表示将此内容返回给worker进程。worker进程中会触发onFinish函数,表示投递的task已完成。 // return "task {$src_worker_id}-{$task_id} success"; } /** * 当worker进程投递的任务在task_worker中完成时,task进程会通过swoole_server->finish()方法将任务处理的结果发送给worker进程。 * @param $serv swoole_websocket_server 服务对象 * @param $task_id int 任务id * @param $data string task任务处理的结果内容 * task进程的onTask事件中没有调用finish方法或者return结果,worker进程不会触发onFinish 执行onFinish逻辑的worker进程与下发task任务的worker进程是同一个进程 */ public function onFinish(swoole_server $serv, $task_id, $data){ call_user_func_array( $this->finishCallback ,[ $serv,$task_id,$data]); // echo $data; // return $data; } public function onRequest( $request, $response ){ call_user_func_array( $this->runApp, [ $request, $response ]); } public function onWorkerStart( $server, $worker_id ){ call_user_func_array( $this->workStartCallback , [$server, $worker_id]); } } 起服务和回调设置: class SwooleController extends BasicController{ public $host; public $port; public $swoole_config=[]; public static $table; public function actionStart(){ $config = include __DIR__ . '/../config/console.php'; if( isset($config['swoole']['log_file']) ) $this->swoole_config['log_file'] = $config['swoole']['log_file']; if( isset($config['swoole']['pid_file']) ) $this->swoole_config['pid_file'] = $config['swoole']['pid_file']; $this->swoole_config = array_merge( [ 'document_root' => $config['swoole']['document_root'], 'enable_static_handler' => true, // 'daemonize'=>1, 'worker_num'=>4, 'max_request'=>2000, // 'task_worker_num'=>100, //检查死链接 使用操作系统提供的keepalive机制来踢掉死链接 'open_tcp_keepalive'=>1, 'tcp_keepidle'=> 1*60, //连接在n秒内没有数据请求,将开始对此连接进行探测 'tcp_keepcount' => 3, //探测的次数,超过次数后将close此连接 'tcp_keepinterval' => 0.5*60, //探测的间隔时间,单位秒 //swoole实现的心跳机制,只要客户端超过一定时间没发送数据,不管这个连接是不是死链接,都会关闭这个连接 // 'heartbeat_check_interval' => 10*60, //每m秒侦测一次心跳 // 'heartbeat_idle_time' => 30*60, //一个TCP连接如果在n秒内未向服务器端发送数据,将会被切断 ],$this->swoole_config ); $this->host = $config['swoole']['host']; $this->port = $config['swoole']['port']; $swooleServer = new WsServer( $this->host,$this->port,$config['swoole']['mode'],$config['swoole']['socketType'],$this->swoole_config,$config); //连接信息保存到swoole_table self::$table = new swoole_table(10); self::$table->column('username',SwooleTable::TYPE_STRING, 10); self::$table->column('avatar',SwooleTable::TYPE_STRING, 255); self::$table->column('msg',SwooleTable::TYPE_STRING, 255); self::$table->column('fd',SwooleTable::TYPE_INT, 6); self::$table->create(); $swooleServer->openCallback = function( $server , $request ){ echo "server handshake with fd={$request->fd} "; }; $swooleServer->runApp = function( $request , $response ) use($config,$swooleServer){ //全局变量设置及app.log $this->globalParam( $request ); $_SERVER['SERVER_SWOOLE'] = $swooleServer; //记录日志 $apiData = $_SERVER; unset($apiData['SERVER_SWOOLE']); Common::addLog( $config['log'] , ($apiData) ); //解析路由 $r = $_GET['r']; $r = $r?:( isset($config['defaultRoute'])?$config['defaultRoute']:'index/index'); $params = explode('/',$r); $controller = __DIR__.'/../controllers/'.ucfirst($params[0]).'Controller.php'; $result = ''; if( file_exists( $controller ) ){ require_once $controller; $class = new ReflectionClass(ucfirst($params[0]).'Controller'); if( $class->hasMethod( 'action'.ucfirst($params[1]) ) ){ $instance = $class->newInstanceArgs(); $method = $class->getmethod('action'.ucfirst($params[1])); // 获取类中方法 ob_start(); $method->invoke($instance); // 执行方法 $result = ob_get_contents(); ob_clean(); }else{ $result = 'NOT FOUND!'; } }else{ $result = "$controller not exist!"; } $response->end( $result ); }; $swooleServer->workStartCallback = function( $server, $worker_id ){ }; $swooleServer->taskCallback = function( $server , $request ){ //发送通知或者短信、邮件等 }; $swooleServer->finishCallback = function( $serv, $task_id, $data ){ // return $data; }; $swooleServer->messageCallback = function( $server, $iframe ){ //记录客户端信息 echo "Client connection fd {$iframe->fd} ".PHP_EOL; $data = json_decode( $iframe->data ,1 ); if( !empty($data['token']) ){ if( $data['token']== 'simplechat_open' ){ if( !self::$table->exist($iframe->fd) ){ $user = array_merge($data,['fd'=>$iframe->fd]); self::$table->set($iframe->fd,$user); //发送连接用户信息 foreach (self::$table as $v){ if($v['fd']!=$iframe->fd){ $pushData = array_merge($user,['action'=>'connect']); $server->push($v['fd'],json_encode($pushData)); } } } } if( $data['token']=='simplechat' ){ //查询所有连接用户,分发消息 foreach (self::$table as $v){ if($v['fd']!=$iframe->fd){ $pushData = ['username'=>$data['username'],'avatar'=>$data['avatar'],'time'=>date('H:i'),'data'=>$data['data'],'action'=>'send']; $server->push($v['fd'],json_encode($pushData)); } } } } //接受消息,对消息进行解析,发送给组内人其他人 }; $swooleServer->closeCallback = function( $server, $fd, $reactorId ){ if( self::$table->exist($fd) ){ //退出房间处理 self::$table->del($fd); foreach (self::$table as $v){ $pushData = ['fd'=>$fd,'username'=>'','avatar'=>'','time'=>date('H:i'),'data'=>'','action'=>'remove']; $server->push($v['fd'],json_encode($pushData)); } } echo "Client close fd {$fd}".PHP_EOL; }; $this->stdout("server is running, listening {$this->host}:{$this->port}" . PHP_EOL); $swooleServer->run(); } public function actionStop(){ $r = $this->sendSignal( SIGTERM ); if( $r ){ $this->stdout("server is stopped, stop listening {$this->host}:{$this->port}" . PHP_EOL); } } public function actionRestart(){ $this->sendSignal(SIGTERM); //向主进程发送SIGTERM实现关闭服务器 $this->actionStart(); } public function actionReload(){ $this->sendSignal(SIGUSR1); //向主进程/管理进程发送SIGUSR1信号,将平稳地restart所有Worker进程 } }
起了服务,客户端就可以连接通信了。
心跳检测
起了半天后服务常会断掉,查看监听端口进程状态,服务器输入:
$ netstat -anp |grep 9501
发现大量CLOSE_WAIT状态,常用状态有 ESTABLISHED 表示正在通信,TIME_WAIT 表示主动关闭,CLOSE_WAIT 表示被动关闭。
TIME_WAIT和CLOSE_WAIT两种状态如果一直被保持,意味着对应数目的通道就一直被占用,且“占着茅坑不使劲”,一旦句柄数达到上限,新的请求就无法处理。而且因为swoole是master-worker模式,
基本上http、tcp通信都是在worker进程,CLOSE_WAIT一直在,子进程将一直无法释放,随着时间的推移CLOSE_WAIT状态的进程越来越多,阻碍新的连接进来,websocket服务不可用。
主动关闭 和 被动关闭
TCP关闭 四次挥手过程如下:
挥手流程:
1、 客户端是调用函数close(),这时,客户端会发送一个FIN给服务器。
2、 服务器收到FIN,关闭套接字读通道,并将自己状态设置为CLOSE_WAIT(表示被动关闭),
并返回一个ACK给客户端。
3、 客户端收到ACK,关闭套接字写通道
接下来,服务器会调用close():
1、 服务器close(),发送一个FIN到客户端。
2、 客户端收到FIN,关闭读通道,并将自己状态设置成TIME_WAIT,发送一个ACK给服务器。
3、 服务器收到ACK,关闭写通道,并将自己状态设置为CLOSE。
4、 客户端等待两个最大数据传输时间,然后将自己状态设置成CLOSED。
由此我们看到CLOSE-WAIT 状态,TIME-WAIT 状态 产生的过程,产生的原因是复杂的,比如说网络通信中断、用户手机网络切换wifi网络、网络通信丢包等,故此tcp挥手过程会出现中断,继而
产生这些关闭状态。
为了解决这些占用连接数的异常连接,需要检测连接是否是活动的,对于死连接我们需要释放关闭它。
TIME_WAIT 主动关闭
主动关闭的一方在发送最后一个ACK包后,无论对方是否收到都会进入状态,等待2MSL(Maximum Segment Lifetime数据包的最大生命周期,是一个数据包能在互联网上生存的最长时间,若超过这个时间则该数据包将会消失在网络中)
的时间,才会释放网络资源。
TIME_WAIT状态的存在主要有两个原因:
1)可靠地实现TCP全双工连接的终止。在关TCP闭连接时,最后的ACK包是由主动关闭方发出的,如果这个ACK包丢失,则被动关闭方将重发FIN包,因此主动方必须维护状态信息,以允许它重发这个
ACK包。如果不维持这个状态信息,那么主动方将回到CLOSED状态,并对被动方重发的FIN包响应RST包,而被动关闭方将此包解释成一个错误。因而,要实现TCP全双工连接的正常终止,必须能够处
理四次握手协议中任意一个包丢失的情况,主动关闭方必须维持状态信息进入TIME_WAIT状态。
2)确保迷路重复数据包在网络中消失,防止上一次连接中的包迷路后重新出现,影响新连接。TCP数据包可能由于路由器异常而迷路,在迷路期间,数据包发送方可能因超时而重发这个包,迷路的
数据包在路由器恢复后也会被送到目的地,这个迷路的数据包就称为Lost Duplicate。在关闭一个TCP连接后,如果马上使用相同的IP地址和端口建立新的TCP连接,那么有可能出现前一个连接的迷
路重复数据包在前一个连接关闭后再次出现,影响新建立的连接。为了避免这一情况,TCP协议不允许使用处于TIME_WAIT状态的连接的IP和端口启动一个新连接,只有经过2MSL的时间,确保上一次
连接中所有的迷路重复数据包都已消失在网络中,才能安全地建立新连接。
如果Server主动关闭连接,同样会有大量的连接在关闭后处于TIME_WAIT状态,等待2MSL的时间后才能释放网络资源。对于并发连接,出现大量等待连接,新的连接进不来,会降低系统性能。
time_wait问题可以通过调整内核参数和适当的设置web服务器的keep-Alive值来解决。因为time_wait是自己可控的,要么就是对方连接的异常,要么就是自己没有快速的回收资源,总之不是由于自己程序错误引起的。
解决方式:
- 试图让Client主动关闭连接,由于每个Client的并发量都比较低,因而不会产生性能瓶颈
-
优化Server的系统TCP参数,使其网络资源的最大值、消耗速度和恢复速度达到平衡;
修改/etc/sysctl.conf
net.ipv4.tcp_tw_recycle = 1 #启用TIME-WAIT状态sockets的快速回收 net.ipv4.tcp_tw_reuse = 1 #允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭 #缓存每个连接最新的时间戳,后续请求中如果时间戳小于缓存的时间戳,即视为无效,相应的数据包会被丢弃,启用这种行为取决于tcp_timestamps和tcp_tw_recycle net.ipv4.tcp_timestamps = 1
CLOSE_WAIT 被动关闭
对方发送一个FIN后,程序自己这边没有进一步发送ACK以确认。换句话说就是在对方关闭连接后,程序里没有检测到,或者程序里本身就已经忘了这个时候需要关闭连接,于是这个资源就一直被程序占用着。
解决办法:
- 释放关闭掉异常的连接;
- 修复程序的bug,重新发布;
Keep-Alive
TCP中有一个Keep-Alive的机制可以检测死连接,LINUX内核包含对keepalive的支持,其中使用了三个参数:tcp_keepalive_time(开启keepalive的闲置时长)tcp_keepalive_intvl(keepalive探测包的发送
间隔)和tcp_keepalive_probes(如果对方不予应答,探测包的发送次数);如此服务端会隔断时间发送个探测包给客户端,可以是多次,如果在超出设置闲置时长,内核会关闭这个连接。
客户端主动发心跳
通过程序设置最大连接时长,如果客户端在这段时间内没有发送过数据,则关闭释放这个连接。
解决关闭问题
TIME_WAIT 倒是没有出现过, CLOSE_WAIT状态总会出现。
就看看文档,swoole有这些设置,当前使用的是TCP的keep-alive检测,只需改配置即可:
...
//检查死链接 使用操作系统提供的keepalive机制来踢掉死链接
'open_tcp_keepalive'=>1,
'tcp_keepidle'=> 1*60, //连接在n秒内没有数据请求,将开始对此连接进行探测
'tcp_keepcount' => 3, //探测的次数,超过次数后将close此连接
'tcp_keepinterval' => 0.5*60, //探测的间隔时间,单位秒
...
我设置的周期比较短,方便测试。
设置了这些看似稳定了,却还是会出现CLOSE_WAIT,后来查了日志,发生错误中断了,大概意思,代码中出现exit、die,显然常驻内存的swoole不支持这些,会立马中断程序。所以改些这些代码,
刚开始借助YII2.0写的,框架源码的问题,所以swoole这块服务需要单独出来,嗯。。。所以索性直接自己撸个。现在看来,服务跑起来稳定多了,一直没挂呢。
贴下临时地址:http://47.99.189.105:91/
系统监控及优化
系统很简单,但是作为研究,应该更透彻点。
我们的系统如何监控?如果说系统崩溃怎么办?能支撑多大并发?高并发下如何保持系统稳定。。。 一个高性能的即时通讯是如何架构的?
额,留待以后再研究下补充。
参考资料:
https://juejin.im/post/5c3b21e4e51d455231347349
如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!欢迎各位转载,但是未经作者本人同意,转载文章之后必须在文章页面明显位置给出作者和原文连接,否则保留追究法律责任的权利。