zoukankan      html  css  js  c++  java
  • 剖析下聊天室

    由来

    环境:PHP7、Swoole、linux

    对聊天室有点感兴趣,对于网络协议有一点一知半解,所以决定借助swoole实现个简单的聊天室,来简单剖析下原理,知道原理以后就可以考虑用其他语言或者自己造轮子写个,当然这是后话。

    源码我放置github( https://github.com/WalkingSun/SwooleServer ),有兴趣可以借鉴借鉴。

    系统设计

    即时通讯的网络通信基于长连接,通信方式有TCP、UDP、socket、websocket等,本次实现是websocket,系统建立常驻内存的websocket服务,客户端即浏览器两者建立连接通信。通信过程如下:

    image

    关于客户端连接websocket服务,本文不做细述,websocket服务的建立借助swoole,需要在服务端的open、recieve、send、close建立回调处理,为了方便我将连接的客户端信息放入swoole_table(一个基于共享内存和锁实现的超高性能,并发数据结构)。

    代码仅供参考:

    websocket 服务类:

    <?php
    /**
     * Created by PhpStorm.
     * User: WalkingSun
     * Date: 2018/10/28
     * Time: 15:54
     */
    
    class WsServer
    {
        const host = '0.0.0.0';
        const port = '9501';
    
        public $swoole;
    
        public $config = ['gcSessionInterval' => 60000];
    
        public $openCallback;       //open回调
    
        public $messageCallback;    //message回调
    
        public $runApp;    //request回调
    
        public $workStartCallback;       //work回调
    
        public $finishCallback;     //finish回调
    
        public $closeCallback;       //close回调
    
        public $taskCallback;       //task回调
    
        public function __construct( $host, $port, $mode, $socketType, $swooleConfig=[], $config=[])
        {
            $host = $host?:self::host;
            $port = $port?:self::port;
            $this->swoole = new Swoole_websocket_server($host,$port,$mode,$socketType);
            $this->webRoot = $swooleConfig['document_root'];
            if( !empty($this->config) ) $this->config = array_merge($this->config, $config);
            $this->swoole->set($swooleConfig);
    
            $this->swoole->on('open',[$this,'onOpen']);
            $this->swoole->on('message',[$this,'onMessage']);
            $this->swoole->on('request',[$this,'onRequest']);
            $this->swoole->on('WorkerStart',[$this,'onWorkerStart']);            //增加work进程
            $this->swoole->on('task',[$this,'onTask']);            //增加task任务进程
            $this->swoole->on('finish',[$this,'onFinish']);
            $this->swoole->on('close',[$this,'onClose']);
        }
    
        public function run(){
            $this->swoole->start();
        }
    
        /**
         * 当WebSocket客户端与服务器建立连接并完成握手后会回调此函数
         * @param $serv swoole_websocket_server 服务对象
         * @param $request swoole_http_server 服务对象
         */
        public function onOpen( swoole_websocket_server $serv,  $request){
            call_user_func_array( $this->openCallback, [ $serv, $request ] );
    
            //定时器(异步执行)
    //        if($request->fd == 1){
    //            swoole_timer_tick(2000,function($timer_id){
    //                echo time().PHP_EOL;
    //            });
    //        }
        }
    
        /**
         *当服务器收到来自客户端的数据帧时会回调此函数。
         * @param $server  swoole_websocket_server 服务对象
         * @param $frame  swoole_websocket_frame对象,包含了客户端发来的数据帧信息
         * $frame->fd,客户端的socket id,使用$server->push推送数据时需要用到
        $frame->data,数据内容,可以是文本内容也可以是二进制数据,可以通过opcode的值来判断
        $frame->opcode,WebSocket的OpCode类型,可以参考WebSocket协议标准文档
        $frame->finish, 表示数据帧是否完整,一个WebSocket请求可能会分成多个数据帧进行发送(底层已经实现了自动合并数据帧,现在不用担心接收到的数据帧不完整)
         */
        public function onMessage(swoole_websocket_server $serv, swoole_websocket_frame $frame ){
    
            call_user_func_array( $this->messageCallback, [ $serv, $frame ]);
    
        }
    
        /**
         * @param $serv  swoole_websocket_server 服务对象
         * @param $fd  连接的文件描述符
         * @param $reactorId  来自那个reactor线程
         * onClose回调函数如果发生了致命错误,会导致连接泄漏。通过netstat命令会看到大量CLOSE_WAIT状态的TCP连接
         * 当服务器主动关闭连接时,底层会设置此参数为-1,可以通过判断$reactorId < 0来分辨关闭是由服务器端还是客户端发起的。
         */
        public function onClose( swoole_websocket_server $serv , $fd , $reactorId ){
    
            call_user_func_array( $this->closeCallback ,[ $serv , $fd , $reactorId ]);
    
        }
    
        /**
         * 在task_worker进程内被调用。worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务。当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌,这时将不再接收新的Task,当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task。
         * @param $serv  swoole_websocket_server 服务对象
         * @param $task_id int 任务id,由swoole扩展内自动生成,用于区分不同的任务。$task_id和$src_worker_id组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
         * @param $src_worker_id int 来自于哪个worker进程
         * @param $data mixed 任务的内容
         */
        public function onTask(swoole_server $serv,  $task_id,  $src_worker_id,  $data){
    
            call_user_func_array( $this->taskCallback , [ $serv, $task_id, $src_worker_id, $data ]);
    
    //        sleep(10);
    //        onTask函数中 return字符串,表示将此内容返回给worker进程。worker进程中会触发onFinish函数,表示投递的task已完成。
    //        return "task {$src_worker_id}-{$task_id} success";
        }
    
        /**
         * 当worker进程投递的任务在task_worker中完成时,task进程会通过swoole_server->finish()方法将任务处理的结果发送给worker进程。
         * @param $serv  swoole_websocket_server 服务对象
         * @param $task_id int  任务id
         * @param $data string  task任务处理的结果内容
         * task进程的onTask事件中没有调用finish方法或者return结果,worker进程不会触发onFinish
        执行onFinish逻辑的worker进程与下发task任务的worker进程是同一个进程
         */
        public function onFinish(swoole_server $serv,  $task_id,  $data){
    
            call_user_func_array( $this->finishCallback ,[ $serv,$task_id,$data]);
    //        echo $data;
    //        return $data;
        }
    
        public function onRequest( $request, $response ){
            call_user_func_array( $this->runApp, [ $request, $response ]);
        }
    
        public function onWorkerStart( $server,  $worker_id ){
            call_user_func_array( $this->workStartCallback , [$server,  $worker_id]);
        }
    }
    起服务和回调设置:
    
       class SwooleController extends BasicController{
    
           public $host;
    
           public $port;
    
           public  $swoole_config=[];
    
           public static $table;
    
    
           public function actionStart(){
    
               $config = include __DIR__ . '/../config/console.php';
    
               if( isset($config['swoole']['log_file']) ) $this->swoole_config['log_file'] = $config['swoole']['log_file'];
    
               if( isset($config['swoole']['pid_file']) ) $this->swoole_config['pid_file'] = $config['swoole']['pid_file'];
    
               $this->swoole_config = array_merge(
                   [
                       'document_root' => $config['swoole']['document_root'],
                       'enable_static_handler'     => true,
       //                'daemonize'=>1,
                       'worker_num'=>4,
                       'max_request'=>2000,
       //            'task_worker_num'=>100,
                       //检查死链接  使用操作系统提供的keepalive机制来踢掉死链接
                       'open_tcp_keepalive'=>1,
                       'tcp_keepidle'=> 1*60,     //连接在n秒内没有数据请求,将开始对此连接进行探测
                       'tcp_keepcount' => 3,       //探测的次数,超过次数后将close此连接
                       'tcp_keepinterval' => 0.5*60,     //探测的间隔时间,单位秒
    
                       //swoole实现的心跳机制,只要客户端超过一定时间没发送数据,不管这个连接是不是死链接,都会关闭这个连接
       //            'heartbeat_check_interval' => 10*60,        //每m秒侦测一次心跳
       //            'heartbeat_idle_time' => 30*60,            //一个TCP连接如果在n秒内未向服务器端发送数据,将会被切断
                   ],$this->swoole_config
               );
    
               $this->host = $config['swoole']['host'];
               $this->port = $config['swoole']['port'];
               $swooleServer = new WsServer(  $this->host,$this->port,$config['swoole']['mode'],$config['swoole']['socketType'],$this->swoole_config,$config);
    
               //连接信息保存到swoole_table
               self::$table = new swoole_table(10);
               self::$table->column('username',SwooleTable::TYPE_STRING, 10);
               self::$table->column('avatar',SwooleTable::TYPE_STRING, 255);
               self::$table->column('msg',SwooleTable::TYPE_STRING, 255);
               self::$table->column('fd',SwooleTable::TYPE_INT, 6);
               self::$table->create();
    
               $swooleServer->openCallback = function( $server , $request ){
                   echo "server handshake with fd={$request->fd}
    ";
               };
    
               $swooleServer->runApp = function( $request , $response ) use($config,$swooleServer){
    
                   //全局变量设置及app.log
                   $this->globalParam( $request );
                   $_SERVER['SERVER_SWOOLE'] = $swooleServer;
    
                   //记录日志
                   $apiData = $_SERVER;
                   unset($apiData['SERVER_SWOOLE']);
                   Common::addLog( $config['log'] , ($apiData) );
    
                   //解析路由
                   $r = $_GET['r'];
                   $r = $r?:( isset($config['defaultRoute'])?$config['defaultRoute']:'index/index');
                   $params = explode('/',$r);
                   $controller = __DIR__.'/../controllers/'.ucfirst($params[0]).'Controller.php';
    
                   $result = '';
                   if( file_exists( $controller ) ){
                       require_once $controller;
                       $class = new ReflectionClass(ucfirst($params[0]).'Controller');
                       if( $class->hasMethod( 'action'.ucfirst($params[1]) ) ){
                           $instance  = $class->newInstanceArgs();
                           $method = $class->getmethod('action'.ucfirst($params[1])); // 获取类中方法
                           ob_start();
                           $method->invoke($instance);    // 执行方法
                           $result = ob_get_contents();
                           ob_clean();
                       }else{
                           $result = 'NOT FOUND!';
                       }
                   }else{
                       $result = "$controller not exist!";
                   }
    
                   $response->end( $result );
               };
    
               $swooleServer->workStartCallback = function( $server,  $worker_id ){
    
               };
    
               $swooleServer->taskCallback = function( $server , $request ){
                   //发送通知或者短信、邮件等
    
               };
    
               $swooleServer->finishCallback = function( $serv,  $task_id,  $data ){
    
       //            return $data;
               };
    
               $swooleServer->messageCallback = function( $server,  $iframe  ){
                   //记录客户端信息
                   echo "Client connection fd {$iframe->fd} ".PHP_EOL;
    
                   $data = json_decode( $iframe->data ,1 );
    
                   if( !empty($data['token']) ){
                       if( $data['token']== 'simplechat_open' ){
                           if( !self::$table->exist($iframe->fd) ){
                               $user = array_merge($data,['fd'=>$iframe->fd]);
                               self::$table->set($iframe->fd,$user);
    
                               //发送连接用户信息
                               foreach (self::$table as $v){
                                   if($v['fd']!=$iframe->fd){
                                       $pushData = array_merge($user,['action'=>'connect']);
                                       $server->push($v['fd'],json_encode($pushData));
                                   }
                               }
                           }
    
                       }
    
                       if( $data['token']=='simplechat' ){
                           //查询所有连接用户,分发消息
    
                           foreach (self::$table as $v){
                               if($v['fd']!=$iframe->fd){
                                   $pushData = ['username'=>$data['username'],'avatar'=>$data['avatar'],'time'=>date('H:i'),'data'=>$data['data'],'action'=>'send'];
                                   $server->push($v['fd'],json_encode($pushData));
                               }
                           }
                       }
                   }
    
                   //接受消息,对消息进行解析,发送给组内人其他人
    
               };
    
               $swooleServer->closeCallback = function(  $server,  $fd,  $reactorId ){
    
                   if(  self::$table->exist($fd) ){
                       //退出房间处理
                       self::$table->del($fd);
                       foreach (self::$table as $v){
                           $pushData = ['fd'=>$fd,'username'=>'','avatar'=>'','time'=>date('H:i'),'data'=>'','action'=>'remove'];
                           $server->push($v['fd'],json_encode($pushData));
                       }
                   }
    
                   echo  "Client close fd {$fd}".PHP_EOL;
               };
    
               $this->stdout("server is running, listening {$this->host}:{$this->port}" . PHP_EOL);
               $swooleServer->run();
           }
    
    
           public function actionStop(){
               $r = $this->sendSignal( SIGTERM );
               if( $r ){
                   $this->stdout("server is stopped, stop listening {$this->host}:{$this->port}" . PHP_EOL);
               }
           }
    
           public function actionRestart(){
               $this->sendSignal(SIGTERM);     //向主进程发送SIGTERM实现关闭服务器
               $this->actionStart();
           }
    
           public function actionReload(){
               $this->sendSignal(SIGUSR1);   //向主进程/管理进程发送SIGUSR1信号,将平稳地restart所有Worker进程
           }
    
       }

    起了服务,客户端就可以连接通信了。

    心跳检测

    起了半天后服务常会断掉,查看监听端口进程状态,服务器输入:

    $ netstat -anp |grep 9501

    发现大量CLOSE_WAIT状态,常用状态有 ESTABLISHED 表示正在通信,TIME_WAIT 表示主动关闭,CLOSE_WAIT 表示被动关闭。

    TIME_WAIT和CLOSE_WAIT两种状态如果一直被保持,意味着对应数目的通道就一直被占用,且“占着茅坑不使劲”,一旦句柄数达到上限,新的请求就无法处理。而且因为swoole是master-worker模式,
    基本上http、tcp通信都是在worker进程,CLOSE_WAIT一直在,子进程将一直无法释放,随着时间的推移CLOSE_WAIT状态的进程越来越多,阻碍新的连接进来,websocket服务不可用。

    主动关闭 和 被动关闭
    TCP关闭 四次挥手过程如下:

    image

    挥手流程:

    1、 客户端是调用函数close(),这时,客户端会发送一个FIN给服务器。

    2、 服务器收到FIN,关闭套接字读通道,并将自己状态设置为CLOSE_WAIT(表示被动关闭),
    并返回一个ACK给客户端。

    3、 客户端收到ACK,关闭套接字写通道
    接下来,服务器会调用close():

    1、 服务器close(),发送一个FIN到客户端。

    2、 客户端收到FIN,关闭读通道,并将自己状态设置成TIME_WAIT,发送一个ACK给服务器。

    3、 服务器收到ACK,关闭写通道,并将自己状态设置为CLOSE。

    4、 客户端等待两个最大数据传输时间,然后将自己状态设置成CLOSED。

    由此我们看到CLOSE-WAIT 状态,TIME-WAIT 状态 产生的过程,产生的原因是复杂的,比如说网络通信中断、用户手机网络切换wifi网络、网络通信丢包等,故此tcp挥手过程会出现中断,继而
    产生这些关闭状态。

    为了解决这些占用连接数的异常连接,需要检测连接是否是活动的,对于死连接我们需要释放关闭它。

    TIME_WAIT 主动关闭

    主动关闭的一方在发送最后一个ACK包后,无论对方是否收到都会进入状态,等待2MSL(Maximum Segment Lifetime数据包的最大生命周期,是一个数据包能在互联网上生存的最长时间,若超过这个时间则该数据包将会消失在网络中)
    的时间,才会释放网络资源。

    TIME_WAIT状态的存在主要有两个原因:

    1)可靠地实现TCP全双工连接的终止。在关TCP闭连接时,最后的ACK包是由主动关闭方发出的,如果这个ACK包丢失,则被动关闭方将重发FIN包,因此主动方必须维护状态信息,以允许它重发这个
    ACK包。如果不维持这个状态信息,那么主动方将回到CLOSED状态,并对被动方重发的FIN包响应RST包,而被动关闭方将此包解释成一个错误。因而,要实现TCP全双工连接的正常终止,必须能够处
    理四次握手协议中任意一个包丢失的情况,主动关闭方必须维持状态信息进入TIME_WAIT状态。

    2)确保迷路重复数据包在网络中消失,防止上一次连接中的包迷路后重新出现,影响新连接。TCP数据包可能由于路由器异常而迷路,在迷路期间,数据包发送方可能因超时而重发这个包,迷路的
    数据包在路由器恢复后也会被送到目的地,这个迷路的数据包就称为Lost Duplicate。在关闭一个TCP连接后,如果马上使用相同的IP地址和端口建立新的TCP连接,那么有可能出现前一个连接的迷
    路重复数据包在前一个连接关闭后再次出现,影响新建立的连接。为了避免这一情况,TCP协议不允许使用处于TIME_WAIT状态的连接的IP和端口启动一个新连接,只有经过2MSL的时间,确保上一次
    连接中所有的迷路重复数据包都已消失在网络中,才能安全地建立新连接。

    如果Server主动关闭连接,同样会有大量的连接在关闭后处于TIME_WAIT状态,等待2MSL的时间后才能释放网络资源。对于并发连接,出现大量等待连接,新的连接进不来,会降低系统性能。

    time_wait问题可以通过调整内核参数和适当的设置web服务器的keep-Alive值来解决。因为time_wait是自己可控的,要么就是对方连接的异常,要么就是自己没有快速的回收资源,总之不是由于自己程序错误引起的。

    解决方式:

    • 试图让Client主动关闭连接,由于每个Client的并发量都比较低,因而不会产生性能瓶颈
    • 优化Server的系统TCP参数,使其网络资源的最大值、消耗速度和恢复速度达到平衡;

      修改/etc/sysctl.conf

      net.ipv4.tcp_tw_recycle = 1       #启用TIME-WAIT状态sockets的快速回收
      
      net.ipv4.tcp_tw_reuse = 1         #允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭
      
      #缓存每个连接最新的时间戳,后续请求中如果时间戳小于缓存的时间戳,即视为无效,相应的数据包会被丢弃,启用这种行为取决于tcp_timestamps和tcp_tw_recycle
      net.ipv4.tcp_timestamps = 1

    CLOSE_WAIT 被动关闭

    对方发送一个FIN后,程序自己这边没有进一步发送ACK以确认。换句话说就是在对方关闭连接后,程序里没有检测到,或者程序里本身就已经忘了这个时候需要关闭连接,于是这个资源就一直被程序占用着。

    解决办法:

    • 释放关闭掉异常的连接;
    • 修复程序的bug,重新发布;

    Keep-Alive

    TCP中有一个Keep-Alive的机制可以检测死连接,LINUX内核包含对keepalive的支持,其中使用了三个参数:tcp_keepalive_time(开启keepalive的闲置时长)tcp_keepalive_intvl(keepalive探测包的发送
    间隔)和tcp_keepalive_probes(如果对方不予应答,探测包的发送次数);如此服务端会隔断时间发送个探测包给客户端,可以是多次,如果在超出设置闲置时长,内核会关闭这个连接。

    客户端主动发心跳

    通过程序设置最大连接时长,如果客户端在这段时间内没有发送过数据,则关闭释放这个连接。

    解决关闭问题

    TIME_WAIT 倒是没有出现过, CLOSE_WAIT状态总会出现。

    就看看文档,swoole有这些设置,当前使用的是TCP的keep-alive检测,只需改配置即可:

       ...
        //检查死链接  使用操作系统提供的keepalive机制来踢掉死链接
                    'open_tcp_keepalive'=>1,
                    'tcp_keepidle'=> 1*60,     //连接在n秒内没有数据请求,将开始对此连接进行探测
                    'tcp_keepcount' => 3,       //探测的次数,超过次数后将close此连接
                    'tcp_keepinterval' => 0.5*60,     //探测的间隔时间,单位秒
       ...

    我设置的周期比较短,方便测试。

    设置了这些看似稳定了,却还是会出现CLOSE_WAIT,后来查了日志,发生错误中断了,大概意思,代码中出现exit、die,显然常驻内存的swoole不支持这些,会立马中断程序。所以改些这些代码,
    刚开始借助YII2.0写的,框架源码的问题,所以swoole这块服务需要单独出来,嗯。。。所以索性直接自己撸个。现在看来,服务跑起来稳定多了,一直没挂呢。

    贴下临时地址:http://47.99.189.105:91/

    系统监控及优化

    系统很简单,但是作为研究,应该更透彻点。

    我们的系统如何监控?如果说系统崩溃怎么办?能支撑多大并发?高并发下如何保持系统稳定。。。 一个高性能的即时通讯是如何架构的?

    额,留待以后再研究下补充。

    参考资料:

    https://juejin.im/post/5c3b21e4e51d455231347349



    如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!欢迎各位转载,但是未经作者本人同意,转载文章之后必须在文章页面明显位置给出作者和原文连接,否则保留追究法律责任的权利。
  • 相关阅读:
    hdu 1199 Color the Ball 离散线段树
    poj 2623 Sequence Median 堆的灵活运用
    hdu 2251 Dungeon Master bfs
    HDU 1166 敌兵布阵 线段树
    UVALive 4426 Blast the Enemy! 计算几何求重心
    UVALive 4425 Another Brick in the Wall 暴力
    UVALive 4423 String LD 暴力
    UVALive 4872 Underground Cables 最小生成树
    UVALive 4870 Roller Coaster 01背包
    UVALive 4869 Profits DP
  • 原文地址:https://www.cnblogs.com/applelife/p/10448377.html
Copyright © 2011-2022 走看看