  • SRS: Source Code Analysis of the HLS Deployment Example

    1. Overview

    For the detailed HLS configuration in SRS, see: HLS Deployment Example

    The SRS configuration file for HLS is as follows:

    listen              1935;
    max_connections     1000;
    daemon              off;
    srs_log_tank        console;
    vhost __defaultVhost__ {
        hls {
            enabled         on;
            hls_fragment    10;
            hls_window      60;
            hls_path        ./objs/nginx/html;
            hls_m3u8_file   [app]/[stream].m3u8;
            hls_ts_file     [app]/[stream]-[seq].ts;
        }
    }
    

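    The processing flow on the SRS side is as follows:

    1. SRS starts listening on port 1935: SRS: Managing Listening Ports (RTMP)
    2. While setting up the listener on port 1935, SRS creates a TCP listener thread for that port. This thread accepts client connection requests and then creates a conn thread for each client in order to serve it: SRS: The RTMP Connection Thread conn (Receiving a Client's Published Stream)
    3. At the start of the conn thread's cycle, the server first performs the handshake with the client: SRS: RTMP handshake
    4. After a successful handshake, the server receives the client's first command, which is usually connect('xxx'): SRS: SrsRtmpServer::connect_app in Detail
    5. It then enters SrsRtmpConn::service_cycle: SRS: SrsRtmpConn::service_cycle in Detail
    6. SrsRtmpConn::service_cycle first sends the client the Window Acknowledgement Size (5), Set Peer Bandwidth (6), and Set Chunk Size (1) messages plus the response to connect, then enters its loop, where it calls stream_service_cycle

    The HLS analysis below starts from stream_service_cycle.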
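
    The numbers in parentheses in step 6 are RTMP message type IDs. As a small illustration, they can be written as an enum. The names below (RtmpControlType, rtmp_control_name) are hypothetical and chosen for this sketch only; they are not SRS identifiers.

```cpp
#include <cassert>
#include <string>

// RTMP protocol control message type IDs mentioned in step 6.
// The enum and constant names are hypothetical, not SRS identifiers.
enum RtmpControlType {
    kSetChunkSize = 1,
    kWindowAckSize = 5,
    kSetPeerBandwidth = 6
};

// Hypothetical helper: maps a message type ID to a readable name.
inline std::string rtmp_control_name(int type_id) {
    switch (type_id) {
        case kSetChunkSize:     return "Set Chunk Size";
        case kWindowAckSize:    return "Window Acknowledgement Size";
        case kSetPeerBandwidth: return "Set Peer Bandwidth";
        default:                return "other";
    }
}
```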

    2. SrsRtmpConn::stream_service_cycle

    int SrsRtmpConn::stream_service_cycle()
    {
        int ret = ERROR_SUCCESS;
        
        SrsRtmpConnType type;
        /* receive some messages, then use their content to identify whether the client is going to publish or play */
        if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) 
            != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("identify client failed. ret=%d", ret);
            }
            return ret;
        }
        req->strip();
        
        /* security check: if the vhost does not configure security, it is disabled by default (the config lookup returns false) */
        if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
            srs_error("security check failed. ret=%d", ret);
            return ret;
        }
        
        /* SRS rejects requests with an empty stream name, because HLS uses the stream name when writing files */
        if (req->stream.empty()) {
            ret = ERROR_RTMP_STREAM_NAME_EMPTY;
            srs_error("RTMP: Empty stream name not allowed, ret=%d", ret);
            return ret;
        }
        
        /* client is identified, set the timeout to service timeout. */
        rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
        rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
        
        /* find a source to serve. */
        SrsSource* source = NULL;
        if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) 
        {
            return ret;
        }
        srs_assert(source != NULL);
        
        /* update the statistic when source discovered. */
        SrsStatistic* stat = SrsStatistic::instance();
        if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) 
        {
            srs_error("stat client failed. ret=%d", ret);
            return ret;
        }
        
        /* returns false by default if the vhost does not configure mode */
        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
        /* gop_cache is enabled by default */
        bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
        source->set_cache(enabled_cache);
        
        client_type = type;
        switch (type) {
            ...
            
            case SrsRtmpConnFMLEPublish: {
                srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
                
                if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
                    srs_error("start to publish stream failed. ret=%d", ret);
                    return ret;
                }
                
                return publishing(source);
            }
            
            ...
            
            default: {
                ret = ERROR_SYSTEM_CLIENT_INVALID;
                srs_info("invalid client type=%d. ret=%d", type, ret);
                return ret;
            }
        }
        
        return ret;
    }
    

    2.1 SrsSource::fetch_or_create

    /* The static member pool stores the SrsSource objects that have been created.
     * key: vhost/app/stream 
     * value: SrsSource* */
    std::map<std::string, SrsSource*> SrsSource::pool;
    
    /*
     * create source when fetch from cache failed.
     * @param r, the client request.
     * @param h, the event handler for source.
     * @param pps, the matched source, if success never be NULL.
     */
    int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
    {
        int ret = ERROR_SUCCESS;
        
        SrsSource* source = NULL;
        /* check whether pool already holds a SrsSource* for the key
         * (e.g. vhost/live/livestream); if so, return it */
        if ((source = fetch(r)) != NULL) {
            *pps = source;
            return ret;
        }
        
        string stream_url = r->get_stream_url();
        string vhost = r->vhost;
        
        /* should always not exists for create a source. */
        srs_assert (pool.find(stream_url) == pool.end());
        
        /* construct a new SrsSource */
        source = new SrsSource();
        if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
            srs_freep(source);
            return ret;
        }
        
        /* put the successfully created and initialized source into pool */
        pool[stream_url] = source;
        
        *pps = source;
        
        return ret;
    }
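
    The fetch-or-create pattern above can be sketched in isolation. This is a minimal model, not SRS code: Source and SourcePool are hypothetical stand-ins, and ownership is handled with std::unique_ptr instead of the raw pointers and srs_freep used by SRS.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for SrsSource; only the stream URL is kept.
struct Source {
    std::string stream_url;
    explicit Source(const std::string& url) : stream_url(url) {}
};

// Hypothetical registry keyed by "vhost/app/stream", in the spirit of SrsSource::pool.
class SourcePool {
public:
    // Returns the registered source for the key, or nullptr when there is none.
    Source* fetch(const std::string& stream_url) const {
        auto it = pool_.find(stream_url);
        return it == pool_.end() ? nullptr : it->second.get();
    }

    // Returns the cached source when present; otherwise creates and registers one.
    Source* fetch_or_create(const std::string& stream_url) {
        assert(!stream_url.empty());
        if (Source* existing = fetch(stream_url)) {
            return existing;
        }
        std::unique_ptr<Source> created(new Source(stream_url));
        Source* raw = created.get();
        pool_[stream_url] = std::move(created);
        return raw;
    }

    std::size_t size() const { return pool_.size(); }

private:
    std::map<std::string, std::unique_ptr<Source> > pool_;
};
```

    Two requests for the same key return the same object, which is why a later publisher or player of vhost/live/livestream reuses the source created by the first client.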
    

    The following mainly analyzes the construction of SrsSource.

    2.1.1 SrsSource constructor

    /* live streaming source. */
    SrsSource::SrsSource()
    {
        /* deep copy of client request. */
        _req = NULL;
        /* the time jitter algorithm for vhost. initialized to OFF here */
        jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
        
        mix_correct = false;
        mix_queue = new SrsMixQueue();
        
    #ifdef SRS_AUTO_HLS
        /* construct the SrsHls object */
        hls = new SrsHls();
    #endif
    ...
        
        cache_metadata = cache_sh_video = cache_sh_audio = NULL;
        
        /* can publish, true when is not streaming */
        _can_publish = true;
        _pre_source_id = _source_id = -1;
        /* last die time, when all consumers quit and no publisher,
         * we will remove the source when source die. */
        die_at = -1;
        
        /* edge control service */
        play_edge = new SrsPlayEdge();
        publish_edge = new SrsPublishEdge();
        /* gop cache for client fast startup. */
        gop_cache = new SrsGopCache();
        /* for aggregate message */
        aggregate_stream = new SrsStream();
        
        is_monotonically_increase = false;
        last_packet_time = 0;
        
        _srs_config->subscribe(this);
        atc = false;
    }
    

    2.1.2 SrsHls constructor

    /*
     * delivery RTMP stream to HLS(m3u8 and ts), 
     * SrsHls provides interface with SrsSource. 
     */
    SrsHls::SrsHls()
    {
        _req = NULL;
        source = NULL;
        
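        /* flag: whether hls is enabled */
        hls_enabled = false;
        /* the hls dispose timeout (hls_dispose): after a system restart or once this
         * time is exceeded, all hls files (m3u8 and ts) are removed.
         * It defaults to 0, i.e. nothing is cleaned up */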
        hls_can_dispose = false;
        last_update_time = 0;
    
        /* the h264/avc and aac codec, for media stream. */
        codec = new SrsAvcAacCodec();
        /* the samples in the flv audio/video packet. */
        sample = new SrsCodecSample();
        /* time jitter detect and correct, to ensure the rtmp stream is monotonically. */
        jitter = new SrsRtmpJitter();
        
        /* muxer the HLS stream(m3u8 and ts files). */
        muxer = new SrsHlsMuxer();
        /* hls stream cache, use to cache hls stream and flush to hls muxer. */
        hls_cache = new SrsHlsCache();
    
        pprint = SrsPithyPrint::create_hls();
        /*
         * we store the stream dts,
         * for when we notice the hls cache to publish,
         * it need to know the segment start dts.
         *
         * for example. when republish, the stream dts will
         * monotonically increase, and the ts dts should start 
         * from current dts.
         *
         * or, simply because the HlsCache never free when unpublish,
         * so when publish or republish it must start at stream dts,
         * not zero dts.
         */
        stream_dts = 0;
    }
    

    2.1.3 SrsAvcAacCodec constructor

    /**
    * the h264/avc and aac codec, for media stream.
    *
    * to demux the FLV/RTMP video/audio packet to sample,
    * add each NALUs of h.264 as a sample unit to sample,
    * while the entire aac raw data as a sample unit.
    *
    * for sequence header,
    * demux it and save it in the avc_extra_data and aac_extra_data,
    * 
    * for the codec info, such as audio sample rate,
    * decode from FLV/RTMP header, then use codec info in sequence 
    * header to override it.
    */
    SrsAvcAacCodec::SrsAvcAacCodec()
    {
        /* for sequence header, whether parse the h.264 sps. */
        avc_parse_sps               = true;
        
        width                       = 0;
        height                      = 0;
        duration                    = 0;
        NAL_unit_length             = 0;
        frame_rate                  = 0;
    
        video_data_rate             = 0;
        video_codec_id              = 0;
    
        audio_data_rate             = 0;
        audio_codec_id              = 0;
    
        avc_profile                 = SrsAvcProfileReserved;
        avc_level                   = SrsAvcLevelReserved;
        aac_object                  = SrsAacObjectTypeReserved;
        aac_sample_rate             = SRS_AAC_SAMPLE_RATE_UNSET; // sample rate ignored
        aac_channels                = 0;
        avc_extra_size              = 0;
        avc_extra_data              = NULL;
        aac_extra_size              = 0;
        aac_extra_data              = NULL;
    
        sequenceParameterSetLength  = 0;
        sequenceParameterSetNALUnit = NULL;
        pictureParameterSetLength   = 0;
        pictureParameterSetNALUnit  = NULL;
    
        /* the avc payload format. */
        payload_format = SrsAvcPayloadFormatGuess;
        stream = new SrsStream();
    }
    

    2.1.4 SrsCodecSample constructor

    /**
    * the samples in the flv audio/video packet.
    * the sample used to analysis a video/audio packet,
    * split the h.264 NALUs to buffers, or aac raw data to a buffer,
    * and decode the video/audio specified infos.
    * 
    * the sample unit:
    *       a video packet codec in h.264 contains many NALUs, each is a sample unit.
    *       a audio packet codec in aac is a sample unit.
    * @remark, the video/audio sequence header is not sample unit,
    *       all sequence header stores as extra data, 
    *       @see SrsAvcAacCodec.avc_extra_data and SrsAvcAacCodec.aac_extra_data
    * @remark, user must clear all samples before decode a new video/audio packet.
    */
    SrsCodecSample::SrsCodecSample()
    {
        clear();
    }
    
    /**
     * clear all samples.
     * the sample units never copy the bytes, it directly use the ptr,
     * so when video/audio packet is destroyed, the sample must be clear.
     * in a word, user must clear sample before demux it.
     * @remark demux sample use SrsAvcAacCodec.audio_aac_demux or video_avc_demux.
     */
    void SrsCodecSample::clear()
    {
        /* whether the sample is video sample which demux from video packet. */
        is_video = false;
        /*
         * each audio/video raw data packet will dumps to one or multiple buffers,
         * the buffers will write to hls and clear to reset.
         * generally, aac audio packet corresponding to one buffer,
         * where avc/h264 video packet may contains multiple buffer.
         */
        nb_sample_units = 0;
    
        /* 
         * CompositionTime, video_file_format_spec_v10_1.pdf, page 78.
         * cts = pts - dts, where dts = flvheader->timestamp.
         */
        cts = 0;
        frame_type = SrsCodecVideoAVCFrameReserved;
        avc_packet_type = SrsCodecVideoAVCTypeReserved;
        has_sps_pps = has_aud = has_idr = false;
        first_nalu_type = SrsAvcNaluTypeReserved;
        
        acodec = SrsCodecAudioReserved1;
        sound_rate = SrsCodecAudioSampleRateReserved;
        sound_size = SrsCodecAudioSampleSizeReserved;
        sound_type = SrsCodecAudioSoundTypeReserved;
        aac_packet_type = SrsCodecAudioTypeReserved;
    }
    

    2.1.5 SrsHlsMuxer constructor

    /**
    * muxer the HLS stream(m3u8 and ts files).
    * generally, the m3u8 muxer only provides methods to open/close segments,
    * to flush video/audio, without any mechanisms.
    * 
    * that is, user must use HlsCache, which will control the methods of muxer,
    * and provides HLS mechanisms.
    */
    SrsHlsMuxer::SrsHlsMuxer()
    {
        req = NULL;
        hls_fragment = hls_window = 0;
        hls_aof_ratio = 1.0;
        /*
         * the deviation in piece to adjust the fragment to be more
         * bigger or smaller.
         */
        deviation_ts = 0;
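        /* whether to delete expired ts segments; a segment is expired once it falls outside hls_window.
         * Cleanup can be turned off to implement time shifting and storage with your own segment management system.
         * Enabled by default here */
        hls_cleanup = true;
        /* whether to cut segments by gop, i.e. wait for a keyframe before starting a new segment. Enabled by default here */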
        hls_wait_keyframe = true;
        previous_floor_ts = 0;
        /*
         * the previous reap floor timestamp,
         * used to detect the dup or jmp or ts.
         */
        accept_floor_ts = 0;
        /* whether use floor algorithm for timestamp. */
        hls_ts_floor = false;
        /* the maximum segment duration */
        max_td = 0;
        /* the ts sequence number; incremented by 1 each time a new ts file is started */
        _sequence_no = 0;
        /* current writing segment. */
        current = NULL;
        /*
         * the current audio codec, when open new muxer, 
         * set the muxer audio codec.
         * @see https://github.com/ossrs/srs/issues/301
         */
        acodec = SrsCodecAudioReserved1;
        /* writing to the cache is disabled by default */
        should_write_cache = false;
        /* writing video/audio to a file (the ts file) is enabled by default */
        should_write_file = true;
        /* creates a reusable thread (SrsReusableThread) named async */
        async = new SrsAsyncCallWorker();
        /* the ts context, to keep cc continuous between ts. */
        context = new SrsTsContext();
    }
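
    The hls_cleanup comment says that a segment is expired once it falls outside hls_window. The sliding window can be sketched as follows. This is a simplified model under stated assumptions, not the SRS implementation: Segment and shrink_to_window are hypothetical names, and the model only tracks durations, whereas a real muxer would also delete the expired ts files and rewrite the m3u8.

```cpp
#include <cassert>
#include <deque>

// Hypothetical record for one ts segment listed in the m3u8.
struct Segment {
    int sequence_no;
    double duration;  // seconds
};

// Drops the oldest segments while the playlist is longer than hls_window.
// Returns how many segments fell out of the window.
inline int shrink_to_window(std::deque<Segment>& playlist, double hls_window) {
    assert(hls_window > 0);
    double total = 0;
    for (const Segment& s : playlist) {
        total += s.duration;
    }
    int expired = 0;
    while (!playlist.empty() && total > hls_window) {
        total -= playlist.front().duration;
        playlist.pop_front();
        ++expired;
    }
    return expired;
}
```

    With hls_fragment 10 and hls_window 60 from the configuration above, seven 10-second segments exceed the window by one segment, so the oldest one is dropped and six remain.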
    

    2.1.6 SrsAsyncCallWorker constructor

    /**
     * the async callback for dvr.
     * when worker call with the task, the worker will do it in isolate thread.
     * that is, the task is execute/call in async mode.
     */
    SrsAsyncCallWorker::SrsAsyncCallWorker()
    {
        /* create a reusable thread named async */
        pthread = new SrsReusableThread("async", this, SRS_AUTO_ASYNC_CALLBACL_SLEEP_US);
    }
    

    2.1.6.1 SrsReusableThread constructor

    SrsReusableThread::SrsReusableThread(const char* n, ISrsReusableThreadHandler* h, 
        int64_t interval_us)
    {
        handler = h;
        pthread = new internal::SrsThread(n, this, interval_us, true);
    }
    

    2.1.6.2 SrsThread constructor

    SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, int64_t interval_us, bool joinable)
    {
        _name = name;
        handler = thread_handler;
        cycle_interval_us = interval_us;
        
        tid = NULL;
        loop = false;
        really_terminated = true;
        _cid = -1;
        _joinable = joinable;
        disposed = false;
        
        // in start(), the thread cycle method maybe stop and remove the thread itself,
        // and the thread start() is waiting for the _cid, and segment fault then.
        // @see https://github.com/ossrs/srs/issues/110
        // thread will set _cid, callback on_thread_start(), then wait for the can_run signal.
        can_run = false;
    }
    

    2.1.7 SrsHlsCache constructor

    /**
    * hls stream cache, 
    * use to cache hls stream and flush to hls muxer.
    * 
    * when write stream to ts file:
    * video frame will directly flush to M3u8Muxer,
    * audio frame need to cache, because it's small and flv tbn problem.
    * 
    * whatever, the Hls cache used to cache video/audio,
    * and flush video/audio to m3u8 muxer if needed.
    * 
    * about the flv tbn problem:
    *   flv tbn is 1/1000, ts tbn is 1/90000,
    *   when timestamp convert to flv tbn, it will lose precision,
    *   so we must gather audio frame together, and recalc the timestamp @see SrsTsAacJitter,
    *   we use a aac jitter to correct the audio pts.
    */
    SrsHlsCache::SrsHlsCache()
    {
        cache = new SrsTsCache();
    }
    

    2.1.8 SrsTsCache constructor

    /**
    * ts stream cache, 
    * use to cache ts stream.
    * 
    * about the flv tbn problem:
    *   flv tbn is 1/1000, ts tbn is 1/90000,
    *   when timestamp convert to flv tbn, it will lose precision,
    *   so we must gather audio frame together, and recalc the timestamp @see SrsTsAacJitter,
    *   we use a aac jitter to correct the audio pts.
    */
    SrsTsCache::SrsTsCache()
    {
        /* current ts message. */
        audio = NULL;
        video = NULL;
    }
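
    The "flv tbn problem" described in the comments can be made concrete with a little arithmetic. The sketch below is illustrative only and does not reproduce SrsTsAacJitter: the function names are hypothetical, and it assumes AAC frames of 1024 samples each. At 44100 Hz one such frame lasts about 23.22 ms, which a millisecond FLV timestamp cannot represent exactly, so the 90 kHz timestamp is derived from the frame index instead of from the rounded millisecond value.

```cpp
#include <cassert>
#include <cstdint>

// FLV/RTMP timestamps are in milliseconds (tbn 1/1000);
// MPEG-TS timestamps tick at 90 kHz (tbn 1/90000).
inline std::int64_t flv_ms_to_ts_90k(std::int64_t flv_ms) {
    return flv_ms * 90;
}

// Hypothetical helper: 90 kHz timestamp of the frame_index-th AAC frame,
// computed from the frame count so that millisecond rounding does not accumulate.
// Assumption: each AAC frame carries 1024 samples.
inline std::int64_t aac_frame_pts_90k(std::int64_t base_pts,
                                      std::int64_t frame_index,
                                      int sample_rate) {
    assert(sample_rate > 0);
    return base_pts + frame_index * 1024 * 90000 / sample_rate;
}
```

    If every frame were stamped with a truncated 23 ms, frame 1000 would land at 23000 ms, i.e. 2070000 ticks, while the frame-count calculation gives 2089795 ticks. The difference of roughly 220 ms shows why the audio timestamps are recalculated rather than converted one by one.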
    

    After SrsSource is constructed as above, its initialize function is called to initialize it.

    2.1.9 SrsSource::initialize

    int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
    {
        int ret = ERROR_SUCCESS;
        
        srs_assert(h);
        srs_assert(!_req);
    
        /* the event handler. */
        handler = h;
        /* deep copy of client request. */
        _req = r->copy();
        /* returns false by default if the vhost does not configure atc */
        atc = _srs_config->get_atc(_req->vhost);
    
    #ifdef SRS_AUTO_HLS
        if ((ret = hls->initialize(this)) != ERROR_SUCCESS) {
            return ret;
        }
    #endif
        
    ..
    
        if ((ret = play_edge->initialize(this, _req)) != ERROR_SUCCESS) {
            return ret;
        }
        if ((ret = publish_edge->initialize(this, _req)) != ERROR_SUCCESS) {
            return ret;
        }
        
        /* if the vhost does not configure queue_length, the play queue size defaults to 30s
         *
         * in seconds, the live queue length
         * #define SRS_PERF_PLAY_QUEUE 30
         */
        double queue_size = _srs_config->get_queue_length(_req->vhost);
        publish_edge->set_queue_size(queue_size);
        
        /* the time jitter algorithm for vhost.
         * defaults to SrsRtmpJitterAlgorithmFULL if the vhost does not configure time_jitter */
        jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(_req->vhost);
        /* returns false by default if the vhost does not configure mix_correct,
         * i.e. the interleaved/mixed algorithm is not used to correct timestamps */
        mix_correct = _srs_config->get_mix_correct(_req->vhost);
        
        return ret;
    }
    

    2.1.10 SrsHls::initialize

    /*
     * initialize the hls by handler and source.  
     */
    int SrsHls::initialize(SrsSource* s)
    {
        int ret = ERROR_SUCCESS;
    
        source = s;
    
        /* initialize the hls muxer */
        if ((ret = muxer->initialize()) != ERROR_SUCCESS) {
            return ret;
        }
    
        return ret;
    }
    

    2.1.11 SrsHlsMuxer::initialize

    /* initialize the hls muxer. */
    int SrsHlsMuxer::initialize()
    {
        int ret = ERROR_SUCCESS;
        
        /* start the async thread */
        if ((ret = async->start()) != ERROR_SUCCESS) {
            return ret;
        }
        
        return ret;
    }
    

    2.1.12 SrsAsyncCallWorker::start

    int SrsAsyncCallWorker::start()
    {
        return pthread->start();
    }
    

    2.1.13 SrsReusableThread::start

    int SrsReusableThread::start()
    {
        return pthread->start();
    }
    

    2.1.14 SrsThread::start

    int SrsThread::start()
    {
        int ret = ERROR_SUCCESS;
        
        if(tid) {
            srs_info("thread %s already running.", _name);
            return ret;
        }
        
        /* create the thread */
        if((tid = st_thread_create(thread_fun, this, (_joinable? 1:0), 0)) == NULL){
            ret = ERROR_ST_CREATE_CYCLE_THREAD;
            srs_error("st_thread_create failed. ret=%d", ret);
            return ret;
        }
        
        disposed = false;
        // we set to loop to true for thread to run.
        loop = true;
        
        // wait for cid to ready, for parent thread to get the cid.
        while (_cid < 0) {
            st_usleep(10 * 1000);
        }
        
        // now, cycle thread can run.
        can_run = true;
        
        return ret;
    }
    

    3. SrsRtmpConn::publishing

    int SrsRtmpConn::publishing(SrsSource* source)
    {
        int ret = ERROR_SUCCESS;
        
        /* the vhost does not configure refer_publish, so this check is ignored */
        if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) 
            != ERROR_SUCCESS) {
            srs_error("check publish_refer failed. ret=%d", ret);
            return ret;
        }
        
        /* the vhost does not configure http_hooks, ignore */
        if ((ret = http_hooks_on_publish()) != ERROR_SUCCESS) {
            srs_error("http hook on_publish failed. ret=%d", ret);
            return ret;
        }
        
        /* returns false here */
        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
        if ((ret = acquire_publish(source, vhost_is_edge)) == ERROR_SUCCESS) {
            SrsPublishRecvThread trd(rtmp, req, 
                st_netfd_fileno(stfd), 0, this, source,
                client_type != SrsRtmpConnFlashPublish, 
                vhost_is_edge);
            
            ret = do_publishing(source, &trd);
            
            /* stop isolate recv thread */
            trd.stop();
        }
        
        /*
         * whatever the acquire publish, always release publish.
         * when the acquire error in the middle-way, the publish state changed,
         * but failed, so we must cleanup it.
         * @remark, when stream is busy, should never release it.
         */
        if (ret != ERROR_SYSTEM_STREAM_BUSY) {
            release_publish(source, vhost_is_edge);
        }
    
        http_hooks_on_unpublish();
    
        return ret;
    }
    

    3.1 SrsRtmpConn::acquire_publish

    int SrsRtmpConn::acquire_publish(SrsSource* source, bool is_edge)
    {
        int ret = ERROR_SUCCESS;
    
        /* here this directly returns _can_publish, which was initialized to true when the source was constructed */
        if (!source->can_publish(is_edge)) {
            ret = ERROR_SYSTEM_STREAM_BUSY;
            srs_warn("stream %s is already publishing. ret=%d", 
                req->get_stream_url().c_str(), ret);
            return ret;
        }
        
        /* the vhost does not configure mode, so is_edge is false */
        /* when edge, ignore the publish event, directly proxy it. */
        if (is_edge) {
            if ((ret = source->on_edge_start_publish()) != ERROR_SUCCESS) {
                srs_error("notice edge start publish stream failed. ret=%d", ret);
                return ret;
            }        
        } else {
            /* publish stream event notify. */
            if ((ret = source->on_publish()) != ERROR_SUCCESS) {
                srs_error("notify publish failed. ret=%d", ret);
                return ret;
            }
        }
    
        return ret;
    }
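
    The interplay between acquire_publish and the release rule in publishing() can be modelled with a single flag. The sketch below is a simplified illustration under stated assumptions: PublishState, PublishResult, and finish_publish are hypothetical names, the numeric error values are placeholders, and the model assumes that releasing simply makes the stream publishable again.

```cpp
#include <cassert>

// Hypothetical result codes for this sketch; the real values are defined by SRS.
enum PublishResult { kPublishOk = 0, kStreamBusy = 1 };

// Models the _can_publish flag of a source.
class PublishState {
public:
    // Fails with kStreamBusy while another publisher holds the stream.
    PublishResult acquire() {
        if (!can_publish_) {
            return kStreamBusy;
        }
        can_publish_ = false;
        return kPublishOk;
    }

    void release() { can_publish_ = true; }
    bool can_publish() const { return can_publish_; }

private:
    bool can_publish_ = true;
};

// Mirrors the rule in publishing(): release unless the stream was busy,
// so a rejected second publisher never frees the first publisher's stream.
inline void finish_publish(PublishState& state, PublishResult result) {
    if (result != kStreamBusy) {
        state.release();
    }
}
```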
    

    3.1.1 SrsSource::on_publish

    /*
     * publish stream event notify.
     * @param _req, the request from client, the source will deep copy it,
     *     for when reload the request of client maybe invalid.
     */
    int SrsSource::on_publish()
    {
        int ret = ERROR_SUCCESS;
        
        /* update the request object. */
        srs_assert(_req);
        
        _can_publish = false;
        
        /* whatever, the publish thread is the source or edge source,
         * save its id to source id. */
        on_source_id_changed(_srs_context->get_id());
        
        /* reset the mix queue. */
        mix_queue->clear();
        
        /* detect the monotonically again. */
        is_monotonically_increase = true;
        last_packet_time = 0;
        
        /* create forwarders */
        /* the vhost does not configure forward, ignore */
        if ((ret = create_forwarders()) != ERROR_SUCCESS) {
            srs_error("create forwarders failed. ret=%d", ret);
            return ret;
        }
        
        /* TODO: FIXME: use initialize to set req. */
    #ifdef SRS_AUTO_TRANSCODE
        ...
    #endif
        
        /* TODO: FIXME: use initialize to set req. */
    #ifdef SRS_AUTO_HLS
        if ((ret = hls->on_publish(_req, false)) != ERROR_SUCCESS) {
            srs_error("start hls failed. ret=%d", ret);
            return ret;
        }
    #endif
        
        /* TODO: FIXME: use initialize to set req. */
    #ifdef SRS_AUTO_DVR
        ...
    #endif
    
    #ifdef SRS_AUTO_HDS
        ...
    #endif
    
        /* notify the handler. */
        srs_assert(handler);
        /* calls on_publish as implemented by the subclass SrsServer; the related features are not enabled, ignore */
        if ((ret = handler->on_publish(this, _req)) != ERROR_SUCCESS) {
            srs_error("handle on publish failed. ret=%d", ret);
            return ret;
        }
        SrsStatistic* stat = SrsStatistic::instance();
        stat->on_stream_publish(_req, _source_id);
        
        return ret;
    }
    

    3.1.2 SrsSource::on_source_id_changed

    int SrsSource::on_source_id_changed(int id)
    {
        int ret = ERROR_SUCCESS;
        
        if (_source_id == id) {
            return ret;
        }
        
        if (_pre_source_id == -1) {
            _pre_source_id = id;
        } else if (_pre_source_id != _source_id) {
            _pre_source_id = _source_id;
        }
        
        _source_id = id;
        
        /* notice all consumer */
        std::vector<SrsConsumer*>::iterator it;
        for (it = consumers.begin(); it != consumers.end(); ++it) {
            SrsConsumer* consumer = *it;
            /* this function sets should_update_source_id to true */
            consumer->update_source_id();
        }
        
        return ret;
    }
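
    The bookkeeping of _pre_source_id and _source_id is easy to misread, so here is a small model of just that part. SourceIds and change_source_id are hypothetical names for this sketch, and the consumer notification is reduced to a boolean return value.

```cpp
#include <cassert>

// Hypothetical holder for the two ids tracked by a source.
struct SourceIds {
    int pre_source_id = -1;
    int source_id = -1;
};

// Applies the same update rule as on_source_id_changed.
// Returns true when the id actually changed (consumers would be notified).
inline bool change_source_id(SourceIds& ids, int id) {
    if (ids.source_id == id) {
        return false;
    }
    if (ids.pre_source_id == -1) {
        ids.pre_source_id = id;
    } else if (ids.pre_source_id != ids.source_id) {
        ids.pre_source_id = ids.source_id;
    }
    ids.source_id = id;
    return true;
}
```

    Tracing three publishers with ids 100, 101, and 102: after the first change both ids are 100; after the second, the previous id stays 100 because it still equals the old source id; only after the third does the previous id advance to 101.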
    

    3.1.3 SrsHls::on_publish

    /*
     * publish stream event, continue to write the m3u8,
     * for the muxer object not destroyed.
     * @param fetch_sequence_header, whether fetch sequence from source.
     */
    int SrsHls::on_publish(SrsRequest* req, bool fetch_sequence_header)
    {
        int ret = ERROR_SUCCESS;
        
        srs_freep(_req);
        _req = req->copy();
        
        /* update the hls time, for hls_dispose. */
        last_update_time = srs_get_system_time_ms();
        
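        /* hls_enabled is initialized to false when SrsHls is constructed, so this
         * branch is not taken on the first publish; it returns early only when
         * hls is already enabled. */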
        /* support multiple publish. */
        if (hls_enabled) {
            return ret;
        }
        
        /* hls is enabled in the vhost */
        std::string vhost = req->vhost;
        if (!_srs_config->get_hls_enabled(vhost)) {
            return ret;
        }
        
        /* initialize the muxer, create the m3u8 directory and file, and so on */
        if ((ret = hls_cache->on_publish(muxer, req, stream_dts)) != ERROR_SUCCESS) {
            return ret;
        }
        
        /* if enabled, open the muxer. */
        hls_enabled = true;
        
        /* ok, the hls can be dispose, or need to be dispose. */
        hls_can_dispose = true;
        
        /* as the call above shows, the value passed in is false */
        /* when publish, don't need to fetch sequence header, which is old and maybe corrupt.
         * when reload, we must fetch the sequence header from source cache. */
        if (fetch_sequence_header) {
            /* notice the source to get the cached sequence header.
             * when reload to start hls, hls will never get the sequence header in stream,
             * use the SrsSource.on_hls_start to push the sequence header to HLS. */
            if ((ret = source->on_hls_start()) != ERROR_SUCCESS) {
                srs_error("callback source hls start failed. ret=%d", ret);
                return ret;
            }
        }
        
        return ret;
    }
    

    3.1.4 SrsHlsCache::on_publish

    int SrsHlsCache::on_publish(SrsHlsMuxer* muxer, SrsRequest* req, 
        int64_t segment_start_dts)
    {
        int ret = ERROR_SUCCESS;
    
        std::string vhost = req->vhost;
        std::string stream = req->stream;
        std::string app = req->app;
        
        /* get the hls fragment time, in seconds.
         * the config file sets hls_fragment to 10, i.e. the minimum segment duration is 10s */
        double hls_fragment = _srs_config->get_hls_fragment(vhost);
       
        /* get the hls window time, in seconds.
         * a window is a set of ts, the ts collection in m3u8.
         * @remark SRS will delete the ts exceed the window.
         * the config file sets hls_window to 60, i.e. the total duration of all segments is at most 60s */
        double hls_window = _srs_config->get_hls_window(vhost);
        
        /* get the hls m3u8 ts list entry prefix config */
        /* get the HLS m3u8 list ts segment entry prefix info.
         * the vhost does not configure hls_entry_prefix, so this returns "" */
        std::string entry_prefix = _srs_config->get_hls_entry_prefix(vhost);
        
        /* get the HLS ts/m3u8 file store path.
         * hls_path in the config file is ./objs/nginx/html  */
        std::string path = _srs_config->get_hls_path(vhost);
        
        /* get the HLS m3u8 file path template.
         * hls_m3u8_file in the config file is [app]/[stream].m3u8 */
        std::string m3u8_file = _srs_config->get_hls_m3u8_file(vhost);
       
        /* get the HLS ts file path template.
         * hls_ts_file in the config file is [app]/[stream]-[seq].ts */
        std::string ts_file = _srs_config->get_hls_ts_file(vhost);
        
        /* whether cleanup the old ts files.
         * hls does not configure hls_cleanup, so this defaults to true,
         * i.e. expired ts segments are deleted; a segment outside hls_window is expired */
        bool cleanup = _srs_config->get_hls_cleanup(vhost);
        
        /* whether reap the ts when got keyframe.
         * hls does not configure hls_wait_keyframe, so this defaults to true, i.e. wait for an I-frame;
         * a segment is therefore cut only once its duration exceeds 10s and an I-frame has arrived */
        bool wait_keyframe = _srs_config->get_hls_wait_keyframe(vhost);
        
        /* get the hls aof(audio overflow) ratio.
         * hls does not configure hls_aof_ratio, so this defaults to 2.0:
         * the multiple of the fragment duration applied to pure-audio streams */
        double hls_aof_ratio = _srs_config->get_hls_aof_ratio(vhost);
        
        /* whether use floor(timestamp/hls_fragment) for variable timestamp
         * hls does not configure hls_ts_floor, so this defaults to false */
        bool ts_floor = _srs_config->get_hls_ts_floor(vhost);
        
        /* the seconds to dispose the hls.
         * hls does not configure hls_dispose, so this defaults to 0 */
        int hls_dispose = _srs_config->get_hls_dispose(vhost);
        
        /* TODO: FIXME: support load exists m3u8, to continue publish stream.
         * for the HLS does not require the EXT-X-MEDIA-SEQUENCE to be monotonically increasing */
        
        /* open muxer, when publish, update the config for muxer */
        if ((ret = muxer->update_config(req, entry_prefix,
            path, m3u8_file, ts_file, hls_fragment, hls_window, ts_floor, hls_aof_ratio,
            cleanup, wait_keyframe)) != ERROR_SUCCESS
        ) {
            srs_error("m3u8 muxer update config failed. ret=%d", ret);
            return ret;
        }
        
        /* open a new segment(a new ts file) */
        if ((ret = muxer->segment_open(segment_start_dts)) != ERROR_SUCCESS) {
            srs_error("m3u8 muxer open segment failed. ret=%d", ret);
            return ret;
        }
        
        return ret;
    }
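
    The comments on hls_fragment, hls_wait_keyframe, and hls_aof_ratio together describe when a segment is cut. The decision can be sketched as two predicates. This is a simplified model based only on those comments: HlsReapConfig and both function names are hypothetical, and refinements such as the deviation_ts adjustment in SrsHlsMuxer are deliberately left out.

```cpp
#include <cassert>

// Hypothetical bundle of the three settings that drive segment cutting.
struct HlsReapConfig {
    double fragment;     // hls_fragment, in seconds
    double aof_ratio;    // hls_aof_ratio
    bool wait_keyframe;  // hls_wait_keyframe
};

// Video path: cut once the segment has reached hls_fragment and, when
// hls_wait_keyframe is on, only when the current frame is a keyframe.
inline bool should_reap_on_video(const HlsReapConfig& c, double seg_duration, bool is_keyframe) {
    assert(c.fragment > 0);
    if (seg_duration < c.fragment) {
        return false;
    }
    return !c.wait_keyframe || is_keyframe;
}

// Pure-audio path: there is no keyframe to wait for, so cut once the
// segment has reached aof_ratio * hls_fragment.
inline bool should_reap_on_audio(const HlsReapConfig& c, double seg_duration) {
    assert(c.fragment > 0);
    return seg_duration >= c.aof_ratio * c.fragment;
}
```

    With the values from this walkthrough (fragment 10, aof_ratio 2.0, wait_keyframe true), a 10.5-second segment is cut only when a keyframe arrives, and a pure-audio segment is cut at 20 seconds.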
    

    3.1.5 SrsHlsMuxer::update_config

    /* when publish, update the config for muxer. */
    int SrsHlsMuxer::update_config(SrsRequest* r, string entry_prefix,
        string path, string m3u8_file, string ts_file, double fragment, double window,
        bool ts_floor, double aof_ratio, bool cleanup, bool wait_keyframe
    ) {
        int ret = ERROR_SUCCESS;
        
        srs_freep(req);
        /* deep-copy the request into req */
        req = r->copy();
    
        /* "" */
        hls_entry_prefix = entry_prefix;
        /* hls_path: ./objs/nginx/html */
        hls_path = path;
        /* hls_ts_file: "[app]/[stream]-[seq].ts" */
        hls_ts_file = ts_file;
        /* hls_fragment: 10s */
        hls_fragment = fragment;
        /* hls_aof_ratio: 2.0 */
        hls_aof_ratio = aof_ratio;
        /* hls_ts_floor: false */
        hls_ts_floor = ts_floor;
        /* hls_cleanup: true */
        hls_cleanup = cleanup;
        /* hls_wait_keyframe: true */
        hls_wait_keyframe = wait_keyframe;
        previous_floor_ts = 0;
        accept_floor_ts = 0;
        /* hls_window: 60s */
        hls_window = window;
        deviation_ts = 0;
        
        /* generate the m3u8 dir and path. */
        /* m3u8_url: "live/livestream.m3u8" */
        m3u8_url = srs_path_build_stream(m3u8_file, req->vhost, req->app, req->stream);
        /* m3u8: ./objs/nginx/html/live/livestream.m3u8 */
        m3u8 = path + "/" + m3u8_url;
    
        /* when update config, reset the history target duration. */
        /* get_hls_td_ratio: get the hls td(target duration) ratio.
         * hls_td_ratio is not configured in hls, so this returns the default: 1.5.
         * fragment is configured as 10, so max_td is 15 */
        max_td = (int)(fragment * _srs_config->get_hls_td_ratio(r->vhost));
        
        /* TODO: FIXME: refine better for SRS2 only support disk. */
        should_write_cache = false;
        should_write_file = true;
        
        /* create m3u8 dir once. */
        /* returns the directory of the m3u8 file, i.e. ./objs/nginx/html/live */
        m3u8_dir = srs_path_dirname(m3u8);
        if (should_write_file && (ret = srs_create_dir_recursively(m3u8_dir)) != ERROR_SUCCESS) 
        {
            srs_error("create app dir %s failed. ret=%d", m3u8_dir.c_str(), ret);
            return ret;
        }
        srs_info("create m3u8 dir %s ok", m3u8_dir.c_str());
        
        return ret;
    }
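
    To make the derived values concrete, the following is a minimal sketch of the arithmetic and path concatenation that update_config performs. The names dirname_of, DerivedHlsPaths and derive_hls_paths are hypothetical and exist only for this illustration; dirname_of is a simplified stand-in for srs_path_dirname, not the SRS implementation.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for srs_path_dirname: everything before the last '/'.
// Returning "./" for a path without '/' is an assumption of this sketch.
std::string dirname_of(const std::string& path) {
    std::string::size_type pos = path.rfind('/');
    if (pos == std::string::npos) return "./";
    if (pos == 0) return "/";
    return path.substr(0, pos);
}

// Values that update_config derives from its arguments (hypothetical struct).
struct DerivedHlsPaths {
    std::string m3u8;      // full path of the playlist on disk
    std::string m3u8_dir;  // directory that is created once for the playlist
    int max_td;            // history target duration, in seconds
};

DerivedHlsPaths derive_hls_paths(const std::string& hls_path,
                                 const std::string& m3u8_url,
                                 double fragment, double td_ratio) {
    assert(fragment > 0 && td_ratio > 0);
    DerivedHlsPaths d;
    d.m3u8 = hls_path + "/" + m3u8_url;
    d.m3u8_dir = dirname_of(d.m3u8);
    // The cast truncates toward zero, like the (int) cast in update_config.
    d.max_td = static_cast<int>(fragment * td_ratio);
    return d;
}
```

    With the configuration from section 1, calling derive_hls_paths("./objs/nginx/html", "live/livestream.m3u8", 10, 1.5) should give the playlist path ./objs/nginx/html/live/livestream.m3u8, the directory ./objs/nginx/html/live and a max_td of 15, matching the comments in the listing above.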
    

    3.1.6 srs_path_build_stream

    /*
     * build the path according to vhost/app/stream, where replace variables:
     *     [vhost], the vhost of stream.
     *     [app], the app of stream.
     *     [stream], the stream name of stream.
     * @return the replaced path.
     */
    string srs_path_build_stream(string template_path, string vhost, string app, string stream)
    {
        std::string path = template_path;
        
        /* variable [vhost] */
        path = srs_string_replace(path, "[vhost]", vhost);
        /* variable [app] */
        path = srs_string_replace(path, "[app]", app);
        /* variable [stream] */
        path = srs_string_replace(path, "[stream]", stream);
        
        return path;
    }
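
    The substitution can be tried in isolation. The sketch below is self-contained: replace_all is a hypothetical helper that stands in for srs_string_replace (whose implementation is not shown in this article), and build_stream_path mirrors the three replacements above.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper standing in for srs_string_replace: replaces every
// occurrence of `from` in `text` with `to`.
std::string replace_all(std::string text, const std::string& from,
                        const std::string& to) {
    assert(!from.empty());
    std::string::size_type pos = 0;
    while ((pos = text.find(from, pos)) != std::string::npos) {
        text.replace(pos, from.length(), to);
        pos += to.length();  // skip the inserted text so it is not rescanned
    }
    return text;
}

// Mirrors srs_path_build_stream: expands [vhost], [app] and [stream] only.
std::string build_stream_path(const std::string& tmpl, const std::string& vhost,
                              const std::string& app, const std::string& stream) {
    std::string path = tmpl;
    path = replace_all(path, "[vhost]", vhost);
    path = replace_all(path, "[app]", app);
    path = replace_all(path, "[stream]", stream);
    return path;
}
```

    Note that [seq] is not one of the variables handled here, so a template such as "[app]/[stream]-[seq].ts" keeps its [seq] placeholder after this step; it is filled in later by segment_open.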
    

    3.1.7 SrsHlsMuxer::segment_open

    /*
     * open a new segment(a new ts file),
     * @param segment_start_dts, use to calc the segment duration,
     *    use 0 for the first segment of HLS.
     */
    int SrsHlsMuxer::segment_open(int64_t segment_start_dts)
    {
        int ret = ERROR_SUCCESS;
        
        /* current: current writing segment. It is NULL right after initialization */
        if (current) {
            srs_warn("ignore the segment open, for segment is already open.");
            return ret;
        }
        
        /* when segment open, the current segment must be NULL. */
        srs_assert(!current);
        
        /* load the default acodec from config. */
        SrsCodecAudio default_acodec = SrsCodecAudioAAC;
        if (true) {
            /* hls_acodec is not configured in hls, so this returns the default: "aac" */
            std::string default_acodec_str = _srs_config->get_hls_acodec(req->vhost);
            if (default_acodec_str == "mp3") {
                default_acodec = SrsCodecAudioMP3;
                srs_info("hls: use default mp3 acodec");
            } else if (default_acodec_str == "aac") {
                default_acodec = SrsCodecAudioAAC;
                srs_info("hls: use default aac acodec");
            } else if (default_acodec_str == "an") {
                default_acodec = SrsCodecAudioDisabled;
                srs_info("hls: use default an acodec for pure video");
            } else {
                srs_warn("hls: use aac for other codec=%s", default_acodec_str.c_str());
            }
        }
        
        /* load the default vcodec from config. */
        SrsCodecVideo default_vcodec = SrsCodecVideoAVC;
        if (true) {
            /* hls_vcodec is not configured in hls, so this returns the default: "h264" */
            std::string default_vcodec_str = _srs_config->get_hls_vcodec(req->vhost);
            if (default_vcodec_str == "h264") {
                default_vcodec = SrsCodecVideoAVC;
                srs_info("hls: use default h264 vcodec");
            } else if (default_vcodec_str == "vn") {
                default_vcodec = SrsCodecVideoDisabled;
                srs_info("hls: use default vn vcodec for pure audio");
            } else {
                srs_warn("hls: use h264 for other codec=%s", default_vcodec_str.c_str());
            }
        }
        
        /* new segment. */
        current = new SrsHlsSegment(context, should_write_cache, should_write_file, 
                                    default_acodec, default_vcodec);
        /* _sequence_no: sequence number in m3u8. */
        current->sequence_no = _sequence_no++;
        /* for the first segment of HLS, segment_start_dts is 0 */
        current->segment_start_dts = segment_start_dts;
        
        /* generate filename. */
        /* ts_file: "[app]/[stream]-[seq].ts" */
        std::string ts_file = hls_ts_file;
        /* ts_file: "live/livestream-[seq].ts" */
        ts_file = srs_path_build_stream(ts_file, req->vhost, req->app, req->stream);
        /* this value is initialized to false when SrsHlsMuxer is constructed */
        if (hls_ts_floor) {
            ...
            
        } else {
            ts_file = srs_path_build_timestamp(ts_file);
        }
        if (true) {
            std::stringstream ss;
            ss << current->sequence_no;
            /* ts_file: live/livestream-0.ts */
            ts_file = srs_string_replace(ts_file, "[seq]", ss.str());
        }
        /* full_path: ./objs/nginx/html/live/livestream-0.ts */
        current->full_path = hls_path + "/" + ts_file;
        
        /* the ts url, relative or absolute url. */
        std::string ts_url = current->full_path;
        /* ts_url: ./objs/nginx/html/live/livestream-0.ts,
         * m3u8_dir: ./objs/nginx/html/live */
        if (srs_string_starts_with(ts_url, m3u8_dir)) {
            /* ts_url: /livestream-0.ts */
            ts_url = ts_url.substr(m3u8_dir.length());
        }
        while (srs_string_starts_with(ts_url, "/")) {
            /* ts_url: livestream-0.ts */
            ts_url = ts_url.substr(1);
        }
        /* current->uri: "" */
        current->uri += hls_entry_prefix;
        if (!hls_entry_prefix.empty() && !srs_string_ends_with(hls_entry_prefix, "/")) 
        {
            current->uri += "/";
            
            /* add the http dir to uri. */
            string http_dir = srs_path_dirname(m3u8_url);
            if (!http_dir.empty()) {
                current->uri += http_dir + "/";
            }
        }
        /* current->uri: livestream-0.ts */
        current->uri += ts_url;
        
        /* create dir recursively for hls. */
        /* ts_dir: ./objs/nginx/html/live */
        std::string ts_dir = srs_path_dirname(current->full_path);
        if (should_write_file && (ret = srs_create_dir_recursively(ts_dir)) 
            != ERROR_SUCCESS) {
            srs_error("create app dir %s failed. ret=%d", ts_dir.c_str(), ret);
            return ret;
        }
        
        /* open temp ts file. */
        /* tmp_file: ./objs/nginx/html/live/livestream-0.ts.tmp */
        std::string tmp_file = current->full_path + ".tmp";
        if ((ret = current->muxer->open(tmp_file.c_str())) != ERROR_SUCCESS) {
            srs_error("open hls muxer failed. ret=%d", ret);
            return ret;
        }
        
        /* set the segment muxer audio codec. */
        if (acodec != SrsCodecAudioReserved1) {
            current->muxer->update_acodec(acodec);
        }
        
        return ret;
    }
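
    The URI handling in segment_open is easier to follow on concrete inputs. The sketch below reproduces only the branch structure of that part of the function; starts_with, ends_with, url_dirname and segment_uri are hypothetical names for this illustration, and url_dirname is a simplified stand-in for srs_path_dirname that returns an empty string when there is no '/'.

```cpp
#include <cassert>
#include <string>

static bool starts_with(const std::string& s, const std::string& prefix) {
    return s.compare(0, prefix.length(), prefix) == 0;
}

static bool ends_with(const std::string& s, const std::string& suffix) {
    return s.length() >= suffix.length() &&
           s.compare(s.length() - suffix.length(), suffix.length(), suffix) == 0;
}

// Simplified stand-in for srs_path_dirname (assumption: "" when no '/').
static std::string url_dirname(const std::string& path) {
    std::string::size_type pos = path.rfind('/');
    return pos == std::string::npos ? std::string() : path.substr(0, pos);
}

// Computes the segment URI written into the playlist.
std::string segment_uri(const std::string& full_path, const std::string& m3u8_dir,
                        const std::string& m3u8_url,
                        const std::string& entry_prefix) {
    // Make the ts path relative to the playlist directory when possible.
    std::string ts_url = full_path;
    if (starts_with(ts_url, m3u8_dir)) {
        ts_url = ts_url.substr(m3u8_dir.length());
    }
    while (starts_with(ts_url, "/")) {
        ts_url = ts_url.substr(1);
    }

    // Prepend the entry prefix; the playlist's http dir is added only in the
    // branch where the prefix is non-empty and lacks a trailing '/'.
    std::string uri = entry_prefix;
    if (!entry_prefix.empty() && !ends_with(entry_prefix, "/")) {
        uri += "/";
        std::string http_dir = url_dirname(m3u8_url);
        if (!http_dir.empty()) {
            uri += http_dir + "/";
        }
    }
    return uri + ts_url;
}
```

    With an empty prefix, as in this article's configuration, the result is the bare file name livestream-0.ts. With a hypothetical prefix such as "http://example.com", the sketch yields "http://example.com/live/livestream-0.ts".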
    

    3.1.8 SrsHlsSegment constructor

    /*
     * the wrapper of m3u8 segment from specification:
     * EXTINF: The EXTINF tag specifies the duration of a media segment.
     */
    SrsHlsSegment::SrsHlsSegment(SrsTsContext* c, bool write_cache, 
        bool write_file, SrsCodecAudio ac, SrsCodecVideo vc)
    {
        /* duration in seconds in m3u8. */
        duration = 0;
        /* sequence number in m3u8. */
        sequence_no = 0;
        /* current segment start dts for m3u8 */
        segment_start_dts = 0;
        /* whether current segment is sequence header. */
        is_sequence_header = false;
        /* as shown earlier, write_cache is false and write_file is true */
        writer = new SrsHlsCacheWriter(write_cache, write_file);
        muxer = new SrsTSMuxer(writer, c, ac, vc);
    }
    

    3.1.9 SrsHlsCacheWriter constructor

    /*
     * write to file and cache.
     */
    SrsHlsCacheWriter::SrsHlsCacheWriter(bool write_cache, bool write_file)
    {
        should_write_cache = write_cache;
        should_write_file = write_file;
    }
    

    3.1.10 SrsTSMuxer constructor

    /*
     * write data from frame(header info) and buffer(data) to ts file. 
     * it's a simple object wrapper for utility from nginx-rtmp: SrsMpegtsWriter
     */
    SrsTSMuxer::SrsTSMuxer(SrsFileWriter* w, SrsTsContext* c, SrsCodecAudio ac, 
        SrsCodecVideo vc)
    {
        writer = w;
        context = c;
    
        acodec = ac;
        vcodec = vc;
    }
    

    3.1.11 srs_path_build_timestamp

    /*
     * build the path according to timestamp, where replace variables:
     *       [2006], replace this const to current year.
     *       [01], replace this const to current month.
     *       [02], replace this const to current date.
     *       [15], replace this const to current hour.
     *       [04], replace this const to current minute.
     *       [05], replace this const to current second.
     *       [999], replace this const to current millisecond.
     *       [timestamp], replace this const to current UNIX timestamp in ms.
     * @return the replaced path.
     */
    string srs_path_build_timestamp(string template_path)
    {
        std::string path = template_path;
        
        /* date and time substitution
         * clock time */
        timeval tv;
        if (gettimeofday(&tv, NULL) == -1) {
            return path;
        }
        
        /* to calendar time */
        struct tm* tm;
        /* utc_time is not configured, so this returns the default: false (UTC time is not used) */
        if (_srs_config->get_utc_time()) {
            if ((tm = gmtime(&tv.tv_sec)) == NULL) {
                return path;
            }
        } else {
            if ((tm = localtime(&tv.tv_sec)) == NULL) {
                return path;
            }
        }
        
        /* the buffer to format the date and time. */
        char buf[64];
        
        /* [2006], replace with current year. */
        if (true) {
            snprintf(buf, sizeof(buf), "%04d", 1900 + tm->tm_year);
            path = srs_string_replace(path, "[2006]", buf);
        } 
        /* [01], replace this const to current month. */
        if (true) {
            snprintf(buf, sizeof(buf), "%02d", 1 + tm->tm_mon);
            path = srs_string_replace(path, "[01]", buf);
        }
        /* [02], replace this const to current date. */
        if (true) {
            snprintf(buf, sizeof(buf), "%02d", tm->tm_mday);
            path = srs_string_replace(path, "[02]", buf);
        }
        /* [15], replace this const to current hour. */
        if (true) {
            snprintf(buf, sizeof(buf), "%02d", tm->tm_hour);
            path = srs_string_replace(path, "[15]", buf);
        }
        /* [04], replace this const to current minute. */
        if (true) {
            snprintf(buf, sizeof(buf), "%02d", tm->tm_min);
            path = srs_string_replace(path, "[04]", buf);
        }
        /* [05], replace this const to current second. */
        if (true) {
            snprintf(buf, sizeof(buf), "%02d", tm->tm_sec);
            path = srs_string_replace(path, "[05]", buf);
        }
        /* [999], replace this const to current millisecond. */
        if (true) {
            snprintf(buf, sizeof(buf), "%03d", (int)(tv.tv_usec / 1000));
            path = srs_string_replace(path, "[999]", buf);
        }
        /* [timestamp],replace this const to current UNIX timestamp in ms. */
        if (true) {
            int64_t now_us = ((int64_t)tv.tv_sec) * 1000 * 1000 + (int64_t)tv.tv_usec;
            snprintf(buf, sizeof(buf), "%" PRId64, now_us / 1000);
            path = srs_string_replace(path, "[timestamp]", buf);
        }
        
        return path;
    }
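
    Because srs_path_build_timestamp reads the wall clock, its output changes on every call. The sketch below takes the instant as a parameter and always formats it in UTC, which makes the substitution reproducible. It is a simplified illustration, not the SRS function: replace_all, pad and build_timestamp_path are hypothetical names, and the utc_time configuration switch is deliberately left out.

```cpp
#include <cassert>
#include <cstdio>
#include <ctime>
#include <string>

// Hypothetical helper standing in for srs_string_replace.
static std::string replace_all(std::string text, const std::string& from,
                               const std::string& to) {
    std::string::size_type pos = 0;
    while ((pos = text.find(from, pos)) != std::string::npos) {
        text.replace(pos, from.length(), to);
        pos += to.length();
    }
    return text;
}

// Formats one integer with a printf-style format such as "%02lld".
static std::string pad(const char* fmt, long long value) {
    char buf[32];
    std::snprintf(buf, sizeof(buf), fmt, value);
    return buf;
}

// Expands the date/time variables for a caller-supplied instant, in UTC.
std::string build_timestamp_path(std::string path, std::time_t sec, long usec) {
    assert(usec >= 0 && usec < 1000000);
    const std::tm* tm = std::gmtime(&sec);
    if (tm == nullptr) {
        return path;  // leave the template untouched, like the original
    }
    path = replace_all(path, "[2006]", pad("%04lld", 1900 + tm->tm_year));
    path = replace_all(path, "[01]", pad("%02lld", 1 + tm->tm_mon));
    path = replace_all(path, "[02]", pad("%02lld", tm->tm_mday));
    path = replace_all(path, "[15]", pad("%02lld", tm->tm_hour));
    path = replace_all(path, "[04]", pad("%02lld", tm->tm_min));
    path = replace_all(path, "[05]", pad("%02lld", tm->tm_sec));
    path = replace_all(path, "[999]", pad("%03lld", usec / 1000));
    const long long now_ms = static_cast<long long>(sec) * 1000 + usec / 1000;
    path = replace_all(path, "[timestamp]", pad("%lld", now_ms));
    return path;
}
```

    The ts_file template used in this article, "[app]/[stream]-[seq].ts", contains none of these variables, so for this configuration the function returns its input unchanged.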
    

    3.1.12 SrsTSMuxer::open

    /*
     * open the writer, donot write the PSI of ts.
     * @param p, a string indicates the path of ts file to mux to.
     */
    int SrsTSMuxer::open(string p)
    {
        int ret = ERROR_SUCCESS;
        
        path = p;
        
        close();
        
        /* reset the context for a new ts start. */
        context->reset();
        
        /* calls the open function implemented by the subclass SrsHlsCacheWriter */
        if ((ret = writer->open(path)) != ERROR_SUCCESS) {
            return ret;
        }
        
        return ret;
    }
    

    3.1.13 SrsHlsCacheWriter::open

    int SrsHlsCacheWriter::open(string file)
    {
        if (!should_write_file) {
            return ERROR_SUCCESS;
        }
        
        return impl.open(file);
    }
    

    3.1.14 SrsFileWriter::open

    /*
     * open file writer, in truncate mode.
     * @param p, a string indicates the path of file to open.
     */
    int SrsFileWriter::open(string p)
    {
        int ret = ERROR_SUCCESS;
        
        if (fd > 0) {
            ret = ERROR_SYSTEM_FILE_ALREADY_OPENED;
            srs_error("file %s already opened. ret=%d", path.c_str(), ret);
            return ret;
        }
        
        int flags = O_CREAT|O_WRONLY|O_TRUNC;
        mode_t mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH;
    
        /* open the file ./objs/nginx/html/live/livestream-0.ts.tmp */
        if ((fd = ::open(p.c_str(), flags, mode)) < 0) {
            ret = ERROR_SYSTEM_FILE_OPENE;
            srs_error("open file %s failed. ret=%d", p.c_str(), ret);
            return ret;
        }
        
        path = p;
        
        return ret;
    }
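
    The three open functions above form a delegation chain: SrsTSMuxer::open calls SrsHlsCacheWriter::open, which forwards to SrsFileWriter::open only when writing to file is enabled. The sketch below models that chain with a fake writer that records paths instead of touching the filesystem. Writer, RecordingWriter, CacheWriter and open_segment are hypothetical names for this illustration, and passing the inner writer as a pointer is a simplification of the impl member used in SRS.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Minimal writer interface (hypothetical).
class Writer {
public:
    virtual ~Writer() {}
    virtual int open(const std::string& path) = 0;
};

// Fake file writer: records the paths it is asked to open.
class RecordingWriter : public Writer {
public:
    std::vector<std::string> opened;
    int open(const std::string& path) override {
        opened.push_back(path);
        return 0;
    }
};

// Mirrors SrsHlsCacheWriter::open: forward only when file output is enabled.
class CacheWriter : public Writer {
public:
    CacheWriter(bool write_file, Writer* impl)
        : should_write_file_(write_file), impl_(impl) {
        assert(impl != nullptr);
    }
    int open(const std::string& path) override {
        if (!should_write_file_) {
            return 0;  // report success without opening anything
        }
        return impl_->open(path);
    }
private:
    bool should_write_file_;
    Writer* impl_;
};

// Mirrors the end of segment_open: the segment is opened as "<full_path>.tmp".
int open_segment(Writer& writer, const std::string& full_path) {
    return writer.open(full_path + ".tmp");
}
```

    With write_file set to true, as in this article, opening ./objs/nginx/html/live/livestream-0.ts reaches the innermost writer with the .tmp suffix appended. With write_file set to false, the call returns success and the innermost writer is never invoked.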
    

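    With acquire_publish covered above, we now turn to the construction of the SrsPublishRecvThread thread.

    3.2 SrsPublishRecvThread constructor

    This constructor creates a reusable recv thread dedicated to receiving the audio and video data that the client publishes.

    /* the publish recv thread got message and callback the source method to process message. */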
    SrsPublishRecvThread::SrsPublishRecvThread(
        SrsRtmpServer* rtmp_sdk, 
        SrsRequest* _req, int mr_sock_fd, int timeout_ms, 
        SrsRtmpConn* conn, SrsSource* source, bool is_fmle, bool is_edge
    ): trd(this, rtmp_sdk, timeout_ms)
    {
        rtmp = rtmp_sdk;
    
        _conn = conn;
        /* the params for conn callback. */
        _source = source;
        /* true, as established earlier */
        _is_fmle = is_fmle;
        /* false */
        _is_edge = is_edge;
    
        recv_error_code = ERROR_SUCCESS;
        /* the msgs already got. */
        _nb_msgs = 0;
        /* The video frames we got. */
        video_frames = 0;
        /* the error timeout cond */
        error = st_cond_new();
        ncid = cid = 0;
        
        req = _req;
        mr_fd = mr_sock_fd;
    
        /* the mr settings, 
         * @see https://github.com/ossrs/srs/issues/241 */
        /* mr is not configured, so this returns the default: false */
        mr = _srs_config->get_mr_enabled(req->vhost);
        /* returns the default: 350 */
        mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
        
        /* min_latency is not configured, so this returns the default: false */
        realtime = _srs_config->get_realtime_enabled(req->vhost);
        
        _srs_config->subscribe(this);
    }
    

    Before the body of the SrsPublishRecvThread constructor runs, the constructor of its member trd, of class SrsRecvThread, is called first.
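
    This ordering follows from the C++ rule that members named in the initializer list are constructed before the enclosing constructor's body executes. A minimal sketch with stand-in types is shown below; RecvThread, PublishRecvThread, Log and construction_order are hypothetical names, not SRS classes.

```cpp
#include <cassert>
#include <string>
#include <vector>

typedef std::vector<std::string> Log;

// Stand-in for SrsRecvThread.
struct RecvThread {
    explicit RecvThread(Log* log) {
        assert(log != nullptr);
        log->push_back("RecvThread");
    }
};

// Stand-in for SrsPublishRecvThread, which owns a member named trd.
struct PublishRecvThread {
    explicit PublishRecvThread(Log* log) : trd(log) {
        // By the time this body runs, trd is already fully constructed.
        log->push_back("PublishRecvThread body");
    }
    RecvThread trd;
};

// Returns the order in which the two constructors ran.
Log construction_order() {
    Log log;
    PublishRecvThread thread(&log);
    (void)thread;  // only the construction side effects matter here
    return log;
}
```

    The returned log lists "RecvThread" before "PublishRecvThread body". The same rule applies to the nested constructions in the following subsections.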

    3.2.1 SrsRecvThread constructor

    /* the recv thread, use message handler to handle each received message. */
    SrsRecvThread::SrsRecvThread(ISrsMessageHandler* msg_handler, 
        SrsRtmpServer* rtmp_sdk, int timeout_ms)
    {
        timeout = timeout_ms;
        handler = msg_handler;
        rtmp = rtmp_sdk;
        trd = new SrsReusableThread2("recv", this);
    }
    

    3.2.2 SrsReusableThread2 constructor

    SrsReusableThread2::SrsReusableThread2(const char* n, ISrsReusableThread2Handler* h, 
        int64_t interval_us)
    {
        handler = h;
        pthread = new internal::SrsThread(n, this, interval_us, true);
    }
    

    3.2.3 SrsThread constructor

    SrsThread::SrsThread(const char* name, ISrsThreadHandler* thread_handler, 
        int64_t interval_us, bool joinable)
    {
        _name = name;
        handler = thread_handler;
        cycle_interval_us = interval_us;
        
        tid = NULL;
        loop = false;
        really_terminated = true;
        _cid = -1;
        _joinable = joinable;
        disposed = false;
        
        // in start(), the thread cycle method maybe stop and remove the thread itself,
        // and the thread start() is waiting for the _cid, and segment fault then.
        // @see https://github.com/ossrs/srs/issues/110
        // thread will set _cid, callback on_thread_start(), then wait for the can_run signal.
        can_run = false;
    }
    

    4. SrsRtmpConn::do_publishing

    int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd)
    {
        int ret = ERROR_SUCCESS;
        
        SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
        SrsAutoFree(SrsPithyPrint, pprint);
        
        /* start isolate recv thread. */
        if ((ret = trd->start()) != ERROR_SUCCESS) {
            srs_error("start isolate recv thread failed. ret=%d", ret);
            return ret;
        }
        
        /* change the isolate recv thread context id,
         * merge its log to current thread. */
        int receive_thread_cid = trd->get_cid();
        trd->set_cid(_srs_context->get_id());
        
        /* initialize the publish timeout. */
        publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
        publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
        
        /* set the sock options. */
        set_sock_options();
        
        if (true) {
            /* the mr feature is not enabled, so this is false */
            bool mr = _srs_config->get_mr_enabled(req->vhost);
            int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
        }
        
        int64_t nb_msgs = 0;
        uint64_t nb_frames = 0;
        while (!disposed) {
            pprint->elapse();
            
            /* when source is set to expired, disconnect it. */
            if (expired) {
                ret = ERROR_USER_DISCONNECT;
                srs_error("connection expired. ret=%d", ret);
                return ret;
            }
            
            /* conn wait for timeout */
            if (nb_msgs == 0) {
                /* when not got msgs, wait for a larger timeout. */
                trd->wait(publish_1stpkt_timeout);
            } else {
                trd->wait(publish_normal_timeout);
            }
            
            /* check the thread error code */
            if ((ret = trd->error_code()) != ERROR_SUCCESS) {
                if (!srs_is_system_control_error(ret) && 
                    !srs_is_client_gracefully_close(ret)) {
                    srs_error("recv thread failed. ret=%d", ret);
                }
                return ret;
            }
            
            /* when not any messages, timeout */
            if (trd->nb_msgs() <= nb_msgs) {
                ret = ERROR_SOCKET_TIMEOUT;
                srs_warn("publish timeout %dms, nb_msgs=%" PRId64 ", ret=%d",
                    nb_msgs? publish_normal_timeout : publish_1stpkt_timeout, nb_msgs, ret);
                break;
            }
            nb_msgs = trd->nb_msgs();
            
            /* update the stat for video fps. */
            SrsStatistic* stat = SrsStatistic::instance();
            if ((ret = stat->on_video_frames(req, (int)(trd->nb_video_frames() - nb_frames))) 
                != ERROR_SUCCESS) {
                return ret;
            }
            nb_frames = trd->nb_video_frames();
    
            /* reportable */
            if (pprint->can_print()) {
                kbps->sample();
                bool mr = _srs_config->get_mr_enabled(req->vhost);
                int mr_sleep = _srs_config->get_mr_sleep_ms(req->vhost);
                /* (log statement omitted) */
            }
        }
        
        return ret;
    }
    

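    The while loop repeatedly goes to sleep on the error condition variable. It wakes up either when that condition is signaled (if the recv thread hits an error, it signals error so that the waiting thread can handle the recv error) or when the wait times out.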
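
    The timeout decision inside the loop can be isolated from the threading. The sketch below models only the bookkeeping: which timeout is chosen, and when the loop concludes that no new messages arrived. It does not use state-threads condition variables; pick_timeout_ms and first_timeout_iteration are hypothetical names, and the input vector stands in for the values that trd->nb_msgs() would report after each wake-up.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Chooses the wait timeout: the first-packet timeout is used until at least
// one message has been counted, the normal timeout afterwards.
int pick_timeout_ms(std::int64_t nb_msgs, int first_packet_timeout_ms,
                    int normal_timeout_ms) {
    assert(first_packet_timeout_ms > 0 && normal_timeout_ms > 0);
    return nb_msgs == 0 ? first_packet_timeout_ms : normal_timeout_ms;
}

// `observed` holds the recv thread's message counter as seen after each wait
// returns. Returns the index of the wake-up at which the loop gives up with a
// timeout, or -1 if every wake-up saw progress.
int first_timeout_iteration(const std::vector<std::int64_t>& observed) {
    std::int64_t nb_msgs = 0;
    for (std::size_t i = 0; i < observed.size(); ++i) {
        if (observed[i] <= nb_msgs) {
            return static_cast<int>(i);  // counter did not advance: timeout
        }
        nb_msgs = observed[i];
    }
    return -1;
}
```

    For example, if the counter reads 3, then 7, then 7 again, the third wake-up sees no progress and the loop breaks with ERROR_SOCKET_TIMEOUT in the original code. A counter that is still 0 at the first wake-up times out immediately.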

    5. The recv thread

    For a detailed analysis of this thread, see: SRS之接收推流线程:recv

  • Original article: https://www.cnblogs.com/jimodetiantang/p/9130634.html