  • SRS: Source Code Analysis of the HLS Deployment Example

    1. Overview

    For the detailed HLS configuration in SRS, see: HLS Deployment Example

    The SRS configuration file for HLS is as follows:

    listen              1935;
    max_connections     1000;
    daemon              off;
    srs_log_tank        console;
    vhost __defaultVhost__ {
        hls {
            enabled         on;
            hls_fragment    10;
            hls_window      60;
            hls_path        ./objs/nginx/html;
            hls_m3u8_file   [app]/[stream].m3u8;
            hls_ts_file     [app]/[stream]-[seq].ts;
        }
    }
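
    The hls_m3u8_file and hls_ts_file entries are path templates containing the variables [app], [stream] and [seq]. As a rough illustration of how such a template could expand into a concrete path, here is a minimal sketch. The helpers replace_all and expand_hls_template are hypothetical names used only for this example; they are not SRS functions, and SRS does the substitution in its own utility code.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper (not part of SRS): replace every occurrence of `from` in `s` with `to`.
static std::string replace_all(std::string s, const std::string& from, const std::string& to)
{
    assert(!from.empty());
    std::string::size_type pos = 0;
    while ((pos = s.find(from, pos)) != std::string::npos) {
        s.replace(pos, from.size(), to);
        pos += to.size(); // continue after the inserted text
    }
    return s;
}

// Hypothetical helper: expand [app], [stream] and [seq] in an HLS path template.
static std::string expand_hls_template(const std::string& tmpl, const std::string& app,
    const std::string& stream, int seq)
{
    std::string path = tmpl;
    path = replace_all(path, "[app]", app);
    path = replace_all(path, "[stream]", stream);
    path = replace_all(path, "[seq]", std::to_string(seq));
    return path;
}
```

    With app "live" and stream "livestream", the two templates from the configuration would give live/livestream.m3u8 and, for sequence number 3, live/livestream-3.ts, both relative to hls_path.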
    

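    The flow on the SRS side is as follows:

    1. SRS starts listening on port 1935 (see "SRS: Managing Listening Ports: RTMP").
    2. While setting up the listener on port 1935, SRS creates a TCP listener thread for that port. This thread accepts client connection requests and then creates a conn thread for each client to serve it (see "SRS: The RTMP Connection Thread conn: Receiving a Client's Published Stream").
    3. At the start of the conn thread's cycle, the server first performs the handshake with the client (see "SRS: RTMP handshake").
    4. After a successful handshake, the server receives the client's first command, usually connect('xxx') (see "SRS: SrsRtmpServer::connect_app in Detail").
    5. Next it enters SrsRtmpConn::service_cycle (see "SRS: SrsRtmpConn::service_cycle in Detail").
    6. SrsRtmpConn::service_cycle first sends the client the Window Acknowledgement Size (5), Set Peer Bandwidth (6) and Set Chunk Size (1) messages together with the response to connect, and then enters a loop in which it calls stream_service_cycle.

    The HLS analysis below therefore starts from stream_service_cycle.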

    2. SrsRtmpConn::stream_service_cycle

    int SrsRtmpConn::stream_service_cycle()
    {
        int ret = ERROR_SUCCESS;
        
        SrsRtmpConnType type;
        /* receive some messages, then use their content to identify whether this client is going to publish or play */
        if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) 
            != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("identify client failed. ret=%d", ret);
            }
            return ret;
        }
        req->strip();
        
        /* security check: if security is not configured in the vhost, it is disabled by default (returns false) */
        if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
            srs_error("security check failed. ret=%d", ret);
            return ret;
        }
        
        /* SRS does not support requests with an empty stream name, because HLS uses the stream name when writing files */
        if (req->stream.empty()) {
            ret = ERROR_RTMP_STREAM_NAME_EMPTY;
            srs_error("RTMP: Empty stream name not allowed, ret=%d", ret);
            return ret;
        }
        
        /* client is identified, set the timeout to service timeout. */
        rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
        rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
        
        /* find a source to serve. */
        SrsSource* source = NULL;
        if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) 
        {
            return ret;
        }
        srs_assert(source != NULL);
        
        /* update the statistic when source discovered. */
        SrsStatistic* stat = SrsStatistic::instance();
        if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) 
        {
            srs_error("stat client failed. ret=%d", ret);
            return ret;
        }
        
        /* if mode is not configured in the vhost, this returns false by default */
        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
        /* gop_cache is enabled by default */
        bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
        source->set_cache(enabled_cache);
        
        client_type = type;
        switch (type) {
            ...
            
            case SrsRtmpConnFMLEPublish: {
                srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
                
                if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
                    srs_error("start to publish stream failed. ret=%d", ret);
                    return ret;
                }
                
                return publishing(source);
            }
            
            ...
            
            default: {
                ret = ERROR_SYSTEM_CLIENT_INVALID;
                srs_info("invalid client type=%d. ret=%d", type, ret);
                return ret;
            }
        }
        
        return ret;
    }
    

    2.1 SrsSource::fetch_or_create

    /* The static member pool stores the SrsSource objects that have been created,
     * key: vhost/app/stream 
     * value: SrsSource* */
    std::map<std::string, SrsSource*> SrsSource::pool;
    
    /*
     * create source when fetch from cache failed.
     * @param r, the client request.
     * @param h, the event handler for source.
     * @param pps, the matched source, if success never be NULL.
     */
    int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
    {
        int ret = ERROR_SUCCESS;
        
        SrsSource* source = NULL;
        /* check whether the pool already holds an SrsSource* for the key
         * (e.g. vhost/live/livestream); if so, return that SrsSource* */
        if ((source = fetch(r)) != NULL) {
            *pps = source;
            return ret;
        }
        
        string stream_url = r->get_stream_url();
        string vhost = r->vhost;
        
        /* should always not exists for create a source. */
        srs_assert (pool.find(stream_url) == pool.end());
        
        /* not found in the pool, so create a new SrsSource */
        source = new SrsSource();
        if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
            srs_freep(source);
            return ret;
        }
        
        /* put the successfully created and initialized source into the pool */
        pool[stream_url] = source;
        
        *pps = source;
        
        return ret;
    }
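
    The fetch-or-create pattern above can be reduced to a small standalone sketch. Source and SourcePool here are hypothetical stand-ins, not SRS classes; the real SrsSource also runs initialize() before being put into the pool, which this example leaves out.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Stand-in for SrsSource; only the stream URL is kept for illustration.
struct Source {
    std::string url;
    explicit Source(const std::string& u) : url(u) {}
};

// Hypothetical pool modelling SrsSource::pool together with fetch / fetch_or_create.
class SourcePool {
public:
    ~SourcePool() {
        for (std::map<std::string, Source*>::iterator it = pool_.begin(); it != pool_.end(); ++it) {
            delete it->second;
        }
    }
    // Return the cached source for this URL, or NULL when none exists.
    Source* fetch(const std::string& url) const {
        std::map<std::string, Source*>::const_iterator it = pool_.find(url);
        return it == pool_.end() ? NULL : it->second;
    }
    // Return the cached source, creating and registering it on first use.
    Source* fetch_or_create(const std::string& url) {
        assert(!url.empty());
        Source* source = fetch(url);
        if (source != NULL) {
            return source;
        }
        source = new Source(url);
        pool_[url] = source;
        return source;
    }
    std::size_t size() const { return pool_.size(); }
private:
    std::map<std::string, Source*> pool_;
};
```

    Calling fetch_or_create twice with the same key returns the same pointer, which is why a player and a publisher of the same vhost/app/stream end up sharing one source.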
    

    The following mainly analyzes the construction of SrsSource.

    2.1.1 SrsSource constructor

    /* live streaming source. */
    SrsSource::SrsSource()
    {
        /* deep copy of client request. */
        _req = NULL;
        /* the time jitter algorithm for vhost. initialized to OFF here */
        jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
        
        mix_correct = false;
        mix_queue = new SrsMixQueue();
        
    #ifdef SRS_AUTO_HLS
        /* construct the SrsHls object */
        hls = new SrsHls();
    #endif
    ...
        
        cache_metadata = cache_sh_video = cache_sh_audio = NULL;
        
        /* can publish, true when is not streaming */
        _can_publish = true;
        _pre_source_id = _source_id = -1;
        /* last die time, when all consumers quit and no publisher,
         * we will remove the source when source die. */
        die_at = -1;
        
        /* edge control service */
        play_edge = new SrsPlayEdge();
        publish_edge = new SrsPublishEdge();
        /* gop cache for client fast startup. */
        gop_cache = new SrsGopCache();
        /* for aggregate message */
        aggregate_stream = new SrsStream();
        
        is_monotonically_increase = false;
        last_packet_time = 0;
        
        _srs_config->subscribe(this);
        atc = false;
    }
    

    2.1.2 SrsHls constructor

    /*
     * delivery RTMP stream to HLS(m3u8 and ts), 
     * SrsHls provides interface with SrsSource. 
     */
    SrsHls::SrsHls()
    {
        _req = NULL;
        source = NULL;
        
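        /* whether HLS is enabled */
        hls_enabled = false;
        /* whether the HLS files can be disposed. hls_dispose is the expiry time: on
         * restart, or once this time has elapsed, all HLS files (m3u8 and ts) are
         * cleaned up. it defaults to 0, meaning no cleanup */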
        hls_can_dispose = false;
        last_update_time = 0;
    
        /* the h264/avc and aac codec, for media stream. */
        codec = new SrsAvcAacCodec();
        /* the samples in the flv audio/video packet. */
        sample = new SrsCodecSample();
        /* time jitter detect and correct, to ensure the rtmp stream is monotonically. */
        jitter = new SrsRtmpJitter();
        
        /* muxer the HLS stream(m3u8 and ts files). */
        muxer = new SrsHlsMuxer();
        /* hls stream cache, use to cache hls stream and flush to hls muxer. */
        hls_cache = new SrsHlsCache();
    
        pprint = SrsPithyPrint::create_hls();
        /*
         * we store the stream dts,
         * for when we notice the hls cache to publish,
         * it need to know the segment start dts.
         *
         * for example. when republish, the stream dts will
         * monotonically increase, and the ts dts should start 
         * from current dts.
         *
         * or, simply because the HlsCache never free when unpublish,
         * so when publish or republish it must start at stream dts,
         * not zero dts.
         */
        stream_dts = 0;
    }
    

    2.1.3 SrsAvcAacCodec constructor

    /**
    * the h264/avc and aac codec, for media stream.
    *
    * to demux the FLV/RTMP video/audio packet to sample,
    * add each NALUs of h.264 as a sample unit to sample,
    * while the entire aac raw data as a sample unit.
    *
    * for sequence header,
    * demux it and save it in the avc_extra_data and aac_extra_data,
    * 
    * for the codec info, such as audio sample rate,
    * decode from FLV/RTMP header, then use codec info in sequence 
    * header to override it.
    */
    SrsAvcAacCodec::SrsAvcAacCodec()
    {
        /* for sequence header, whether parse the h.264 sps. */
        avc_parse_sps               = true;
        
        width                       = 0;
        height                      = 0;
        duration                    = 0;
        NAL_unit_length             = 0;
        frame_rate                  = 0;
    
        video_data_rate             = 0;
        video_codec_id              = 0;
    
        audio_data_rate             = 0;
        audio_codec_id              = 0;
    
        avc_profile                 = SrsAvcProfileReserved;
        avc_level                   = SrsAvcLevelReserved;
        aac_object                  = SrsAacObjectTypeReserved;
        aac_sample_rate             = SRS_AAC_SAMPLE_RATE_UNSET; // sample rate ignored
        aac_channels                = 0;
        avc_extra_size              = 0;
        avc_extra_data              = NULL;
        aac_extra_size              = 0;
        aac_extra_data              = NULL;
    
        sequenceParameterSetLength  = 0;
        sequenceParameterSetNALUnit = NULL;
        pictureParameterSetLength   = 0;
        pictureParameterSetNALUnit  = NULL;
    
        /* the avc payload format. */
        payload_format = SrsAvcPayloadFormatGuess;
        stream = new SrsStream();
    }
    

    2.1.4 SrsCodecSample constructor

    /**
    * the samples in the flv audio/video packet.
    * the sample used to analysis a video/audio packet,
    * split the h.264 NALUs to buffers, or aac raw data to a buffer,
    * and decode the video/audio specified infos.
    * 
    * the sample unit:
    *       a video packet codec in h.264 contains many NALUs, each is a sample unit.
    *       a audio packet codec in aac is a sample unit.
    * @remark, the video/audio sequence header is not sample unit,
    *       all sequence header stores as extra data, 
    *       @see SrsAvcAacCodec.avc_extra_data and SrsAvcAacCodec.aac_extra_data
    * @remark, user must clear all samples before decode a new video/audio packet.
    */
    SrsCodecSample::SrsCodecSample()
    {
        clear();
    }
    
    /**
     * clear all samples.
     * the sample units never copy the bytes, it directly use the ptr,
     * so when video/audio packet is destroyed, the sample must be clear.
     * in a word, user must clear sample before demux it.
     * @remark demux sample use SrsAvcAacCodec.audio_aac_demux or video_avc_demux.
     */
    void SrsCodecSample::clear()
    {
        /* whether the sample is video sample which demux from video packet. */
        is_video = false;
        /*
         * each audio/video raw data packet will dumps to one or multiple buffers,
         * the buffers will write to hls and clear to reset.
         * generally, aac audio packet corresponding to one buffer,
         * where avc/h264 video packet may contains multiple buffer.
         */
        nb_sample_units = 0;
    
        /* 
         * CompositionTime, video_file_format_spec_v10_1.pdf, page 78.
         * cts = pts - dts, where dts = flvheader->timestamp.
         */
        cts = 0;
        frame_type = SrsCodecVideoAVCFrameReserved;
        avc_packet_type = SrsCodecVideoAVCTypeReserved;
        has_sps_pps = has_aud = has_idr = false;
        first_nalu_type = SrsAvcNaluTypeReserved;
        
        acodec = SrsCodecAudioReserved1;
        sound_rate = SrsCodecAudioSampleRateReserved;
        sound_size = SrsCodecAudioSampleSizeReserved;
        sound_type = SrsCodecAudioSoundTypeReserved;
        aac_packet_type = SrsCodecAudioTypeReserved;
    }
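
    The comment on cts says cts = pts - dts, with dts taken from the FLV tag timestamp. As a small worked example, the sketch below derives the DTS and PTS of one video frame in the 90 kHz time base used by ts (FLV timestamps are in milliseconds, so 1 ms corresponds to 90 ticks). The struct and function names are hypothetical, and this is an illustration of the arithmetic rather than the SRS code path.

```cpp
#include <cassert>
#include <cstdint>

// Timestamps of one video frame in the MPEG-TS 90 kHz time base.
struct TsTimestamps {
    int64_t dts;
    int64_t pts;
};

// Hypothetical helper: flv_dts_ms is the FLV tag timestamp (ms),
// cts_ms is the CompositionTime of the frame (ms).
static TsTimestamps to_ts_timestamps(int64_t flv_dts_ms, int32_t cts_ms)
{
    assert(flv_dts_ms >= 0);
    TsTimestamps t;
    t.dts = flv_dts_ms * 90;            // 1 ms = 90 ticks at 90 kHz
    t.pts = (flv_dts_ms + cts_ms) * 90; // pts = dts + cts
    return t;
}
```

    For a frame with an FLV timestamp of 1000 ms and a cts of 40 ms this gives dts = 90000 and pts = 93600.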
    

    2.1.5 SrsHlsMuxer constructor

    /**
    * muxer the HLS stream(m3u8 and ts files).
    * generally, the m3u8 muxer only provides methods to open/close segments,
    * to flush video/audio, without any mechanisms.
    * 
    * that is, user must use HlsCache, which will control the methods of muxer,
    * and provides HLS mechanisms.
    */
    SrsHlsMuxer::SrsHlsMuxer()
    {
        req = NULL;
        hls_fragment = hls_window = 0;
        hls_aof_ratio = 1.0;
        /*
         * the deviation in piece to adjust the fragment to be more
         * bigger or smaller.
         */
        deviation_ts = 0;
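        /* whether to delete expired ts segments; a segment outside hls_window is expired.
         * cleanup can be turned off to implement time shifting and storage with your
         * own segment management system. enabled by default here */
        hls_cleanup = true;
        /* whether to cut segments on GOP boundaries, i.e. wait for a keyframe before
         * starting a new segment. enabled by default here */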
        hls_wait_keyframe = true;
        previous_floor_ts = 0;
        /*
         * the previous reap floor timestamp,
         * used to detect the dup or jmp or ts.
         */
        accept_floor_ts = 0;
        /* whether use floor algorithm for timestamp. */
        hls_ts_floor = false;
        /* the maximum segment duration */
        max_td = 0;
        /* sequence number of the ts; incremented by 1 each time a new ts is started */
        _sequence_no = 0;
        /* current writing segment. */
        current = NULL;
        /*
         * the current audio codec, when open new muxer, 
         * set the muxer audio codec.
         * @see https://github.com/ossrs/srs/issues/301
         */
        acodec = SrsCodecAudioReserved1;
        /* writing to cache is disabled by default */
        should_write_cache = false;
        /* writing video/audio to file (the ts file) is enabled by default */
        should_write_file = true;
        /* create a reusable thread (SrsReusableThread) named async */
        async = new SrsAsyncCallWorker();
        /* the ts context, to keep cc continuous between ts. */
        context = new SrsTsContext();
    }
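
    The hls_cleanup comment says a segment that no longer fits into hls_window is expired. One possible way to express that sliding window is sketched below. Segment and shrink_to_window are hypothetical names; the sketch only removes entries from an in-memory list and is not the SRS implementation, which also has to rewrite the m3u8 and unlink the ts files.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical segment record: sequence number and duration in seconds.
struct Segment {
    int sequence_no;
    double duration;
};

// Drop the oldest segments until the remaining ones fit into `window` seconds.
// Returns how many segments were dropped; with cleanup enabled these are the
// segments whose ts files would be deleted.
static std::size_t shrink_to_window(std::deque<Segment>& segments, double window)
{
    assert(window >= 0);
    double total = 0;
    for (std::size_t i = 0; i < segments.size(); ++i) {
        total += segments[i].duration;
    }
    std::size_t dropped = 0;
    while (!segments.empty() && total > window) {
        total -= segments.front().duration;
        segments.pop_front();
        ++dropped;
    }
    return dropped;
}
```

    With eight 10-second segments and a 60-second window, the two oldest segments are dropped and the list then starts at sequence number 2.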
    

    2.1.6 SrsAsyncCallWorker constructor

    /**
     * the async callback for dvr.
     * when worker call with the task, the worker will do it in isolate thread.
     * that is, the task is execute/call in async mode.
     */
    SrsAsyncCallWorker::SrsAsyncCallWorker()
    {
        /* create a reusable thread named async */
        pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US);
    }
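
    The idea behind an async call worker is that callers only queue a task, and a separate worker loop runs the queued tasks later. The sketch below models that with a plain, single-threaded class so it stays deterministic: execute() queues, cycle() stands for one iteration of the worker loop. DeferredCallWorker is a hypothetical name, and SRS runs the loop in a state-threads coroutine rather than calling it by hand as done here.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical, single-threaded model of a deferred-call worker.
class DeferredCallWorker {
public:
    // Queue a task; nothing runs yet.
    void execute(const std::function<void()>& task) {
        assert(static_cast<bool>(task)); // reject empty callables
        tasks_.push_back(task);
    }
    // One iteration of the worker loop: run and drop every queued task.
    void cycle() {
        std::vector<std::function<void()> > pending;
        pending.swap(tasks_); // tasks queued while running wait for the next cycle
        for (std::size_t i = 0; i < pending.size(); ++i) {
            pending[i]();
        }
    }
    // Number of tasks still waiting to run.
    std::size_t count() const { return tasks_.size(); }
private:
    std::vector<std::function<void()> > tasks_;
};
```

    The point of the design is that the code writing ts data never blocks on slow callbacks; it hands them to the worker and continues.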
    

    2.1.6.1 SrsReusableThread constructor

    SrsReusableThread::SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, 
        int64_t interval_us)
    {
        handler = h;
        pthread = new internal::SrsThread(n, this, interval_us, true);
    }
    

    2.1.6.2 SrsThread constructor

    SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)
    {
        _name = name;
        handler = thread_handler;
        cycle_interval_us = interval_us;
        
        tid = NULL;
        loop = false;
        really_terminated = true;
        _cid = -1;
        _joinable = joinable;
        disposed = false;
        
        // in start(), the thread cycle method maybe stop and remove the thread itself,
        // and the thread start() is waiting for the _cid, and segment fault then.
        // @see https://github.com/ossrs/srs/issues/110
        // thread will set _cid, callback on_thread_start(), then wait for the can_run signal.
        can_run = false;
    }
    

    2.1.7 SrsHlsCache constructor

    /**
    * hls stream cache, 
    * use to cache hls stream and flush to hls muxer.
    * 
    * when write stream to ts file:
    * video frame will directly flush to M3u8Muxer,
    * audio frame need to cache, because it's small and flv tbn problem.
    * 
    * whatever, the Hls cache used to cache video/audio,
    * and flush video/audio to m3u8 muxer if needed.
    * 
    * about the flv tbn problem:
    *   flv tbn is 1/1000, ts tbn is 1/90000,
    *   when timestamp convert to flv tbn, it will lose precision,
    *   so we must gather audio frame together, and recalc the timestamp @see SrsTsAacJitter,
    *   we use a aac jitter to correct the audio pts.
    */
    SrsHlsCache::SrsHlsCache()
    {
        cache = new SrsTsCache();
    }
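
    The flv tbn problem in the comment above can be made concrete with a little arithmetic. FLV timestamps are whole milliseconds, while ts uses 1/90000 s ticks, so an audio frame duration that is not a whole number of milliseconds gets truncated. The sketch below compares the PTS obtained by scaling the FLV timestamp with the PTS recomputed from the sample count. It assumes 1024 samples per AAC frame, which is an assumption of this example, and it only shows the size of the error; it is not the SrsTsAacJitter algorithm.

```cpp
#include <cassert>
#include <cstdint>

// Assumed for illustration: 1024 samples per AAC frame.
static const int64_t kSamplesPerFrame = 1024;

// PTS (90 kHz) obtained by scaling the millisecond FLV timestamp of frame n.
static int64_t pts_from_flv_ms(int64_t n, int64_t sample_rate)
{
    assert(sample_rate > 0);
    int64_t flv_ms = n * kSamplesPerFrame * 1000 / sample_rate; // truncated to whole ms
    return flv_ms * 90;
}

// PTS (90 kHz) recomputed from the number of samples played so far.
static int64_t pts_from_samples(int64_t n, int64_t sample_rate)
{
    assert(sample_rate > 0);
    return n * kSamplesPerFrame * 90000 / sample_rate;
}
```

    At 44100 Hz the second frame (n = 1) starts at 2089 ticks when computed from samples but at 2070 ticks when derived from its 23 ms FLV timestamp, and the gap varies from frame to frame. This is the kind of error that recalculating the audio timestamps is meant to remove.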
    

    2.1.8 SrsTsCache constructor

    /**
    * ts stream cache, 
    * use to cache ts stream.
    * 
    * about the flv tbn problem:
    *   flv tbn is 1/1000, ts tbn is 1/90000,
    *   when timestamp convert to flv tbn, it will lose precision,
    *   so we must gather audio frame together, and recalc the timestamp @see SrsTsAacJitter,
    *   we use a aac jitter to correct the audio pts.
    */
    SrsTsCache::SrsTsCache()
    {
        /* current ts message. */
        audio = NULL;
        video = NULL;
    }
    

    After the SrsSource has been constructed as above, its initialize function is called next to initialize it.

    2.1.9 SrsSource::initialize

    int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
    {
        int ret = ERROR_SUCCESS;
        
        srs_assert(h);
        srs_assert(!_req);
    
        /* the event handler. */
        handler = h;
        /* deep copy of client request. */
        _req = r->copy();
        /* if atc is not configured in the vhost, this returns false by default */
        atc = _srs_config->get_atc(_req->vhost);
    
    #ifdef SRS_AUTO_HLS
        if ((ret = hls->initialize(this)) != ERROR_SUCCESS) {
            return ret;
        }
    #endif
        
    ..
    
        if ((ret = play_edge->initialize(this, _req)) != ERROR_SUCCESS) {
            return ret;
        }
        if ((ret = publish_edge->initialize(this, _req)) != ERROR_SUCCESS) {
            return ret;
        }
        
        /* if queue_length is not configured in the vhost, the play queue defaults to 30s
         *
         * in seconds, the live queue length
         * #define SRS_PERF_PLAY_QUEUE 30
         */
        double queue_size = _srs_config->get_queue_length(_req->vhost);
        publish_edge->set_queue_size(queue_size);
        
        /* the time jitter algorithm for vhost.
         * if time_jitter is not configured in the vhost, the default return value is SrsRtmpJitterAlgorithmFULL */
        jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(_req->vhost);
        /* if mix_correct is not configured in the vhost, this returns false by default,
         * i.e. the interleaved/mixed algorithm is not used to correct timestamps */
        mix_correct = _srs_config->get_mix_correct(_req->vhost);
        
        return ret;
    }
    

    2.1.10 SrsHls::initialize

    /*
     * initialize the hls by handler and source.  
     */
    int SrsHls::initialize(SrsSource* s)
    {
        int ret = ERROR_SUCCESS;
    
        source = s;
    
        /* initialize the hls muxer */
        if ((ret = muxer->initialize()) != ERROR_SUCCESS) {
            return ret;
        }
    
        return ret;
    }
    

    2.1.11 SrsHlsMuxer::initialize

    /* initialize the hls muxer. */
    int SrsHlsMuxer::initialize()
    {
        int ret = ERROR_SUCCESS;
        
        /* start the async thread */
        if ((ret = async->start()) != ERROR_SUCCESS) {
            return ret;
        }
        
        return ret;
    }
    

    2.1.12 SrsAsyncCallWorker::start

    int SrsAsyncCallWorker::start()
    {
        return pthread->start();
    }
    

    2.1.13 SrsReusableThread::start

    int SrsReusableThread::start()
    {
        return pthread->start();
    }
    

    2.1.14 SrsThread::start

    int SrsThread::start()
    {
        int ret = ERROR_SUCCESS;
        
        if(tid) {
            srs_info("thread %s already running.", _name);
            return ret;
        }
        
        /* create the thread */
        if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
            ret = ERROR_ST_CREATE_CYCLE_THREAD;
            srs_error("st_thread_create failed. ret=%d", ret);
            return ret;
        }
        
        disposed = false;
        // we set to loop to true for thread to run.
        loop = true;
        
        // wait for cid to ready, for parent thread to get the cid.
        while (_cid < 0) {
            st_usleep(10 * 1000);
        }
        
        // now, cycle thread can run.
        can_run = true;
        
        return ret;
    }
    

    3. SrsRtmpConn::publishing

    int SrsRtmpConn::publishing(SrsSource* source)
    {
        int ret = ERROR_SUCCESS;
        
        /* refer_publish is not configured in the vhost; ignore */
        if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) 
            != ERROR_SUCCESS) {
            srs_error("check publish_refer failed. ret=%d", ret);
            return ret;
        }
        
        /* http_hooks is not configured in the vhost; ignore */
        if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
            srs_error("http hook on_publish failed. ret=%d", ret);
            return ret;
        }
        
        /* this returns false here */
        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
        if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) {
            SrsPublishRecvThread trd(rtmp, req, 
                st_netfd_fileno(stfd), 0, this, source,
                client_type != SrsRtmpConnFlashPublish, 
                vhost_is_edge);
            
            ret = do_publishing(source, &trd);
            
            /* stop isolate recv thread */
            trd.stop();
        }
        
        /*
         * whatever the acquire publish, always release publish.
         * when the acquire error in the middle-way, the publish state changed,
         * but failed, so we must cleanup it.
         * @remark, when stream is busy, should never release it.
         */
        if (ret != ERROR_SYSTEM_STREAM_BUSY) {
            release_publish(source, vhost_is_edge);
        }
    
        http_hooks_on_unpublish();
    
        return ret;
    }
    

    3.1 SrsRtmpConn::acquire_publish

    int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
    {
        int ret = ERROR_SUCCESS;
    
        /* this simply returns the value of _can_publish, which was initialized to true when the source was constructed */
        if (!source->can_publish(is_edge)) {
            ret = ERROR_SYSTEM_STREAM_BUSY;
            srs_warn("stream %s is already publishing. ret=%d", 
                req->get_stream_url().c_str(), ret);
            return ret;
        }
        
        /* mode is not configured in the vhost, so is_edge is false */
        /* when edge, ignore the publish event, directly proxy it. */
        if (is_edge) {
            if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
                srs_error("notice edge start publish stream failed. ret=%d", ret);
                return ret;
            }        
        } else {
            /* publish stream event notify. */
            if ((ret = source->on_publish()) != ERROR_SUCCESS) {
                srs_error("notify publish failed. ret=%d", ret);
                return ret;
            }
        }
    
        return ret;
    }
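
    The interplay between acquire_publish and the "never release when busy" rule in publishing() can be sketched with a single flag. PublishState and the three functions below are hypothetical stand-ins, and the error codes are placeholder values rather than the real SRS constants. The sketch only models the _can_publish flag, not the HLS, DVR or forwarder side effects of on_publish.

```cpp
#include <cassert>

// Illustrative error codes; the numeric values are placeholders, not SRS's.
enum { kSuccess = 0, kStreamBusy = 1 };

// Hypothetical stand-in for the publish state kept by the source.
struct PublishState {
    bool can_publish;
    PublishState() : can_publish(true) {}
};

// Take the publish slot, or report that another publisher already holds it.
static int acquire_publish(PublishState& s)
{
    if (!s.can_publish) {
        return kStreamBusy;
    }
    s.can_publish = false;
    return kSuccess;
}

// Give the slot back so the next publisher can take it.
static void release_publish(PublishState& s)
{
    s.can_publish = true;
}

// Cleanup step at the end of publishing: release unless the stream was busy,
// because in that case the slot belongs to a different connection.
static void finish_publish(PublishState& s, int ret)
{
    if (ret != kStreamBusy) {
        release_publish(s);
    }
}
```

    If a second client tries to publish the same stream, it gets the busy error and its cleanup must leave the flag alone; otherwise it would unlock the stream while the first publisher is still active.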
    

    3.1.1 SrsSource::on_publish

    /*
     * publish stream event notify.
     * @param _req, the request from client, the source will deep copy it,
     *     for when reload the request of client maybe invalid.
     */
    int SrsSource::on_publish()
    {
        int ret = ERROR_SUCCESS;
        
        /* update the request object. */
        srs_assert(_req);
        
        _can_publish = false;
        
        /* whatever, the publish thread is the source or edge source,
         * save its id to source id. */
        on_source_id_changed(_srs_context->get_id());
        
        /* reset the mix queue. */
        mix_queue->clear();
        
        /* detect the monotonically again. */
        is_monotonically_increase = true;
        last_packet_time = 0;
        
        /* create forwarders */
        /* forward is not configured in the vhost; ignore */
        if ((ret = create_forwarders()) != ERROR_SUCCESS) {
            srs_error("create forwarders failed. ret=%d", ret);
            return ret;
        }
        
        /* TODO: FIXME: use initialize to set req. */
    #ifdef SRS_AUTO_TRANSCODE
        ...
    #endif
        
        /* TODO: FIXME: use initialize to set req. */
    #ifdef SRS_AUTO_HLS
        if ((ret = hls->on_publish(_req, false)) != ERROR_SUCCESS) {
            srs_error("start hls failed. ret=%d", ret);
            return ret;
        }
    #endif
        
        /* TODO: FIXME: use initialize to set req. */
    #ifdef SRS_AUTO_DVR
        ...
    #endif
    
    #ifdef SRS_AUTO_HDS
        ...
    #endif
    
        /* notify the handler. */
        srs_assert(handler);
        /* calls on_publish as implemented by the subclass SrsServer; the related features are not enabled, so ignore */
        if ((ret = handler->on_publish(this, _req)) != ERROR_SUCCESS) {
            srs_error("handle on publish failed. ret=%d", ret);
            return ret;
        }
        SrsStatistic* stat = SrsStatistic::instance();
        stat->on_stream_publish(_req, _source_id);
        
        return ret;
    }
    

    3.1.2 SrsSource::on_source_id_changed

    int SrsSource::on_source_id_changed(int id)
    {
        int ret = ERROR_SUCCESS;
        
        if (_source_id == id) {
            return ret;
        }
        
        if (_pre_source_id == -1) {
            _pre_source_id = id;
        } else if (_pre_source_id != _source_id) {
            _pre_source_id = _source_id;
        }
        
        _source_id = id;
        
        /* notice all consumer */
        std::vector<SrsConsumer*>::iterator it;
        for (it = consumers.begin(); it != consumers.end(); ++it) {
            SrsConsumer* consumer = *it;
            /* this function sets should_update_source_id to true */
            consumer->update_source_id();
        }
        
        return ret;
    }
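
    The way _pre_source_id and _source_id evolve is easier to see by tracing a few calls. The sketch below copies the update rule from on_source_id_changed into a hypothetical SourceIds struct and leaves out the consumer notification.

```cpp
#include <cassert>

// Hypothetical stand-in holding only the two ids tracked by the source.
struct SourceIds {
    int pre_source_id;
    int source_id;
    SourceIds() : pre_source_id(-1), source_id(-1) {}

    // Same update rule as on_source_id_changed, without notifying consumers.
    // Returns true when the id actually changed.
    bool change(int id) {
        if (source_id == id) {
            return false;
        }
        if (pre_source_id == -1) {
            pre_source_id = id;
        } else if (pre_source_id != source_id) {
            pre_source_id = source_id;
        }
        source_id = id;
        return true;
    }
};
```

    Starting from (-1, -1), the ids 100, 200, 300 give (pre, current) pairs of (100, 100), (100, 200) and (200, 300). The first publish sets both ids to the same value, and from then on the previous id trails the current one.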
    

    3.1.3 SrsHls::on_publish

    /*
     * publish stream event, continue to write the m3u8,
     * for the muxer object not destroyed.
     * @param fetch_sequence_header, whether fetch sequence from source.
     */
    int SrsHls::on_publish(SrsRequest* req, bool fetch_sequence_header)
    {
        int ret = ERROR_SUCCESS;
        
        srs_freep(_req);
        _req = req->copy();
        
        /* update the hls time, for hls_dispose. */
        last_update_time = srs_get_system_time_ms();
        
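        /* hls_enabled is initialized to false in the SrsHls constructor, so this early
         * return is skipped on the first publish; it only fires when HLS is already enabled. */
        /* support multiple publish. */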
        if (hls_enabled) {
            return ret;
        }
        
        /* HLS is enabled in this vhost */
        std::string vhost = req->vhost;
        if (!_srs_config->get_hls_enabled(vhost)) {
            return ret;
        }
        
        /* initialize the muxer, create the m3u8 directory and files, and so on */
        if ((ret = hls_cache->on_publish(muxer, req, stream_dts)) != ERROR_SUCCESS) {
            return ret;
        }
        
        /* if enabled, open the muxer. */
        hls_enabled = true;
        
        /* ok, the hls can be dispose, or need to be dispose. */
        hls_can_dispose = true;
        
        /* as the caller above shows, the value passed in is false */
        /* when publish, don't need to fetch sequence header, which is old and maybe corrupt.
         * when reload, we must fetch the sequence header from source cache. */
        if (fetch_sequence_header) {
            /* notice the source to get the cached sequence header.
             * when reload to start hls, hls will never get the sequence header in stream,
             * use the SrsSource.on_hls_start to push the sequence header to HLS. */
            if ((ret = source->on_hls_start()) != ERROR_SUCCESS) {
                srs_error("callback source hls start failed. ret=%d", ret);
                return ret;
            }
        }
        
        return ret;
    }
    

    3.1.4 SrsHlsCache::on_publish

    int SrsHlsCache::on_publish(SrsHlsMuxer* muxer, SrsRequest* req, 
        int64_t segment_start_dts)
    {
        int ret = ERROR_SUCCESS;
    
        std::string vhost = req->vhost;
        std::string stream = req->stream;
        std::string app = req->app;
        
        /* get the hls fragment time, in seconds.
         * the config file sets hls_fragment to 10, i.e. the minimum segment duration is 10s */
        double hls_fragment = _srs_config->get_hls_fragment(vhost);
       
        /* get the hls window time, in seconds.
         * a window is a set of ts, the ts collection in m3u8.
         * @remark SRS will delete the ts exceed the window.
         * the config file sets hls_window to 60, i.e. the total duration of all segments is at most 60s */
        double hls_window = _srs_config->get_hls_window(vhost);
        
        /* get the hls m3u8 ts list entry prefix config */
        /* get the HLS m3u8 list ts segment entry prefix info.
         * hls_entry_prefix is not configured in the vhost, so this returns "" */
        std::string entry_prefix = _srs_config->get_hls_entry_prefix(vhost);
        
        /* get the HLS ts/m3u8 file store path.
         * hls_path in the config file is ./objs/nginx/html */
        std::string path = _srs_config->get_hls_path(vhost);
        
        /* get the HLS m3u8 file path template.
         * hls_m3u8_file in the config file is [app]/[stream].m3u8 */
        std::string m3u8_file = _srs_config->get_hls_m3u8_file(vhost);
       
        /* get the HLS ts file path template.
         * hls_ts_file in the config file is [app]/[stream]-[seq].ts */
        std::string ts_file = _srs_config->get_hls_ts_file(vhost);
        
        /* whether cleanup the old ts files.
         * hls_cleanup is not configured under hls, so this returns true by default,
         * i.e. deletion of expired ts segments is enabled; a segment outside hls_window is expired */
        bool cleanup = _srs_config->get_hls_cleanup(vhost);
        
        /* whether reap the ts when got keyframe.
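         * hls_wait_keyframe is not configured under hls, so this returns true by default, i.e. wait for an I-frame.
         * a segment is therefore cut only once its duration exceeds 10s and an I-frame has arrived */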
        bool wait_keyframe = _srs_config->get_hls_wait_keyframe(vhost);
        
        /* get the hls aof(audio overflow) ratio.
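         * hls_aof_ratio is not configured under hls, so this returns 2.0 by default;
         * for pure-audio streams it is the multiple of the fragment duration used for audio */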
        double hls_aof_ratio = _srs_config->get_hls_aof_ratio(vhost);
        
        /* whether use floor(timestamp/hls_fragment) for variable timestamp
         * hls_ts_floor is not configured under hls, so this returns false by default */
        bool ts_floor = _srs_config->get_hls_ts_floor(vhost);
        
        /* the seconds to dispose the hls.
         * hls_dispose is not configured under hls, so this returns 0 by default */
        int hls_dispose = _srs_config->get_hls_dispose(vhost);
        
        /* TODO: FIXME: support load exists m3u8, to continue publish stream.
         * for the HLS donot requires the EXT-X-MEDIA-SEQUENCE be monotonically increase */
        
        /* open muxer, when publish, update the config for muxer */
        if ((ret = muxer->update_config(req, entry_prefix,
            path, m3u8_file, ts_file, hls_fragment, hls_window, ts_floor, hls_aof_ratio,
            cleanup, wait_keyframe)) != ERROR_SUCCESS
        ) {
            srs_error("m3u8 muxer update config failed. ret=%d", ret);
            return ret;
        }
        
        /* open a new segment(a new ts file) */
        if ((ret = muxer->segment_open(segment_start_dts)) != ERROR_SUCCESS) {
            srs_error("m3u8 muxer open segment failed. ret=%d", ret);
            return ret;
        }
        
        return ret;
    }
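
    The comments on hls_fragment, hls_window and hls_wait_keyframe describe when a segment gets cut: it must have reached the fragment duration, and with hls_wait_keyframe on, the next frame must be a keyframe. A minimal sketch of that decision follows. Both function names are hypothetical, and the sketch deliberately ignores hls_ts_floor and the pure-audio hls_aof_ratio case, so it is a simplification rather than the muxer's actual logic.

```cpp
#include <cassert>

// Hypothetical helper: decide whether the current ts segment should be closed.
//   duration       seconds already written to the current segment
//   fragment       hls_fragment in seconds
//   wait_keyframe  hls_wait_keyframe
//   is_keyframe    whether the frame about to be written is a keyframe
static bool should_reap_segment(double duration, double fragment, bool wait_keyframe, bool is_keyframe)
{
    if (duration < fragment) {
        return false; // still shorter than the minimum segment duration
    }
    return !wait_keyframe || is_keyframe;
}

// Approximate number of segments that fit into the m3u8 window.
static int segments_in_window(double window, double fragment)
{
    assert(fragment > 0);
    return (int)(window / fragment);
}
```

    With the configuration in this article (hls_fragment 10, hls_window 60), a segment that has reached 10.2s is cut only when a keyframe arrives, and roughly 6 segments fit into the window. Because the cut waits for a keyframe, real segments tend to be somewhat longer than hls_fragment.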
    

    3.1.5 SrsHlsMuxer::update_config

    /* when publish, update the config for muxer. */
    int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
        string path, string m3u8_file, string ts_file, double fragment, double window,
        bool ts_floor, double aof_ratio, bool cleanup, bool wait_keyframe
    ) {
        int ret = ERROR_SUCCESS;
        
        srs_freep(req);
        /* give req a deep copy of the request */
        req = r->copy();
    
        /* "" */
        hls_entry_prefix = entry_prefix;
        /* hls_path: ./objs/nginx/html */
        hls_path = path;
        /* hls_ts_file: "[app]/[stream]-[seq].ts" */
        hls_ts_file = ts_file;
        /* hls_fragment: 10s */
        hls_fragment = fragment;
        /* hls_aof_ratio: 2.0 */
        hls_aof_ratio = aof_ratio;
        /* hls_ts_floor: false */
        hls_ts_floor = ts_floor;
        /* hls_cleanup: true */
        hls_cleanup = cleanup;
        /* hls_wait_keyframe: true */
        hls_wait_keyframe = wait_keyframe;
        previous_floor_ts = 0;
        accept_floor_ts = 0;
        /* hls_window: 60s */
        hls_window = window;
        deviation_ts = 0;
        
        /* generate the m3u8 dir and path. */
        /* m3u8_url: "live/livestream.m3u8" */
        m3u8_url = srs_path_build_stream(m3u8_file, req->vhost, req->app, req->stream);
        /* m3u8: ./objs/nginx/html/live/livestream.m3u8 */
        m3u8 = path + "/" + m3u8_url;
    
        /* when update config, reset the history target duration. */
        /* get_hls_td_ratio: get the hls td(target duration) ratio.
         * hls_td_ratio is not set in the hls config, so the default 1.5 is returned;
         * with fragment configured as 10, max_td is therefore 15 */
        max_td = (int)(fragment * _srs_config->get_hls_td_ratio(r->vhost));
        
        /* TODO: FIXME: refine better for SRS2 only support disk. */
        should_write_cache = false;
        should_write_file = true;
        
        /* create m3u8 dir once. */
        /* returns the directory part of the m3u8 path, i.e. ./objs/nginx/html/live */
        m3u8_dir = srs_path_dirname(m3u8);
        if (should_write_file && (ret = srs_create_dir_recursively(m3u8_dir)) != ERROR_SUCCESS) 
        {
            srs_error("create app dir %s failed. ret=%d", m3u8_dir.c_str(), ret);
            return ret;
        }
        srs_info("create m3u8 dir %s ok", m3u8_dir.c_str());
        
        return ret;
    }
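
    The target-duration arithmetic in update_config can be checked in isolation. The following is a minimal sketch, not SRS code: max_target_duration is a hypothetical helper, and the ratio 1.5 used in the note below is simply the default quoted in the comment above.

```cpp
// Hypothetical helper (not part of SRS): mirrors the expression
//     max_td = (int)(fragment * hls_td_ratio)
// from SrsHlsMuxer::update_config.
int max_target_duration(double fragment_seconds, double td_ratio)
{
    // The cast truncates toward zero, so a product just below 5
    // yields 4 rather than 5.
    return (int)(fragment_seconds * td_ratio);
}
```

    With hls_fragment set to 10 and a ratio of 1.5 this gives 15, matching the value stated above. Because of the truncating cast, a fragment of 3.3 seconds would give 4, not 5.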
    

    3.1.6 srs_path_build_stream

    /*
     * build the path according to vhost/app/stream, where replace variables:
     *     [vhost], the vhost of stream.
     *     [app], the app of stream.
     *     [stream], the stream name of stream.
     * @return the replaced path.
     */
    string srs_path_build_stream(string template_path, string vhost, string app, string stream)
    {
        std::string path = template_path;
        
        /* variable [vhost] */
        path = srs_string_replace(path, "[vhost]", vhost);
        /* variable [app] */
        path = srs_string_replace(path, "[app]", app);
        /* variable [stream] */
        path = srs_string_replace(path, "[stream]", stream);
        
        return path;
    }
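
    To see what this substitution produces for the configuration in section 1, here is a minimal standalone sketch. replace_all and build_stream_path are hypothetical stand-ins for srs_string_replace and srs_path_build_stream; the body of srs_string_replace is not shown in this article, so replace-every-occurrence behavior is an assumption.

```cpp
#include <string>

// Hypothetical stand-in for srs_string_replace (assumed behavior):
// replaces every occurrence of `from` in `s` with `to`.
std::string replace_all(std::string s, const std::string& from, const std::string& to)
{
    if (from.empty()) {
        return s;
    }
    std::string::size_type pos = 0;
    while ((pos = s.find(from, pos)) != std::string::npos) {
        s.replace(pos, from.length(), to);
        // Continue after the inserted text, so a `to` that itself
        // contains `from` cannot cause an endless loop.
        pos += to.length();
    }
    return s;
}

// Hypothetical stand-in for srs_path_build_stream.
std::string build_stream_path(std::string tpl, const std::string& vhost,
    const std::string& app, const std::string& stream)
{
    tpl = replace_all(tpl, "[vhost]", vhost);
    tpl = replace_all(tpl, "[app]", app);
    tpl = replace_all(tpl, "[stream]", stream);
    return tpl;
}
```

    For the template "[app]/[stream].m3u8" with app "live" and stream "livestream", the result is "live/livestream.m3u8". Placeholders that this function does not know, such as [seq], are left untouched for a later step.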
    

    3.1.7 SrsHlsMuxer::segment_open

    /*
     * open a new segment(a new ts file),
     * @param segment_start_dts, use to calc the segment duration,
     *    use 0 for the first segment of HLS.
     */
    int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
    {
        int ret = ERROR_SUCCESS;
        
        /* current: current writing segment. It is NULL right after initialization */
        if (current) {
            srs_warn("ignore the segment open, for segment is already open.");
            return ret;
        }
        
        /* when segment open, the current segment must be NULL. */
        srs_assert(!current);
        
        /* load the default acodec from config. */
        SrsCodecAudio default_acodec = SrsCodecAudioAAC;
        if (true) {
            /* hls_acodec is not set in the hls config, so this returns the default: "aac" */
            std::string default_acodec_str = _srs_config->get_hls_acodec(req->vhost);
            if (default_acodec_str == "mp3") {
                default_acodec = SrsCodecAudioMP3;
                srs_info("hls: use default mp3 acodec");
            } else if (default_acodec_str == "aac") {
                default_acodec = SrsCodecAudioAAC;
                srs_info("hls: use default aac acodec");
            } else if (default_acodec_str == "an") {
                default_acodec = SrsCodecAudioDisabled;
                srs_info("hls: use default an acodec for pure video");
            } else {
                srs_warn("hls: use aac for other codec=%s", default_acodec_str.c_str());
            }
        }
        
        /* load the default vcodec from config. */
        SrsCodecVideo default_vcodec = SrsCodecVideoAVC;
        if (true) {
            /* hls_vcodec is not set in the hls config, so this returns the default: "h264" */
            std::string default_vcodec_str = _srs_config->get_hls_vcodec(req->vhost);
            if (default_vcodec_str == "h264") {
                default_vcodec = SrsCodecVideoAVC;
                srs_info("hls: use default h264 vcodec");
            } else if (default_vcodec_str == "vn") {
                default_vcodec = SrsCodecVideoDisabled;
                srs_info("hls: use default vn vcodec for pure audio");
            } else {
                srs_warn("hls: use h264 for other codec=%s", default_vcodec_str.c_str());
            }
        }
        
        /* new segment. */
        current = new SrsHlsSegment(context, should_write_cache, should_write_file, 
                                    default_acodec, default_vcodec);
        /* _sequence_no: sequence number in m3u8. */
        current->sequence_no = _sequence_no++;
        /* for the first segment of HLS, segment_start_dts is 0 */
        current->segment_start_dts = segment_start_dts;
        
        /* generate filename. */
        /* ts_file: "[app]/[stream]-[seq].ts" */
        std::string ts_file = hls_ts_file;
        /* ts_file: "live/livestream-[seq].ts" */
        ts_file = srs_path_build_stream(ts_file, req->vhost, req->app, req->stream);
        /* hls_ts_floor is initialized to false when SrsHlsMuxer is constructed */
        if (hls_ts_floor) {
            ...
            
        } else {
            ts_file = srs_path_build_timestamp(ts_file);
        }
        if (true) {
            std::stringstream ss;
            ss << current->sequence_no;
            /* ts_file: live/livestream-0.ts */
            ts_file = srs_string_replace(ts_file, "[seq]", ss.str());
        }
        /* full_path: ./objs/nginx/html/live/livestream-0.ts */
        current->full_path = hls_path + "/" + ts_file;
        
        /* the ts url, relative or absolute url. */
        std::string ts_url = current->full_path;
        /* ts_url: ./objs/nginx/html/live/livestream-0.ts,
         * m3u8_dir: ./objs/nginx/html/live */
        if (srs_string_starts_with(ts_url, m3u8_dir)) {
            /* ts_url: /livestream-0.ts */
            ts_url = ts_url.substr(m3u8_dir.length());
        }
        while (srs_string_starts_with(ts_url, "/")) {
            /* ts_url: livestream-0.ts */
            ts_url = ts_url.substr(1);
        }
        /* current->uri: "" */
        current->uri += hls_entry_prefix;
        if (!hls_entry_prefix.empty() && !srs_string_ends_with(hls_entry_prefix, "/")) 
        {
            current->uri += "/";
            
            /* add the http dir to uri. */
            string http_dir = srs_path_dirname(m3u8_url);
            if (!http_dir.empty()) {
                current->uri += http_dir + "/";
            }
        }
        /* current->uri: livestream-0.ts */
        current->uri += ts_url;
        
        /* create dir recursively for hls. */
        /* ts_dir: ./objs/nginx/html/live */
        std::string ts_dir = srs_path_dirname(current->full_path);
        if (should_write_file && (ret = srs_create_dir_recursively(ts_dir)) 
            != ERROR_SUCCESS) {
            srs_error("create app dir %s failed. ret=%d", ts_dir.c_str(), ret);
            return ret;
        }
        
        /* open temp ts file. */
        /* tmp_file: ./objs/nginx/html/live/livestream-0.ts.tmp */
        std::string tmp_file = current->full_path + ".tmp";
        if ((ret = current->muxer->open(tmp_file.c_str())) != ERROR_SUCCESS) {
            srs_error("open hls muxer failed. ret=%d", ret);
            return ret;
        }
        
        /* set the segment muxer audio codec. */
        if (acodec != SrsCodecAudioReserved1) {
            current->muxer->update_acodec(acodec);
        }
        
        return ret;
    }
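
    The URI that ends up in the m3u8 is derived from the full ts path in two stripping steps. The sketch below isolates just that part for the case where hls_entry_prefix is empty; relative_ts_uri is a hypothetical helper, not an SRS function.

```cpp
#include <string>

// Hypothetical helper (not part of SRS): follows the prefix-stripping
// steps of SrsHlsMuxer::segment_open for an empty hls_entry_prefix.
std::string relative_ts_uri(const std::string& full_path, const std::string& m3u8_dir)
{
    std::string ts_url = full_path;

    // Drop the m3u8 directory prefix when the ts file lives under it.
    if (ts_url.compare(0, m3u8_dir.length(), m3u8_dir) == 0) {
        ts_url = ts_url.substr(m3u8_dir.length());
    }

    // Drop any leading slashes so the URI is relative to the m3u8.
    while (!ts_url.empty() && ts_url[0] == '/') {
        ts_url = ts_url.substr(1);
    }

    return ts_url;
}
```

    With full_path "./objs/nginx/html/live/livestream-0.ts" and m3u8_dir "./objs/nginx/html/live", the result is "livestream-0.ts". If the ts file is not under the m3u8 directory, the path is kept as it is.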
    

    3.1.8 SrsHlsSegment constructor

    /*
     * the wrapper of m3u8 segment from specification:
     * EXTINF: The EXTINF tag specifies the duration of a media segment.
     */
    SrsHlsSegment::SrsHlsSegment(SrsTsContext* c, bool write_cache, 
        bool write_file, SrsCodecAudio ac, SrsCodecVideo vc)
    {
        /* duration in seconds in m3u8. */
        duration = 0;
        /* sequence number in m3u8. */
        sequence_no = 0;
        /* current segment start dts for m3u8 */
        segment_start_dts = 0;
        /* whether current segment is sequence header. */
        is_sequence_header = false;
        /* as established earlier, write_cache is false and write_file is true */
        writer = new SrsHlsCacheWriter(write_cache, write_file);
        muxer = new SrsTSMuxer(writer, c, ac, vc);
    }
    

    3.1.9 SrsHlsCacheWriter constructor

    /*
     * write to file and cache.
     */
    SrsHlsCacheWriter::SrsHlsCacheWriter(bool write_cache, bool write_file)
    {
        should_write_cache = write_cache;
        should_write_file = write_file;
    }
    

    3.1.10 SrsTSMuxer constructor

    /*
     * write data from frame(header info) and buffer(data) to ts file. 
     * it's a simple object wrapper for utility from nginx-rtmp: SrsMpegtsWriter
     */
    SrsTSMuxer::SrsTSMuxer(SrsFileWriter* w, SrsTsContext* c, SrsCodecAudio ac, 
        SrsCodecVideo vc)
    {
        writer = w;
        context = c;
    
        acodec = ac;
        vcodec = vc;
    }
    

    3.1.11 srs_path_build_timestamp

    /*
     * build the path according to timestamp, where replace variables:
     *       [2006], replace this const to current year.
     *       [01], replace this const to current month.
     *       [02], replace this const to current date.
     *       [15], replace this const to current hour.
     *       [04], replace this const to current minute.
     *       [05], replace this const to current second.
     *       [999], replace this const to current millisecond.
     *       [timestamp], replace this const to current UNIX timestamp in ms.
     * @return the replaced path.
     */
    string srs_path_build_timestamp(string template_path)
    {
        std::string path = template_path;
        
        /* date and time substitution
         * clock time */
        timeval tv;
        if (gettimeofday(&tv, NULL) == -1) {
            return path;
        }
        
        /* to calendar time */
        struct tm* tm;
        /* utc_time is not configured, so this returns the default false, i.e. local time is used */
        if (_srs_config->get_utc_time()) {
            if ((tm = gmtime(&tv.tv_sec)) == NULL) {
                return path;
            }
        } else {
            if ((tm = localtime(&tv.tv_sec)) == NULL) {
                return path;
            }
        }
        
        /* the buffer to format the date and time. */
        char buf[64];
        
        /* [2006], replace with current year. */
        if (true) {
            snprintf(buf, sizeof(buf), "%04d", 1900 + tm->tm_year);
            path = srs_string_replace(path, "[2006]", buf);
        } 
        /* [01], replace this const to current month. */
        if (true) {
            snprintf(buf, sizeof(buf), "%02d", 1 + tm->tm_mon);
            path = srs_string_replace(path, "[01]", buf);
        }
        /* [02], replace this const to current date. */
        if (true) {
            snprintf(buf, sizeof(buf), "%02d", tm->tm_mday);
            path = srs_string_replace(path, "[02]", buf);
        }
        /* [15], replace this const to current hour. */
        if (true) {
            snprintf(buf, sizeof(buf), "%02d", tm->tm_hour);
            path = srs_string_replace(path, "[15]", buf);
        }
        /* [04], replace this const to current minute. */
        if (true) {
            snprintf(buf, sizeof(buf), "%02d", tm->tm_min);
            path = srs_string_replace(path, "[04]", buf);
        }
        /* [05], replace this const to current second. */
        if (true) {
            snprintf(buf, sizeof(buf), "%02d", tm->tm_sec);
            path = srs_string_replace(path, "[05]", buf);
        }
        /* [999], replace this const to current millisecond. */
        if (true) {
            snprintf(buf, sizeof(buf), "%03d", (int)(tv.tv_usec / 1000));
            path = srs_string_replace(path, "[999]", buf);
        }
        /* [timestamp],replace this const to current UNIX timestamp in ms. */
        if (true) {
            int64_t now_us = ((int64_t)tv.tv_sec) * 1000 * 1000 + (int64_t)tv.tv_usec;
            snprintf(buf, sizeof(buf), "%" PRId64, now_us / 1000);
            path = srs_string_replace(path, "[timestamp]", buf);
        }
        
        return path;
    }
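
    The timestamp substitution is easier to follow with a fixed clock reading. The sketch below is a simplified, hypothetical variant, not the SRS function: it takes the seconds and microseconds as arguments instead of calling gettimeofday, always uses UTC (via the POSIX gmtime_r), and omits the [timestamp] token. replace_token and build_timestamp_path are made-up names.

```cpp
#include <stdio.h>
#include <time.h>
#include <string>

// Hypothetical helper: replace every occurrence of `from` with `to`.
static std::string replace_token(std::string s, const std::string& from, const std::string& to)
{
    std::string::size_type pos = 0;
    while ((pos = s.find(from, pos)) != std::string::npos) {
        s.replace(pos, from.length(), to);
        pos += to.length();
    }
    return s;
}

// Hypothetical, simplified variant of srs_path_build_timestamp.
std::string build_timestamp_path(std::string tpl, time_t sec, long usec)
{
    struct tm tm_utc;
    if (gmtime_r(&sec, &tm_utc) == NULL) {
        return tpl;
    }

    // Tokens and their values, in the same order as the original function.
    const char* tokens[] = { "[2006]", "[01]", "[02]", "[15]", "[04]", "[05]" };
    const int values[] = {
        1900 + tm_utc.tm_year, 1 + tm_utc.tm_mon, tm_utc.tm_mday,
        tm_utc.tm_hour, tm_utc.tm_min, tm_utc.tm_sec
    };

    char buf[64];
    for (int i = 0; i < 6; i++) {
        // The year is padded to 4 digits, every other field to 2.
        snprintf(buf, sizeof(buf), i == 0 ? "%04d" : "%02d", values[i]);
        tpl = replace_token(tpl, tokens[i], buf);
    }

    // Milliseconds come from the microsecond part of the clock reading.
    snprintf(buf, sizeof(buf), "%03d", (int)(usec / 1000));
    tpl = replace_token(tpl, "[999]", buf);

    return tpl;
}
```

    Note that the hls_ts_file used in this article, "[app]/[stream]-[seq].ts", contains none of these tokens, so for this configuration the function returns the path unchanged.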
    

    3.1.12 SrsTSMuxer::open

    /*
     * open the writer, donot write the PSI of ts.
     * @param p, a string indicates the path of ts file to mux to.
     */
    int SrsTSMuxer::open(string p)
    {
        int ret = ERROR_SUCCESS;
        
        path = p;
        
        close();
        
        /* reset the context for a new ts start. */
        context->reset();
        
        /* calls the open function implemented by the subclass SrsHlsCacheWriter */
        if ((ret = writer->open(path)) != ERROR_SUCCESS) {
            return ret;
        }
        
        return ret;
    }
    

    3.1.13 SrsHlsCacheWriter::open

    int SrsHlsCacheWriter::open(string file)
    {
        if (!should_write_file) {
            return ERROR_SUCCESS;
        }
        
        return impl.open(file);
    }
    

    3.1.14 SrsFileWriter::open

    /*
     * open file writer, in truncate mode.
     * @param p, a string indicates the path of file to open.
     */
    int SrsFileWriter::open(string p)
    {
        int ret = ERROR_SUCCESS;
        
        if (fd > 0) {
            ret = ERROR_SYSTEM_FILE_ALREADY_OPENED;
            srs_error("file %s already opened. ret=%d", path.c_str(), ret);
            return ret;
        }
        
        int flags = O_CREAT|O_WRONLY|O_TRUNC;
        mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;
    
        /* open the file ./objs/nginx/html/live/livestream-0.ts.tmp */
        if ((fd = ::open(p.c_str(), flags, mode)) < 0) {
            ret = ERROR_SYSTEM_FILE_OPENE;
            srs_error("open file %s failed. ret=%d", p.c_str(), ret);
            return ret;
        }
        
        path = p;
        
        return ret;
    }
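
    The effect of the truncate-mode flags can be tried outside SRS. This is a minimal POSIX sketch using the same flags and mode as above; open_truncated and file_size are hypothetical helpers, and the path you pass in is up to you.

```cpp
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <string>

// Hypothetical helper: open `path` for writing with the same flags and
// mode as SrsFileWriter::open. Returns the fd, or -1 on failure.
int open_truncated(const std::string& path)
{
    // O_CREAT: create the file if it is missing.
    // O_TRUNC: if it already exists, cut it to length 0.
    int flags = O_CREAT | O_WRONLY | O_TRUNC;
    // rw-rw-r-- (0664) before the process umask is applied.
    mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH;
    return ::open(path.c_str(), flags, mode);
}

// Hypothetical helper: size of `path` in bytes, or -1 if stat fails.
long long file_size(const std::string& path)
{
    struct stat st;
    if (::stat(path.c_str(), &st) != 0) {
        return -1;
    }
    return (long long)st.st_size;
}
```

    Opening an existing file a second time with open_truncated discards its previous content, which is why a leftover .tmp file from an earlier run does not leak old data into a new segment.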
    

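    That completes the analysis of the acquire_publish function. Next, we look at how the SrsPublishRecvThread thread is constructed.

    3.2 SrsPublishRecvThread constructor

    This constructor creates a reusable recv thread dedicated to receiving the audio and video data that the client publishes.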

    /* the publish recv thread gets each message and calls back the source method to process it. */
    SrsPublishRecvThread::SrsPublishRecvThread(
        SrsRtmpServer* rtmp_sdk, 
        SrsRequest* _req, int mr_sock_fd, int timeout_ms, 
        SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
    ): trd(this, rtmp_sdk, timeout_ms)
    {
        rtmp = rtmp_sdk;
    
        _conn = conn;
        /* the params for conn callback. */
        _source = source;
        /* true, as established earlier */
        _is_fmle = is_fmle;
        /* false */
        _is_edge = is_edge;
    
        recv_error_code = ERROR_SUCCESS;
        /* the msgs already got. */
        _nb_msgs = 0;
        /* The video frames we got. */
        video_frames = 0;
        /* the error timeout cond */
        error = st_cond_new();
        ncid = cid = 0;
        
        req = _req;
        mr_fd = mr_sock_fd;
    
        /* the mr settings, 
         * @see https://github.com/ossrs/srs/issues/241 */
        /* mr is not configured, so this returns the default: false */
        mr = _srs_config->get_mr_enabled(req->vhost);
        /* returns the default: 350 */
        mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
        
        /* min_latency is not configured, so this returns the default: false */
        realtime = _srs_config->get_realtime_enabled(req->vhost);
        
        _srs_config->subscribe(this);
    }
    

    Because trd is constructed in the member initializer list, the SrsRecvThread constructor runs before the body of the SrsPublishRecvThread constructor.

    3.2.1 SrsRecvThread constructor

    /* the recv thread, which uses a message handler to handle each received message. */
    SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, 
        SrsRtmpServer* rtmp_sdk, int timeout_ms)
    {
        timeout = timeout_ms;
        handler = msg_handler;
        rtmp = rtmp_sdk;
        trd = new SrsReusableThread2("recv", this);
    }
    

    3.2.2 SrsReusableThread2 constructor

    SrsReusableThread2::SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, 
        int64_t interval_us)
    {
        handler = h;
        pthread = new internal::SrsThread(n, this, interval_us, true);
    }
    

    3.2.3 SrsThread constructor

    SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, 
        int64_t interval_us, bool joinable)
    {
        _name = name;
        handler = thread_handler;
        cycle_interval_us = interval_us;
        
        tid = NULL;
        loop = false;
        really_terminated = true;
        _cid = -1;
        _joinable = joinable;
        disposed = false;
        
        // in start(), the thread cycle method maybe stop and remove the thread itself,
        // and the thread start() is waiting for the _cid, and segment fault then.
        // @see https://github.com/ossrs/srs/issues/110
        // thread will set _cid, callback on_thread_start(), then wait for the can_run signal.
        can_run = false;
    }
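
    The constructor chain in 3.2 to 3.2.3 relies on two C++ rules: data members named in the initializer list are constructed before the constructor body runs, and passing this to a member is safe as long as the member only stores the pointer. The sketch below shows both with made-up, heavily simplified classes; Handler, RecvThread and PublishRecvThread are hypothetical stand-ins and do not reproduce the real SRS class layout.

```cpp
#include <string>
#include <vector>

// Records the order in which the constructors below run.
static std::vector<std::string> g_ctor_log;

// Hypothetical stand-in for the message handler interface.
struct Handler {
    virtual ~Handler() {}
};

// Hypothetical stand-in for SrsRecvThread: it only stores the handler
// pointer and does not call through it during construction.
struct RecvThread {
    Handler* handler;
    explicit RecvThread(Handler* h) : handler(h) {
        g_ctor_log.push_back("RecvThread");
    }
};

// Hypothetical stand-in for SrsPublishRecvThread: it is itself the
// handler and owns the recv thread as a data member.
struct PublishRecvThread : public Handler {
    RecvThread trd;
    // trd(this) runs first; the body below runs afterwards.
    PublishRecvThread() : trd(this) {
        g_ctor_log.push_back("PublishRecvThread");
    }
};
```

    Constructing one PublishRecvThread logs "RecvThread" first and "PublishRecvThread" second, and trd.handler points back at the owning object.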
    

    4. SrsRtmpConn::do_publishing

    int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
    {
        int ret = ERROR_SUCCESS;
        
        SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
        SrsAutoFree(SrsPithyPrint, pprint);
        
        /* start isolate recv thread. */
        if ((ret = trd->start()) != ERROR_SUCCESS) {
            srs_error("start isolate recv thread failed. ret=%d", ret);
            return ret;
        }
        
        /* change the isolate recv thread context id,
         * merge its log to current thread. */
        int receive_thread_cid = trd->get_cid();
        trd->set_cid(_srs_context->get_id());
        
        /* initialize the publish timeout. */
        publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
        publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
        
        /* set the sock options. */
        set_sock_options();
        
        if (true) {
            /* the mr feature is not enabled, so this is false */
            bool mr = _srs_config->get_mr_enabled(req->vhost);
            int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
        }
        
        int64_t nb_msgs = 0;
        uint64_t nb_frames = 0;
        while (!disposed) {
            pprint->elapse();
            
            /* when source is set to expired, disconnect it. */
            if (expired) {
                ret = ERROR_USER_DISCONNECT;
                srs_error("connection expired. ret=%d", ret);
                return ret;
            }
            
            /* conn wait for timeout */
            if (nb_msgs == 0) {
                /* when not got msgs, wait for a larger timeout. */
                trd->wait(publish_1stpkt_timeout);
            } else {
                trd->wait(publish_normal_timeout);
            }
            
            /* check the thread error code */
            if ((ret = trd->error_code()) != ERROR_SUCCESS) {
                if (!srs_is_system_control_error(ret) && 
                    !srs_is_client_gracefully_close(ret)) {
                    srs_error("recv thread failed. ret=%d", ret);
                }
                return ret;
            }
            
            /* when not any messages, timeout */
            if (trd->nb_msgs() <= nb_msgs) {
                ret = ERROR_SOCKET_TIMEOUT;
                srs_warn("publish timeout %dms, nb_msgs=%" PRId64 ", ret=%d",
                    nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret);
                break;
            }
            nb_msgs = trd->nb_msgs();
            
            /* update the stat for video fps. */
            SrsStatistic* stat = SrsStatistic::instance();
            if ((ret = stat->on_video_frames(req, (int)(trd->nb_video_frames() - nb_frames))) 
                != ERROR_SUCCESS) {
                return ret;
            }
            nb_frames = trd->nb_video_frames();
    
            /* reportable */
            if (pprint->can_print()) {
                kbps->sample();
                bool mr = _srs_config->get_mr_enabled(req->vhost);
                int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
                /* ... (trace output elided in this listing) */
            }
        }
        
        return ret;
    }
    

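    The while loop repeatedly goes to sleep on the error condition variable. It wakes up either when that condition is signaled (if the recv thread hits an error, it signals the error condition variable so that the waiting thread can handle the recv error) or when the wait timeout expires.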
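
    The decision made after each wake-up can be written as two small pure functions. This is only a sketch of the logic in do_publishing with the actual waiting left out; pick_timeout_ms, check_progress and WaitOutcome are hypothetical names.

```cpp
#include <stdint.h>

// Hypothetical outcome of one pass through the wait loop.
enum WaitOutcome { KeepWaiting, TimedOut };

// Before the first message arrives the loop waits with the (larger)
// first-packet timeout; afterwards it uses the normal timeout.
int pick_timeout_ms(int64_t seen_msgs, int first_pkt_timeout_ms, int normal_timeout_ms)
{
    return seen_msgs == 0 ? first_pkt_timeout_ms : normal_timeout_ms;
}

// After waking up without an error, compare the recv thread's message
// counter with the count seen on the previous pass. No progress means
// the wait ended by timeout, so the publisher is treated as stalled.
WaitOutcome check_progress(int64_t thread_msgs, int64_t seen_msgs)
{
    return thread_msgs <= seen_msgs ? TimedOut : KeepWaiting;
}
```

    For example, with sample timeouts of 20000 ms and 5000 ms, a loop that has seen no messages yet waits 20000 ms; once messages have arrived it waits 5000 ms, and an unchanged counter after the wait leads to ERROR_SOCKET_TIMEOUT in the original code.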

    5. The recv thread

    For a detailed analysis of this thread, see: SRS之接收推流线程:recv

  • Original article: https://www.cnblogs.com/jimodetiantang/p/9130634.html