zoukankan      html  css  js  c++  java
  • SRS之SrsServer::cycle()

    1. SrsServer 相关类定义

    1.1 SrsServer 类定义

    /**
     * SRS RTMP server: initializes and listens, starts the connection service
     * threads and destroys clients.
     * It also acts as the config-reload handler, the source publish/unpublish
     * handler and the connection manager (see the three virtual base classes).
     */
    class SrsServer : virtual public ISrsReloadHandler 
        , virtual public ISrsSourceHandler
        , virtual public IConnectionManager
    {
    // optional components, each compiled in only when its feature macro is defined.
    private:
    #ifdef SRS_AUTO_HTTP_API
        // TODO: FIXME: rename to http_api
        SrsHttpServeMux* http_api_mux;
    #endif
    #ifdef SRS_AUTO_HTTP_SERVER
        SrsHttpServer* http_server;
    #endif
    #ifdef SRS_AUTO_HTTP_CORE
        // used by do_cycle() to send the periodic http heartbeat.
        SrsHttpHeartbeat* http_heartbeat;
    #endif
    #ifdef SRS_AUTO_INGEST
        SrsIngester* ingester;
    #endif
    private:
        /**
         * the pid file fd; the file is write-locked while the server is running.
         * @remark the init.d script should clean up the pid file when stopping the
         *         service, because the server never deletes the file; at system startup
         *         the pid in the pid file may be valid but belong to a process that is
         *         not SRS, in which case the init.d script will never start the server.
         */
        int pid_fd;
        /**
         * all connections (this class is the connection manager).
         * its size is passed to the cycle handler in do_cycle().
         */
        std::vector<SrsConnection*> conns;
        /**
         * all listeners (this class is the listener manager).
         */
        std::vector<SrsListener*> listeners;
        /**
         * signal manager, which converts signals to io messages.
         */
        SrsSignalManager* signal_manager;
        /**
         * optional handler invoked at the start of every outer iteration of do_cycle();
         * may be NULL, in which case it is skipped.
         */
        ISrsServerCycle* handler;
        /**
         * signals sent by the user, converted to flags that do_cycle() polls:
         * signal_reload triggers a config reload and is then cleared;
         * signal_gmc_stop (gmc builds only) and signal_gracefully_quit make
         * do_cycle() return.
         */
        bool signal_reload;
        bool signal_gmc_stop;
        bool signal_gracefully_quit;
        /* parent pid for asprocess; do_cycle() returns when ::getppid() differs from it. */
        int ppid;
    public:
        SrsServer();
        virtual ~SrsServer();
    private:
        /**
         * destroy exists so that gmc can analyse memory leaks:
         * if global/static data is not destroyed, gmc will warn about a memory leak.
         * in service the server is never destroyed; it exits directly on restart.
         */
        virtual void destroy();
        /**
         * on SIGTERM, SRS should do cleanup, for example
         * stop all ingesters and clean up HLS and dvr.
         */
        virtual void dispose();
    // server startup workflow, @see run_master()
    public:
        /**
         * initialize the server with a callback handler.
         * @remark the user must free the cycle handler.
         */
        virtual int initialize(ISrsServerCycle* cycle_handler);
        virtual int initialize_st();
        virtual int initialize_signal();
        virtual int acquire_pid_file();
        virtual int listen();
        virtual int register_signal();
        virtual int http_handle();
        virtual int ingest();
        /**
         * run the main loop (do_cycle()) and perform the shutdown sequence when it returns.
         */
        virtual int cycle();
    // IConnectionManager
    public:
        /**
         * callback for a connection to remove itself.
         * when the connection thread cycle has terminated, this is called to delete
         * the connection.
         * @see SrsConnection.on_thread_stop().
         */
        virtual void remove(SrsConnection* conn);
    // server utilities.
    public:
        /**
         * callback invoked when the signal manager got a signal.
         * the signal manager converts the signal to an io message; either way, we
         * receive the signo just like the original signal(int signo) handler would.
         * @remark, directly exit for SIGTERM.
         * @remark, do reload for SIGNAL_RELOAD.
         * @remark, for SIGINT and SIGUSR2:
         *       no gmc, directly exit.
         *       for gmc, set the variable signal_gmc_stop; the cycle will return 
         *       and clean up for gmc.
         */
        virtual void on_signal(int signo);
    private:
        /**
         * the server thread main cycle:
         * updates the global static data, for instance the current time and
         * the cpu/mem/network statistics.
         */
        virtual int do_cycle();
        /**
         * listen on the specified protocol.
         */
        virtual int listen_rtmp();
        virtual int listen_http_api();
        virtual int listen_http_stream();
        virtual int listen_stream_caster();
        /**
         * close the listeners of the specified type and
         * remove the listener objects from the manager.
         */
        virtual void close_listeners(SrsListenerType type);
        /**
         * resample the server kbps.
         */
        virtual void resample_kbps();
    // internal only
    public:
        /**
         * when a listener got a fd, it notifies the server to accept it.
         * @param type, the client type, used to create the concrete connection,
         *        for instance an RTMP connection serving the client.
         * @param client_stfd, the client fd boxed by st (wraps the underlying fd).
         */
        virtual int accept_client(SrsListenerType type, st_netfd_t client_stfd);
    // interface ISrsReloadHandler.
    public:
        virtual int on_reload_listen();
        virtual int on_reload_pid();
        virtual int on_reload_vhost_added(std::string vhost);
        virtual int on_reload_vhost_removed(std::string vhost);
        virtual int on_reload_http_api_enabled();
        virtual int on_reload_http_api_disabled();
        virtual int on_reload_http_stream_enabled();
        virtual int on_reload_http_stream_disabled();
        virtual int on_reload_http_stream_updated();
    // interface ISrsSourceHandler
    public:
        virtual int on_publish(SrsSource* s, SrsRequest* r);
        virtual void on_unpublish(SrsSource* s, SrsRequest* r);
    };
    

    1.2 ISrsReloadHandler 类定义

    /**
     * the handler for config reload.
     * when a reload callback is invoked, the config has already been updated.
     * every callback is a non-pure virtual, so an implementer overrides only the
     * events it cares about (SrsServer, for example, overrides a subset).
     * NOTE(review): the behavior of the base implementations is not visible here.
     * 
     * features that do not support reload:
     * @see: https://github.com/ossrs/srs/wiki/v1_CN_Reload#notsupportedfeatures
     */
    class ISrsReloadHandler
    {
    public:
        ISrsReloadHandler();
        virtual ~ISrsReloadHandler();
    // callbacks that take no vhost argument.
    public:
        virtual int on_reload_utc_time();
        virtual int on_reload_max_conns();
        virtual int on_reload_listen();
        virtual int on_reload_pid();
        virtual int on_reload_log_tank();
        virtual int on_reload_log_level();
        virtual int on_reload_log_file();
        virtual int on_reload_pithy_print();
        virtual int on_reload_http_api_enabled();
        virtual int on_reload_http_api_disabled();
        virtual int on_reload_http_stream_enabled();
        virtual int on_reload_http_stream_disabled();
        virtual int on_reload_http_stream_updated();
    // vhost and ingest callbacks; most receive the name of the affected vhost.
    public:
        // TODO: FIXME: should rename to http_static
        virtual int on_reload_vhost_http_updated();
        virtual int on_reload_vhost_http_remux_updated(std::string vhost);
        virtual int on_reload_vhost_added(std::string vhost);
        virtual int on_reload_vhost_removed(std::string vhost);
        virtual int on_reload_vhost_atc(std::string vhost);
        virtual int on_reload_vhost_gop_cache(std::string vhost);
        virtual int on_reload_vhost_queue_length(std::string vhost);
        virtual int on_reload_vhost_time_jitter(std::string vhost);
        virtual int on_reload_vhost_mix_correct(std::string vhost);
        virtual int on_reload_vhost_forward(std::string vhost);
        virtual int on_reload_vhost_hls(std::string vhost);
        virtual int on_reload_vhost_hds(std::string vhost);
        virtual int on_reload_vhost_dvr(std::string vhost);
        virtual int on_reload_vhost_mr(std::string vhost);
        virtual int on_reload_vhost_mw(std::string vhost);
        virtual int on_reload_vhost_smi(std::string vhost);
        virtual int on_reload_vhost_tcp_nodelay(std::string vhost);
        virtual int on_reload_vhost_realtime(std::string vhost);
        virtual int on_reload_vhost_p1stpt(std::string vhost);
        virtual int on_reload_vhost_pnt(std::string vhost);
        virtual int on_reload_vhost_chunk_size(std::string vhost);
        virtual int on_reload_vhost_transcode(std::string vhost);
        // ingest callbacks additionally receive the id of the affected ingest.
        virtual int on_reload_ingest_removed(std::string vhost, std::string ingest_id);
        virtual int on_reload_ingest_added(std::string vhost, std::string ingest_id);
        virtual int on_reload_ingest_updated(std::string vhost, std::string ingest_id);
        virtual int on_reload_user_info();
    };
    

    1.3 ISrsSourceHandler 类定义

    /**
     * the handler for the events of an srs source.
     * for example, the http flv streaming module handles the events and 
     * mounts the http stream when rtmp starts publishing.
     * both callbacks are pure virtual, so every implementer must provide them.
     */
    class ISrsSourceHandler
    {
    public:
        ISrsSourceHandler();
        virtual ~ISrsSourceHandler();
    public:
        /**
        * when a stream starts publishing, mount the stream.
        * @param s the source being published.
        * @param r the request associated with the publish.
        * @return an int status code (the file's convention is ERROR_SUCCESS on success).
        */
        virtual int on_publish(SrsSource* s, SrsRequest* r) = 0;
        /**
        * when a stream stops publishing, unmount the stream.
        * @param s the source being unpublished.
        * @param r the request associated with the unpublish.
        */
        virtual void on_unpublish(SrsSource* s, SrsRequest* r) = 0;
    };
    

    1.4 IConnectionManager

    /**
     * the manager for connections.
     * a pure interface with a single operation; SrsServer implements it.
     */
    class IConnectionManager
    {
    public:
        IConnectionManager();
        virtual ~IConnectionManager();
    public:
        /**
         * remove the specified connection.
         * @param c the connection to remove.
         */
        virtual void remove(SrsConnection* c) = 0;
    };
    
    SrsServer 与 ISrsReloadHandler、ISrsSourceHandler 以及 IConnectionManager 之间是继承关系:SrsServer 以虚继承(virtual public)的方式同时继承这三个接口类。

    2. int SrsServer::cycle

    位于 srs_app_server.cpp:

    /**
     * Runs the server main loop and, once it returns, performs the shutdown sequence.
     * Without SRS_AUTO_GPERF_MC the process terminates via exit(0) inside this
     * function; with it, the server is destroyed and the loop's result is returned.
     */
    int SrsServer::cycle()
    {
        // enter the SRS main loop; it only comes back on a quit request or an error.
        const int ret = do_cycle();
        
    #ifndef SRS_AUTO_GPERF_MC
        // normal quit: run the necessary cleanup through dispose().
        srs_warn("main cycle terminated, system quit normally.");
        dispose();
        srs_trace("srs terminated");
        
        // release the globals so that valgrind can detect leaks.
        srs_freep(_srs_config);
        srs_freep(_srs_log);
        
        exit(0);
    #else
        destroy();
        
        // remark: for gmc, never invoke exit().
        srs_warn("sleep a long time for system st-threads to cleanup.");
        st_usleep(3 * 1000 * 1000);
        srs_warn("system quit");
    #endif
    
        return ret;
    }
    

    2.1 SrsServer::do_cycle

    /**
     * The server main loop. It sleeps one cycle interval at a time and, on every
     * wake-up, polls the quit/reload flags, cycles all stream sources and refreshes
     * the cached time and (with SRS_AUTO_STAT) the system statistics.
     * @return ERROR_SUCCESS when the loop is asked to stop (parent pid changed for
     *         asprocess, graceful quit, or gmc stop); otherwise the error code of
     *         the step that failed (cycle handler, config reload or source cycle).
     * @remark the heartbeat period is clamped to at least one cycle before it is used
     *         as a modulo divisor, because a configured heartbeat interval shorter
     *         than SRS_SYS_CYCLE_INTERVAL yields a period of 0 and `i % 0` is
     *         undefined behavior.
     */
    int SrsServer::do_cycle()
    {
        int ret = ERROR_SUCCESS;
        
        // find the max loop: the largest of all refresh periods (counted in cycles).
        int max = srs_max(0, SRS_SYS_TIME_RESOLUTION_MS_TIMES);
    
    #ifdef SRS_AUTO_STAT
        max = srs_max(max, SRS_SYS_RUSAGE_RESOLUTION_TIMES);
        max = srs_max(max, SRS_SYS_CPU_STAT_RESOLUTION_TIMES);
        max = srs_max(max, SRS_SYS_DISK_STAT_RESOLUTION_TIMES);
        max = srs_max(max, SRS_SYS_MEMINFO_RESOLUTION_TIMES);
        max = srs_max(max, SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES);
        max = srs_max(max, SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES);
        max = srs_max(max, SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES);
    #endif
        
        // for asprocess.
        bool asprocess = _srs_config->get_asprocess();
        
        // the daemon thread, update the time cache
        while (true) {
            // the cycle handler is optional; when it is NULL this step is skipped.
            // NOTE(review): the original article states the handler is NULL in the
            // default setup -- not verifiable from this function alone.
            if (handler && (ret = handler->on_cycle((int)conns.size())) != ERROR_SUCCESS) {
                srs_error("cycle handle failed. ret=%d", ret);
                return ret;
            }
            
            // the heartbeat interval in config, converted to a number of cycles.
            // this is 0 when the interval is shorter than one cycle.
            int heartbeat_max_resolution = (int)(_srs_config->get_heartbeat_interval() 
                                           / SRS_SYS_CYCLE_INTERVAL);
            
            // dynamic fetch the max.
            int temp_max = max;
            temp_max = srs_max(temp_max, heartbeat_max_resolution);
            
            for (int i = 0; i < temp_max; i++) {
                // the main thread sleeps here so that other st-threads get scheduled,
                // for example the connection threads that serve the clients.
                st_usleep(SRS_SYS_CYCLE_INTERVAL * 1000);
                
                // asprocess check: stop when the parent process has changed.
                if (asprocess && ::getppid() != ppid) {
                    srs_warn("asprocess ppid changed from %d to %d", ppid, ::getppid());
                    return ret;
                }
                
                // gracefully quit for SIGINT or SIGTERM.
                if (signal_gracefully_quit) {
                    srs_trace("cleanup for gracefully terminate");
                    return ret;
                }
                
                // for gperf heap checker,
                // @see: research/gperftools/heap-checker/heap_checker.cc
                // if user interrupt the program, exit to check mem leak.
                // but, if gperf, use reload to ensure main return normally,
                // because directly exit will cause core-dump.
    #ifdef SRS_AUTO_GPERF_MC
                if (signal_gmc_stop) {
                    srs_warn("gmc got singal to stop server.");
                    return ret;
                }
    #endif
                
                // do reload the config; the flag is cleared before reloading.
                if (signal_reload) {
                    signal_reload = false;
                    srs_info("get signal reload, to reload the config.");
                    
                    if ((ret = _srs_config->reload()) != ERROR_SUCCESS) {
                        srs_error("reload config failed. ret=%d", ret);
                        return ret;
                    }
                    srs_trace("reload config success.");
                }
                
                // notice the stream sources to cycle.
                if ((ret = SrsSource::cycle_all()) != ERROR_SUCCESS) {
                    return ret;
                }
                
                // update the cache time
                if ((i % SRS_SYS_TIME_RESOLUTION_MS_TIMES) == 0) {
                    srs_info("update current time cache.");
                    srs_update_system_time_ms();
                }
                
    #ifdef SRS_AUTO_STAT
                if ((i % SRS_SYS_RUSAGE_RESOLUTION_TIMES) == 0) {
                    srs_info("update resource info, rss.");
                    srs_update_system_rusage();
                }
                if ((i % SRS_SYS_CPU_STAT_RESOLUTION_TIMES) == 0) {
                    srs_info("update cpu info, cpu usage.");
                    srs_update_proc_stat();
                }
                if ((i % SRS_SYS_DISK_STAT_RESOLUTION_TIMES) == 0) {
                    srs_info("update disk info, disk iops.");
                    srs_update_disk_stat();
                }
                if ((i % SRS_SYS_MEMINFO_RESOLUTION_TIMES) == 0) {
                    srs_info("update memory info, usage/free.");
                    srs_update_meminfo();
                }
                if ((i % SRS_SYS_PLATFORM_INFO_RESOLUTION_TIMES) == 0) {
                    srs_info("update platform info, uptime/load.");
                    srs_update_platform_info();
                }
                if ((i % SRS_SYS_NETWORK_DEVICE_RESOLUTION_TIMES) == 0) {
                    srs_info("update network devices info.");
                    srs_update_network_devices();
                }
                if ((i % SRS_SYS_NETWORK_RTMP_SERVER_RESOLUTION_TIMES) == 0) {
                    srs_info("update network server kbps info.");
                    resample_kbps();
                }
        #ifdef SRS_AUTO_HTTP_CORE
                if (_srs_config->get_heartbeat_enabled()) {
                    // never use 0 as the modulo divisor: an interval shorter than one
                    // cycle is treated as "every cycle" instead of dividing by zero.
                    int heartbeat_every = srs_max(1, heartbeat_max_resolution);
                    if ((i % heartbeat_every) == 0) {
                        srs_info("do http heartbeat, for internal server to report.");
                        http_heartbeat->heartbeat();
                    }
                }
        #endif
    #endif
                
                srs_info("server main thread loop");
            }
        }
    }
    

    在该函数中,该主线程循环进行休眠,以便调度其他线程运行。

  • 相关阅读:
    设计模式之工厂模式
    在线预览插件pdf.js使用记录
    自学Python:自定义模块导入问题
    MVC流程
    关于django的一些基础知识
    day72 关于rbac组件的小部分面试题
    linux的简单操作和安装
    day71 菜单的排序 点击被选中
    day063 form 和modelform组件
    day051 django第二天 django初识代码
  • 原文地址:https://www.cnblogs.com/jimodetiantang/p/9055886.html
Copyright © 2011-2022 走看看