zoukankan      html  css  js  c++  java
  • SRS之RTMP连接处理线程conn:接收客户端推流

    SRS之RTMP的TCP线程 分析可知,SRS 接受客户端的连接后创建了一个线程:conn,用于处理与客户端的 RTMP 连接。

    本文的分析是基于该配置文件的:

    listen              1935;
    max_connections     1000;
    daemon              off;
    srs_log_tank        console;
    vhost __defaultVhost__ {
    }
    

    该配置文件仅使能 rtmp 直播推流功能。

    1. 关系图

    2. RTMP 连接处理线程 conn 之主循环

    2.1 conn 的线程函数: SrsThread::thread_fun

    void* SrsThread::thread_fun(void* arg)
    {
        SrsThread* obj = (SrsThread*)arg;
        srs_assert(obj);
        
        /* 这里进入线程的主循环开始处理与客户端的请求 */
        obj->thread_cycle();
        
        // for valgrind to detect.
        SrsThreadContext* ctx = dynamic_cast<SrsThreadContext*>(_srs_context);
        if (ctx) {
            ctx->clear_cid();
        }
        
        st_thread_exit(NULL);
        
        return NULL;
    }
    

    2.2 线程主循环:SrsThread::thread_cycle

    void SrsThread::thread_cycle()
    {
        int ret = ERROR_SUCCESS;
        
        _srs_context->generate_id();
        srs_info("thread %s cycle start", _name);
        
        _cid = _srs_context->get_id();
        
        srs_assert(handler);
        /* handler 是指向 ISrsThreadHandler 类对象的指针,由图 1 知,
         * handler 指向子类 SrsOneCycleThread 对象的 this 指针,因此,
         * 这里调用的是 SrsOneCycleThread 的 on_thread_start 函数 */
        handler->on_thread_start();
        
        // thread is running now.
        really_terminated = false;
        
        // wait for cid to ready, for parent thread to get the cid.
        while (!can_run && loop) {
            st_usleep(10 * 1000);
        }
        
        while (loop) {
            /* 调用子类 SrsOneCycleThread 的 on_before_cycle 函数 */
            if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
                srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", 
                         _name, ret);
                goto failed;
            }
            srs_info("thread %s on before cycle success", _name);
            
            /* 这里真正开始处理与客户端的 RTMP 通信 */
            if ((ret = handler->cycle()) != ERROR_SUCCESS) {
                if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) 
                {
                    srs_warn("thread %s cycle failed, ignored and retry, ret=%d", _name, ret);
                }
                goto failed;
            }
            srs_info("thread %s cycle success", _name);
            
            if ((ret = handler->on_end_cycle()) != ERROR_SUCCESS) {
                srs_warn("thread %s on end cycle failed, ignored and retry, ret=%d", 
                         _name, ret);
                goto failed;
            }
            srs_info("thread %s on end cycle success", _name);
            
        failed:
            if (!loop) {
                break;
            }
            
            // to improve performance, donot sleep when interval is zero.
            // @see: https://github.com/ossrs/srs/issues/237
            if (cycle_interval_us != 0) {
                st_usleep(cycle_interval_us);
            }
        }
        
        // readly terminated now.
        really_terminated = true;
        
        handler->on_thread_stop();
        srs_info("thread %s cycle finished", _name);
    }
    

    2.3 SrsOneCycleThread::on_thread_start

    void SrsOneCycleThread::on_thread_start()
    {
        /* handler 是 ISrsOneCycleThreadHandler 类的指针,该类的子类为 
         * SrsConnection,但是该类没有实现 on_thread_start 函数,因此
         * 仍然调用父类的 on_thread_start */
        handler->on_thread_start();
    }
    
    void ISrsOneCycleThreadHandler::on_thread_start()
    {
    }
    

    2.4 SrsOneCycleThread::on_before_cycle

    int SrsOneCycleThread::on_before_cycle()
    {
        /* 子类 SrsConnection 没有实现 on_before_cycle 函数,因此
         * 调用父类 ISrsOneCycleThreadHandler 的 on_before_cycle */
        return handler->on_before_cycle();
    }
    
    int ISrsOneCycleThreadHandler::on_before_cycle()
    {
        return ERROR_SUCCESS;
    }
    

    3. RTMP 事件处理循环

    3.1 SrsOneCycleThread::cycle

    int SrsOneCycleThread::cycle()
    {
        /* 调用子类 SrsConnection 实现的 cycle() 函数 */
        int ret = handler->cycle();
        pthread->stop_loop();
        return ret;
    }
    

    3.2 SrsConnection::cycle

    int SrsConnection::cycle()
    {
        int ret = ERROR_SUCCESS;
        
        _srs_context->generate_id();
        id = _srs_context->get_id();
        
        /* 获取客户端的 ip 地址 */
        ip = srs_get_peer_ip(st_netfd_fileno(stfd));
        
        /* 调用子类的 SrsRtmpConn 实现的 do_cycle 函数开始进行 RTMP 连接请求处理 */
        ret = do_cycle();
        
        // if socket io error, set to closed.
        if (srs_is_client_gracefully_close(ret)) {
            ret = ERROR_SOCKET_CLOSED;
        }
        
        // sucess.
        if (ret == ERROR_SUCCESS) {
            srs_trace("client finished.");
        }
        
        // client close peer.
        if (ret == ERROR_SOCKET_CLOSED) {
            srs_warn("client disconnect peer. ret=%d", ret);
        }
        
        return ERROR_SUCCESS;
    }
    

    SrsRtmpConn、SrsRtmpServer 和 SrsProtocol 之间的关系图

    3.3 SrsRtmpConn::do_cycle

    位于 srs_app_rtmp_conn.cpp。

    // the timeout to send data to client,
    // if timeout, close the connection.
    #define SRS_CONSTS_RTMP_SEND_TIMEOUT_US (int64_t)(30*1000*1000LL)
    
    // the timeout to wait client data,
    // if timeout, close the connection.
    #define SRS_CONSTS_RTMP_RECV_TIMEOUT_US (int64_t)(30*1000*1000LL)
    
    
    
    // TODO: return detail message when error for client.
    int SrsRtmpConn::do_cycle()
    {
        int ret = ERROR_SUCCESS;
        
        srs_trace("RTMP client ip=%s", ip.c_str());
        
        /* 设置 rtmp 发送/接收数据时的超时时间 */
        rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
        rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
        
        /* 开始进行 RTMP 连接的 handshake 过程 */
        if ((ret = rtmp->handshake()) != ERROR_SUCCESS) {
            srs_error("rtmp handshake failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("rtmp handshake success");
        
        /* handshake 成功后,开始接收并解析客户端发送的第一个 RTMP 消息:connect,该消息中
         * 指明了客户端想要连接的 app,解析信息保存在 req 中 */
        if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) {
            srs_error("rtmp connect vhost/app failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("rtmp connect app success");
        
        // set client ip to request.
        req->ip = ip;
        
        /* 如果在配置中没有找到与 req->vhost 相同的,则使用默认的 vhost,
         * 即 __defaultVhost__  */
        // discovery vhost, resolve the vhost from config
        SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
        if (parsed_vhost) {
            req->vhost = parsed_vhost->arg0();
        }
        
        srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s", 
                 req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str());
        
        if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) 
        {
            ret = ERROR_RTMP_REQ_TCUTL;
            srs_error("discovery tcUrl failed. "
                "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",
                req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), 
                req->port.c_str(), req->app.c_str(), ret);
            return ret;
        }
        
        // check vhost
        if ((ret = check_vhost()) != ERROR_SUCCESS) {
            srs_error("check vhost failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("check vhost success.");
        
        srs_trace("connect app, "
            "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s", 
            req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), 
            req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),
            req->app.c_str(), (req->args? "(obj)":"null"));
        
        // show client identity
        if (req->args) {
            std::string srs_version;
            std::string srs_server_ip;
            int srs_pid = 0;
            int srs_id = 0;
            
            SrsAmf0Any* prop = NULL;
            if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
                srs_version = prop->to_str();
            }
            if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
                srs_server_ip = prop->to_str();
            }
            if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
                srs_pid = (int)prop->to_number();
            }
            if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
                srs_id = (int)prop->to_number();
            }
            
            srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d", 
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
            if (srs_pid > 0) {
                srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d", 
                    srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
            }
        }
        
        ret = service_cycle();
        
        http_hooks_on_close();
        
        return ret;
    }
    

    3.5 rtmp->handshake()

    成功 accept 客户端的连接请求,开始与客户端进行 handshake 过程。具体可以参考 SRS之RTMP handshake

    3.6 rtmp->connect_app

    handshake 成功后,开始接收并解析客户端发送的第一个 RTMP 命令消息:connect。该消息指明了客户端想要连接服务器的某个 application。具体分析过程如 SRS之SrsRtmpServer::connect_app详解

    3.7 SrsRtmpConn::check_vhost

    int SrsRtmpConn::check_vhost()
    {
        int ret = ERROR_SUCCESS;
        
        srs_assert(req != NULL);
        
        /* 获取配置文件中 vhost 配置块的内容 */
        SrsConfDirective* vhost = _srs_config->get_vhost(req->vhost);
        if (vhost == NULL) {
            ret = ERROR_RTMP_VHOST_NOT_FOUND;
            srs_error("vhost %s not found. ret=%d", req->vhost.c_str(), ret);
            return ret;
        }
        
        /* 若 vhost 配置块为空,则默认使能 */
        if (!_srs_config->get_vhost_enabled(req->vhost)) {
            ret = ERROR_RTMP_VHOST_NOT_FOUND;
            srs_error("vhost %s disabled. ret=%d", req->vhost.c_str(), ret);
            return ret;
        }
        
        if (req->vhost != vhost->arg0()) {
            srs_trace("vhost change from %s to %s", req->vhost.c_str(), vhost->arg0().c_str());
            req->vhost = vhost->arg0();
        }
        
        if ((ret = refer->check(req->pageUrl, _srs_config->get_refer(req->vhost))) != ERROR_SUCCESS) {
            srs_error("check refer failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("check refer success.");
        
        if ((ret = http_hooks_on_connect()) != ERROR_SUCCESS) {
            return ret;
        }
        
        return ret;
    }
    

    这里暂不具体分析,因为配置文件 vhost 为 defaultVhost,即没有使能相关功能。

    3.8 SrsRtmpConn::service_cycle

    当检测客户端发送的 connect 有效的时候,开始调用该函数服务 client。具体分析过程见 SrsRtmpConn::service_cycle

  • 相关阅读:
    1-7周成绩总结
    1-6周成绩总结
    第七周学习笔记
    前五周测验成绩总结
    第六周学习笔记
    第五周学习笔记
    2018-2019-2 20189206 Python3学习
    2018-2019-2 20189206 安全工具的学习
    2018-2019-2 20189206 《网络攻防实践》 第一周作业
    2018-2019-1 20189206 《Linux内核原理与分析》第九周作业
  • 原文地址:https://www.cnblogs.com/jimodetiantang/p/9069562.html
Copyright © 2011-2022 走看看