zoukankan      html  css  js  c++  java
  • 剖析下聊天室

    由来

    环境:PHP7、Swoole、linux

    对聊天室有点感兴趣,对于网络协议有一点一知半解,所以决定借助swoole实现个简单的聊天室,来简单剖析下原理,知道原理以后就可以考虑用其他语言或者自己造轮子写个,当然这是后话。

    源码我放置github( https://github.com/WalkingSun/SwooleServer ),有兴趣可以借鉴借鉴。

    系统设计

    即时通讯的网络通信基于长连接,通信方式有TCP、UDP、socket、websocket等,本次实现是websocket,系统建立常驻内存的websocket服务,客户端即浏览器两者建立连接通信。通信过程如下:

    image

    关于客户端连接websocket服务,本文不做细述,websocket服务的建立借助swoole,需要在服务端的open、recieve、send、close建立回调处理,为了方便我将连接的客户端信息放入swoole_table(一个基于共享内存和锁实现的超高性能,并发数据结构)。

    代码仅供参考:

    websocket 服务类:

    <?php
    /**
     * Created by PhpStorm.
     * User: WalkingSun
     * Date: 2018/10/28
     * Time: 15:54
     */
    
    class WsServer
    {
        const host = '0.0.0.0';
        const port = '9501';
    
        public $swoole;
    
        public $config = ['gcSessionInterval' => 60000];
    
        public $openCallback;       //open回调
    
        public $messageCallback;    //message回调
    
        public $runApp;    //request回调
    
        public $workStartCallback;       //work回调
    
        public $finishCallback;     //finish回调
    
        public $closeCallback;       //close回调
    
        public $taskCallback;       //task回调
    
        public function __construct( $host, $port, $mode, $socketType, $swooleConfig=[], $config=[])
        {
            $host = $host?:self::host;
            $port = $port?:self::port;
            $this->swoole = new Swoole_websocket_server($host,$port,$mode,$socketType);
            $this->webRoot = $swooleConfig['document_root'];
            if( !empty($this->config) ) $this->config = array_merge($this->config, $config);
            $this->swoole->set($swooleConfig);
    
            $this->swoole->on('open',[$this,'onOpen']);
            $this->swoole->on('message',[$this,'onMessage']);
            $this->swoole->on('request',[$this,'onRequest']);
            $this->swoole->on('WorkerStart',[$this,'onWorkerStart']);            //增加work进程
            $this->swoole->on('task',[$this,'onTask']);            //增加task任务进程
            $this->swoole->on('finish',[$this,'onFinish']);
            $this->swoole->on('close',[$this,'onClose']);
        }
    
        public function run(){
            $this->swoole->start();
        }
    
        /**
         * 当WebSocket客户端与服务器建立连接并完成握手后会回调此函数
         * @param $serv swoole_websocket_server 服务对象
         * @param $request swoole_http_server 服务对象
         */
        public function onOpen( swoole_websocket_server $serv,  $request){
            call_user_func_array( $this->openCallback, [ $serv, $request ] );
    
            //定时器(异步执行)
    //        if($request->fd == 1){
    //            swoole_timer_tick(2000,function($timer_id){
    //                echo time().PHP_EOL;
    //            });
    //        }
        }
    
        /**
         *当服务器收到来自客户端的数据帧时会回调此函数。
         * @param $server  swoole_websocket_server 服务对象
         * @param $frame  swoole_websocket_frame对象,包含了客户端发来的数据帧信息
         * $frame->fd,客户端的socket id,使用$server->push推送数据时需要用到
        $frame->data,数据内容,可以是文本内容也可以是二进制数据,可以通过opcode的值来判断
        $frame->opcode,WebSocket的OpCode类型,可以参考WebSocket协议标准文档
        $frame->finish, 表示数据帧是否完整,一个WebSocket请求可能会分成多个数据帧进行发送(底层已经实现了自动合并数据帧,现在不用担心接收到的数据帧不完整)
         */
        public function onMessage(swoole_websocket_server $serv, swoole_websocket_frame $frame ){
    
            call_user_func_array( $this->messageCallback, [ $serv, $frame ]);
    
        }
    
        /**
         * @param $serv  swoole_websocket_server 服务对象
         * @param $fd  连接的文件描述符
         * @param $reactorId  来自那个reactor线程
         * onClose回调函数如果发生了致命错误,会导致连接泄漏。通过netstat命令会看到大量CLOSE_WAIT状态的TCP连接
         * 当服务器主动关闭连接时,底层会设置此参数为-1,可以通过判断$reactorId < 0来分辨关闭是由服务器端还是客户端发起的。
         */
        public function onClose( swoole_websocket_server $serv , $fd , $reactorId ){
    
            call_user_func_array( $this->closeCallback ,[ $serv , $fd , $reactorId ]);
    
        }
    
        /**
         * 在task_worker进程内被调用。worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务。当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌,这时将不再接收新的Task,当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task。
         * @param $serv  swoole_websocket_server 服务对象
         * @param $task_id int 任务id,由swoole扩展内自动生成,用于区分不同的任务。$task_id和$src_worker_id组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
         * @param $src_worker_id int 来自于哪个worker进程
         * @param $data mixed 任务的内容
         */
        public function onTask(swoole_server $serv,  $task_id,  $src_worker_id,  $data){
    
            call_user_func_array( $this->taskCallback , [ $serv, $task_id, $src_worker_id, $data ]);
    
    //        sleep(10);
    //        onTask函数中 return字符串,表示将此内容返回给worker进程。worker进程中会触发onFinish函数,表示投递的task已完成。
    //        return "task {$src_worker_id}-{$task_id} success";
        }
    
        /**
         * 当worker进程投递的任务在task_worker中完成时,task进程会通过swoole_server->finish()方法将任务处理的结果发送给worker进程。
         * @param $serv  swoole_websocket_server 服务对象
         * @param $task_id int  任务id
         * @param $data string  task任务处理的结果内容
         * task进程的onTask事件中没有调用finish方法或者return结果,worker进程不会触发onFinish
        执行onFinish逻辑的worker进程与下发task任务的worker进程是同一个进程
         */
        public function onFinish(swoole_server $serv,  $task_id,  $data){
    
            call_user_func_array( $this->finishCallback ,[ $serv,$task_id,$data]);
    //        echo $data;
    //        return $data;
        }
    
        public function onRequest( $request, $response ){
            call_user_func_array( $this->runApp, [ $request, $response ]);
        }
    
        public function onWorkerStart( $server,  $worker_id ){
            call_user_func_array( $this->workStartCallback , [$server,  $worker_id]);
        }
    }
    

    起服务和回调设置:

       class SwooleController extends BasicController{
    
           public $host;
    
           public $port;
    
           public  $swoole_config=[];
    
           public static $table;
    
    
           public function actionStart(){
    
               $config = include __DIR__ . '/../config/console.php';
    
               if( isset($config['swoole']['log_file']) ) $this->swoole_config['log_file'] = $config['swoole']['log_file'];
    
               if( isset($config['swoole']['pid_file']) ) $this->swoole_config['pid_file'] = $config['swoole']['pid_file'];
    
               $this->swoole_config = array_merge(
                   [
                       'document_root' => $config['swoole']['document_root'],
                       'enable_static_handler'     => true,
       //                'daemonize'=>1,
                       'worker_num'=>4,
                       'max_request'=>2000,
       //            'task_worker_num'=>100,
                       //检查死链接  使用操作系统提供的keepalive机制来踢掉死链接
                       'open_tcp_keepalive'=>1,
                       'tcp_keepidle'=> 1*60,     //连接在n秒内没有数据请求,将开始对此连接进行探测
                       'tcp_keepcount' => 3,       //探测的次数,超过次数后将close此连接
                       'tcp_keepinterval' => 0.5*60,     //探测的间隔时间,单位秒
    
                       //swoole实现的心跳机制,只要客户端超过一定时间没发送数据,不管这个连接是不是死链接,都会关闭这个连接
       //            'heartbeat_check_interval' => 10*60,        //每m秒侦测一次心跳
       //            'heartbeat_idle_time' => 30*60,            //一个TCP连接如果在n秒内未向服务器端发送数据,将会被切断
                   ],$this->swoole_config
               );
    
               $this->host = $config['swoole']['host'];
               $this->port = $config['swoole']['port'];
               $swooleServer = new WsServer(  $this->host,$this->port,$config['swoole']['mode'],$config['swoole']['socketType'],$this->swoole_config,$config);
    
               //连接信息保存到swoole_table
               self::$table = new swoole_table(10);
               self::$table->column('username',SwooleTable::TYPE_STRING, 10);
               self::$table->column('avatar',SwooleTable::TYPE_STRING, 255);
               self::$table->column('msg',SwooleTable::TYPE_STRING, 255);
               self::$table->column('fd',SwooleTable::TYPE_INT, 6);
               self::$table->create();
    
               $swooleServer->openCallback = function( $server , $request ){
                   echo "server handshake with fd={$request->fd}
    ";
               };
    
               $swooleServer->runApp = function( $request , $response ) use($config,$swooleServer){
    
                   //全局变量设置及app.log
                   $this->globalParam( $request );
                   $_SERVER['SERVER_SWOOLE'] = $swooleServer;
    
                   //记录日志
                   $apiData = $_SERVER;
                   unset($apiData['SERVER_SWOOLE']);
                   Common::addLog( $config['log'] , ($apiData) );
    
                   //解析路由
                   $r = $_GET['r'];
                   $r = $r?:( isset($config['defaultRoute'])?$config['defaultRoute']:'index/index');
                   $params = explode('/',$r);
                   $controller = __DIR__.'/../controllers/'.ucfirst($params[0]).'Controller.php';
    
                   $result = '';
                   if( file_exists( $controller ) ){
                       require_once $controller;
                       $class = new ReflectionClass(ucfirst($params[0]).'Controller');
                       if( $class->hasMethod( 'action'.ucfirst($params[1]) ) ){
                           $instance  = $class->newInstanceArgs();
                           $method = $class->getmethod('action'.ucfirst($params[1])); // 获取类中方法
                           ob_start();
                           $method->invoke($instance);    // 执行方法
                           $result = ob_get_contents();
                           ob_clean();
                       }else{
                           $result = 'NOT FOUND!';
                       }
                   }else{
                       $result = "$controller not exist!";
                   }
    
                   $response->end( $result );
               };
    
               $swooleServer->workStartCallback = function( $server,  $worker_id ){
    
               };
    
               $swooleServer->taskCallback = function( $server , $request ){
                   //发送通知或者短信、邮件等
    
               };
    
               $swooleServer->finishCallback = function( $serv,  $task_id,  $data ){
    
       //            return $data;
               };
    
               $swooleServer->messageCallback = function( $server,  $iframe  ){
                   //记录客户端信息
                   echo "Client connection fd {$iframe->fd} ".PHP_EOL;
    
                   $data = json_decode( $iframe->data ,1 );
    
                   if( !empty($data['token']) ){
                       if( $data['token']== 'simplechat_open' ){
                           if( !self::$table->exist($iframe->fd) ){
                               $user = array_merge($data,['fd'=>$iframe->fd]);
                               self::$table->set($iframe->fd,$user);
    
                               //发送连接用户信息
                               foreach (self::$table as $v){
                                   if($v['fd']!=$iframe->fd){
                                       $pushData = array_merge($user,['action'=>'connect']);
                                       $server->push($v['fd'],json_encode($pushData));
                                   }
                               }
                           }
    
                       }
    
                       if( $data['token']=='simplechat' ){
                           //查询所有连接用户,分发消息
    
                           foreach (self::$table as $v){
                               if($v['fd']!=$iframe->fd){
                                   $pushData = ['username'=>$data['username'],'avatar'=>$data['avatar'],'time'=>date('H:i'),'data'=>$data['data'],'action'=>'send'];
                                   $server->push($v['fd'],json_encode($pushData));
                               }
                           }
                       }
                   }
    
                   //接受消息,对消息进行解析,发送给组内人其他人
    
               };
    
               $swooleServer->closeCallback = function(  $server,  $fd,  $reactorId ){
    
                   if(  self::$table->exist($fd) ){
                       //退出房间处理
                       self::$table->del($fd);
                       foreach (self::$table as $v){
                           $pushData = ['fd'=>$fd,'username'=>'','avatar'=>'','time'=>date('H:i'),'data'=>'','action'=>'remove'];
                           $server->push($v['fd'],json_encode($pushData));
                       }
                   }
    
                   echo  "Client close fd {$fd}".PHP_EOL;
               };
    
               $this->stdout("server is running, listening {$this->host}:{$this->port}" . PHP_EOL);
               $swooleServer->run();
           }
    
    
           public function actionStop(){
               $r = $this->sendSignal( SIGTERM );
               if( $r ){
                   $this->stdout("server is stopped, stop listening {$this->host}:{$this->port}" . PHP_EOL);
               }
           }
    
           public function actionRestart(){
               $this->sendSignal(SIGTERM);     //向主进程发送SIGTERM实现关闭服务器
               $this->actionStart();
           }
    
           public function actionReload(){
               $this->sendSignal(SIGUSR1);   //向主进程/管理进程发送SIGUSR1信号,将平稳地restart所有Worker进程
           }
    
       }
    

    起了服务,客户端就可以连接通信了。

    心跳检测

    起了半天后服务常会断掉,查看监听端口进程状态,服务器输入:

    $ netstat -anp |grep 9501
    

    发现大量CLOSE_WAIT状态,常用状态有 ESTABLISHED 表示正在通信,TIME_WAIT 表示主动关闭,CLOSE_WAIT 表示被动关闭。

    TIME_WAIT和CLOSE_WAIT两种状态如果一直被保持,意味着对应数目的通道就一直被占用,且“占着茅坑不使劲”,一旦句柄数达到上限,新的请求就无法处理。而且因为swoole是master-worker模式,
    基本上http、tcp通信都是在worker进程,CLOSE_WAIT一直在,子进程将一直无法释放,随着时间的推移CLOSE_WAIT状态的进程越来越多,阻碍新的连接进来,websocket服务不可用。

    主动关闭 和 被动关闭
    TCP关闭 四次挥手过程如下:

    image

    挥手流程:

    1、 客户端是调用函数close(),这时,客户端会发送一个FIN给服务器。

    2、 服务器收到FIN,关闭套接字读通道,并将自己状态设置为CLOSE_WAIT(表示被动关闭),
    并返回一个ACK给客户端。

    3、 客户端收到ACK,关闭套接字写通道
    接下来,服务器会调用close():

    1、 服务器close(),发送一个FIN到客户端。

    2、 客户端收到FIN,关闭读通道,并将自己状态设置成TIME_WAIT,发送一个ACK给服务器。

    3、 服务器收到ACK,关闭写通道,并将自己状态设置为CLOSE。

    4、 客户端等待两个最大数据传输时间,然后将自己状态设置成CLOSED。

    由此我们看到CLOSE-WAIT 状态,TIME-WAIT 状态 产生的过程,产生的原因是复杂的,比如说网络通信中断、用户手机网络切换wifi网络、网络通信丢包等,故此tcp挥手过程会出现中断,继而
    产生这些关闭状态。

    为了解决这些占用连接数的异常连接,需要检测连接是否是活动的,对于死连接我们需要释放关闭它。

    TIME_WAIT 主动关闭

    主动关闭的一方在发送最后一个ACK包后,无论对方是否收到都会进入状态,等待2MSL(Maximum Segment Lifetime数据包的最大生命周期,是一个数据包能在互联网上生存的最长时间,若超过这个时间则该数据包将会消失在网络中)
    的时间,才会释放网络资源。

    TIME_WAIT状态的存在主要有两个原因:

    1)可靠地实现TCP全双工连接的终止。在关TCP闭连接时,最后的ACK包是由主动关闭方发出的,如果这个ACK包丢失,则被动关闭方将重发FIN包,因此主动方必须维护状态信息,以允许它重发这个
    ACK包。如果不维持这个状态信息,那么主动方将回到CLOSED状态,并对被动方重发的FIN包响应RST包,而被动关闭方将此包解释成一个错误。因而,要实现TCP全双工连接的正常终止,必须能够处
    理四次握手协议中任意一个包丢失的情况,主动关闭方必须维持状态信息进入TIME_WAIT状态。

    2)确保迷路重复数据包在网络中消失,防止上一次连接中的包迷路后重新出现,影响新连接。TCP数据包可能由于路由器异常而迷路,在迷路期间,数据包发送方可能因超时而重发这个包,迷路的
    数据包在路由器恢复后也会被送到目的地,这个迷路的数据包就称为Lost Duplicate。在关闭一个TCP连接后,如果马上使用相同的IP地址和端口建立新的TCP连接,那么有可能出现前一个连接的迷
    路重复数据包在前一个连接关闭后再次出现,影响新建立的连接。为了避免这一情况,TCP协议不允许使用处于TIME_WAIT状态的连接的IP和端口启动一个新连接,只有经过2MSL的时间,确保上一次
    连接中所有的迷路重复数据包都已消失在网络中,才能安全地建立新连接。

    如果Server主动关闭连接,同样会有大量的连接在关闭后处于TIME_WAIT状态,等待2MSL的时间后才能释放网络资源。对于并发连接,出现大量等待连接,新的连接进不来,会降低系统性能。

    time_wait问题可以通过调整内核参数和适当的设置web服务器的keep-Alive值来解决。因为time_wait是自己可控的,要么就是对方连接的异常,要么就是自己没有快速的回收资源,总之不是由于自己程序错误引起的。

    解决方式:

    • 试图让Client主动关闭连接,由于每个Client的并发量都比较低,因而不会产生性能瓶颈

    • 优化Server的系统TCP参数,使其网络资源的最大值、消耗速度和恢复速度达到平衡;

      修改/etc/sysctl.conf

      net.ipv4.tcp_tw_recycle = 1       #启用TIME-WAIT状态sockets的快速回收
    
      net.ipv4.tcp_tw_reuse = 1         #允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭
    
      #缓存每个连接最新的时间戳,后续请求中如果时间戳小于缓存的时间戳,即视为无效,相应的数据包会被丢弃,启用这种行为取决于tcp_timestamps和tcp_tw_recycle
      net.ipv4.tcp_timestamps = 1
    

    CLOSE_WAIT 被动关闭

    对方发送一个FIN后,程序自己这边没有进一步发送ACK以确认。换句话说就是在对方关闭连接后,程序里没有检测到,或者程序里本身就已经忘了这个时候需要关闭连接,于是这个资源就一直被程序占用着。

    解决办法:

    • 释放关闭掉异常的连接;
    • 修复程序的bug,重新发布;

    Keep-Alive

    TCP中有一个Keep-Alive的机制可以检测死连接,LINUX内核包含对keepalive的支持,其中使用了三个参数:tcp_keepalive_time(开启keepalive的闲置时长)tcp_keepalive_intvl(keepalive探测包的发送
    间隔)和tcp_keepalive_probes(如果对方不予应答,探测包的发送次数);如此服务端会隔断时间发送个探测包给客户端,可以是多次,如果在超出设置闲置时长,内核会关闭这个连接。

    客户端主动发心跳

    通过程序设置最大连接时长,如果客户端在这段时间内没有发送过数据,则关闭释放这个连接。

    解决关闭问题

    TIME_WAIT 倒是没有出现过, CLOSE_WAIT状态总会出现。

    就看看文档,swoole有这些设置,当前使用的是TCP的keep-alive检测,只需改配置即可:

       ...
        //检查死链接  使用操作系统提供的keepalive机制来踢掉死链接
                    'open_tcp_keepalive'=>1,
                    'tcp_keepidle'=> 1*60,     //连接在n秒内没有数据请求,将开始对此连接进行探测
                    'tcp_keepcount' => 3,       //探测的次数,超过次数后将close此连接
                    'tcp_keepinterval' => 0.5*60,     //探测的间隔时间,单位秒
       ...
    

    我设置的周期比较短,方便测试。

    设置了这些看似稳定了,却还是会出现CLOSE_WAIT,后来查了日志,发生错误中断了,大概意思,代码中出现exit、die,显然常驻内存的swoole不支持这些,会立马中断程序。所以改些这些代码,
    刚开始借助YII2.0写的,框架源码的问题,所以swoole这块服务需要单独出来,嗯。。。所以索性直接自己撸个。现在看来,服务跑起来稳定多了,一直没挂呢。

    贴下临时地址:http://47.99.189.105:91/

    系统监控及优化

    系统很简单,但是作为研究,应该更透彻点。

    我们的系统如何监控?如果说系统崩溃怎么办?能支撑多大并发?高并发下如何保持系统稳定。。。 一个高性能的即时通讯是如何架构的?

    额,留待以后再研究下补充。

    参考资料:

    https://juejin.im/post/5c3b21e4e51d455231347349

  • 相关阅读:
    mysql主从之过滤复制
    mysql主从之主从延时监控及原因分析
    mysql误删搭建有主从的主库master binlog处理
    mysql 主从复制原理,概念,故障的检控/分析/处理
    mysql之主从复制搭建
    mysql表的物理存储结构及逻辑结构
    sql语句执行过程及mysql权限管理
    linux安装mysql5.7.30 及配置多实例
    mysql备份恢复之xtrabackup (XBK、Xbackup)
    mysql 备份之逻辑备份mysqldump, mysqlbinlog, 全备及参数说明
  • 原文地址:https://www.cnblogs.com/followyou/p/10360252.html
Copyright © 2011-2022 走看看