  • SRS: SrsRtmpConn::stream_service_cycle in detail

    When publishing with OBS, the exchange follows the flow below, taken from the SRS issue "Haivision Makito X cann't publish to SRS".

    FFMPEG:
    
    C/S: Handshake
    C: ConnectApp() tcUrl=xxx
    S: Ack Size 2,500,000
    S: Set Peer Bandwidth 2,500,000
    S: Set Chunk Size 60,000
    C: Set Chunk Size 60,000
    S: ConnectApp() _result
    S: onBWDone()
    
    C: releaseStream+FCPublish(s0)
    C: createStream()
    S: releaseStream _result
    C: _checkbw()
    S: FCPublish() _result
    S: createStream() _result
    C: publish(s0)
    S: onFCPublish()
    S: onStatus()
    

    The analysis below picks up after the server has sent onBWDone, when the connection enters its while loop and starts executing stream_service_cycle.

    1. SrsRtmpConn::stream_service_cycle

    int SrsRtmpConn::stream_service_cycle()
    {
        int ret = ERROR_SUCCESS;
        
        /* the rtmp client type: play/publish/unknown */
        SrsRtmpConnType type;
        /* First identify the client's request type (play, publish, or other)
         * and the name of the stream to play or publish */
        if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) 
            != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("identify client failed. ret=%d", ret);
            }
            return ret;
        }
        req->strip();
        srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f", 
            srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration);
        
        /* The checks inside check() only run when the security option is
         * enabled in the configuration file */
        /* allow all if security disabled. */
        // security check
        if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {
            srs_error("security check failed. ret=%d", ret);
            return ret;
        }
        srs_info("security check ok");
        
        /* SRS does not allow an empty stream name */
        // Never allow the empty stream name, for HLS may write to a file with empty name.
        // @see https://github.com/ossrs/srs/issues/834: 
        //      SRS2 crashed for TS encoder assert failed
        if (req->stream.empty()) {
            ret = ERROR_RTMP_STREAM_NAME_EMPTY;
            srs_error("RTMP: Empty stream name not allowed, ret=%d", ret);
            return ret;
        }
        
        /* Set the server's send/recv timeouts */
        // client is identified, set the timeout to service timeout.
        rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US);
        rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US);
        
        /* Build a stream_url from vhost/app/stream, then look it up in SrsSource::pool.
         * If a SrsSource already exists for that stream_url, return it; otherwise
         * create a new SrsSource and store it in the SrsSource::pool map under stream_url */
        // find a source to serve.
        SrsSource* source = NULL;
        if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) {
            return ret;
        }
        srs_assert(source != NULL);
        
        /* Get the statistics object, which records the current vhost, stream, and so on */
        // update the statistic when source discovered.
        SrsStatistic* stat = SrsStatistic::instance();
        if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) 
        {
            srs_error("stat client failed. ret=%d", ret);
            return ret;
        }
        
        /* Returns false if the vhost has no mode configured */
        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);
        /* gop_cache is enabled by default */
        bool enabled_cache = _srs_config->get_gop_cache(req->vhost);
        srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", 
            req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge,
            source->source_id(), source->source_id());
        /* Enable or disable gop_cache according to enabled_cache (true enables it) */
        source->set_cache(enabled_cache);
        
        /* Dispatch on the identified client type, play or publish */
        /* The type of client, play or publish. */
        client_type = type;
        switch (type) {
            case SrsRtmpConnPlay: {
                srs_verbose("start to play stream %s.", req->stream.c_str());
                
                // response connection start play
                if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
                    srs_error("start to play stream failed. ret=%d", ret);
                    return ret;
                }
                if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
                    srs_error("http hook on_play failed. ret=%d", ret);
                    return ret;
                }
                
                srs_info("start to play stream %s success", req->stream.c_str());
                ret = playing(source);
                http_hooks_on_stop();
                
                return ret;
            }
            /* As shown earlier, publishing from OBS yields this type */
            case SrsRtmpConnFMLEPublish: {
                srs_verbose("FMLE start to publish stream %s.", req->stream.c_str());
                
                /* This function receives and answers the following messages:
                 * C: FCPublish
                 * S: FCPublish response
                 * C: createStream
                 * S: createStream response
                 * C: publish
                 * S: publish response onFCPublish(NetStream.Publish.Start)
                 * S: publish response onStatus(NetStream.Publish.Start) */
                if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) {
                    srs_error("start to publish stream failed. ret=%d", ret);
                    return ret;
                }
                
                /* Once the server has answered the client's publish message, it starts
                 * receiving and handling the published metadata, video, and audio data */
                return publishing(source);
            }
            case SrsRtmpConnHaivisionPublish: {
                srs_verbose("Haivision start to publish stream %s.", req->stream.c_str());
                
                if ((ret = rtmp->start_haivision_publish(res->stream_id)) != ERROR_SUCCESS) {
                    srs_error("start to publish stream failed. ret=%d", ret);
                    return ret;
                }
                
                return publishing(source);
            }
            case SrsRtmpConnFlashPublish: {
                srs_verbose("flash start to publish stream %s.", req->stream.c_str());
                
                if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) {
                    srs_error("flash start to publish stream failed. ret=%d", ret);
                    return ret;
                }
                
                return publishing(source);
            }
            default: {
                ret = ERROR_SYSTEM_CLIENT_INVALID;
                srs_info("invalid client type=%d. ret=%d", type, ret);
                return ret;
            }
        }
        
        return ret;
    }
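
    The source lookup step in the function above can be pictured as a map keyed by stream URL. The following is a minimal sketch under stated assumptions, not the SRS implementation: Request, Source, and SourcePool are hypothetical stand-ins, and the vhost/app/stream key format is taken from the comment above; the real get_stream_url() may format the key differently.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for SrsRequest: only the fields that form the key.
struct Request {
    std::string vhost, app, stream;
    // Assumed key format; the real get_stream_url() may differ.
    std::string stream_url() const { return vhost + "/" + app + "/" + stream; }
};

// Hypothetical stand-in for SrsSource.
struct Source {
    std::string url;
    explicit Source(const std::string& u) : url(u) {}
};

class SourcePool {
public:
    // Returns the source already registered for the request's URL,
    // or creates and registers a new one.
    Source* fetch_or_create(const Request& r) {
        std::unique_ptr<Source>& slot = pool_[r.stream_url()];
        if (!slot) {
            slot.reset(new Source(r.stream_url()));
        }
        return slot.get();
    }

    std::size_t size() const { return pool_.size(); }

private:
    std::map<std::string, std::unique_ptr<Source> > pool_;
};
```

    Two clients that ask for the same vhost, app, and stream get the same Source pointer, which is what lets a player attach to the stream a publisher is feeding.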
    

    2. SrsRtmpServer::identify_client

    This function identifies the client's request so that it can be handled accordingly.

    int SrsRtmpServer::identify_client(int stream_id, SrsRtmpConnType& type, 
        string& stream_name, double& duration)
    {
        type = SrsRtmpConnUnknown;
        int ret = ERROR_SUCCESS;
        
        while (true) {
            SrsCommonMessage* msg = NULL;
            /* Receive one complete message */
            if ((ret = protocol->recv_message(&msg)) != ERROR_SUCCESS) {
                if (!srs_is_client_gracefully_close(ret)) {
                    srs_error("recv identify client message failed. ret=%d", ret);
                }
                return ret;
            }
            
            SrsAutoFree(SrsCommonMessage, msg);
            SrsMessageHeader& h = msg->header;
            
            if (h.is_ackledgement() || h.is_set_chunk_size() || 
                h.is_window_ackledgement_size() || h.is_user_control_message()) {
                continue;
            }
            
            /* Ignore any message that is not an AMF command and go on to the next one */
            if (!h.is_amf0_command() && !h.is_amf3_command()) {
                srs_trace("identify ignore message except "
                    "AMF0/AMF3 command message. type=%#x", h.message_type);
                continue;
            }
            
            SrsPacket* pkt = NULL;
            /* Decode the received AMF command message; the result is stored
             * in the packet subclass that pkt points to */
            if ((ret = protocol->decode_message(msg, &pkt)) != ERROR_SUCCESS) {
                srs_error("identify decode message failed. ret=%d", ret);
                return ret;
            }
            
            SrsAutoFree(SrsPacket, pkt);
            
            /* Use dynamic_cast to try converting pkt to each expected type;
             * a non-NULL result means the received message is the one being looked for */
            if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
                srs_info("identify client by create stream, play or flash publish.");
                return identify_create_stream_client(dynamic_cast<SrsCreateStreamPacket*>(pkt), 
                    stream_id, type, stream_name, duration);
            }
            /* releaseStream, FCPublish, and FCUnpublish are all decoded
             * into a SrsFMLEStartPacket */
            if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
                srs_info("identify client by releaseStream, fmle publish");
                /* At this point the client type is known to be publish */
                return identify_fmle_publish_client(dynamic_cast<SrsFMLEStartPacket*>(pkt), 
                                                    type, stream_name);
            }
            if (dynamic_cast<SrsPlayPacket*>(pkt)) {
                srs_info("level0 identify client by play.");
                return identify_play_client(dynamic_cast<SrsPlayPacket*>(pkt), type, 
                                            stream_name, duration);
            }
            /* call msg,
             * support response null first,
             * @see https://github.com/ossrs/srs/issues/106
             * TODO: FIXME: response in right way, or forward in edge mode. */
            SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
            if (call) {
                SrsCallResPacket* res = new SrsCallResPacket(call->transaction_id);
                res->command_object = SrsAmf0Any::null();
                res->response = SrsAmf0Any::null();
                if ((ret = protocol->send_and_free_packet(res, 0)) != ERROR_SUCCESS) {
                    if (!srs_is_system_control_error(ret) && 
                        !srs_is_client_gracefully_close(ret)) {
                        srs_warn("response call failed. ret=%d", ret);
                    }
                    return ret;
                }
                
                /* For encoder of Haivision, it always sends a _checkbw call message.
                 * @remark the next message is createStream, so we continue to identify it.
                 * @see https://github.com/ossrs/srs/issues/844 */
                if (call->command_name == "_checkbw") {
                    srs_info("Haivision encoder identified.");
                    continue;
                }
                continue;
            }
            
            srs_trace("ignore AMF0/AMF3 command message.");
        }
        
        return ret;
    }
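
    The chain of dynamic_cast checks in the loop above amounts to classifying a decoded packet by its runtime type. Below is a minimal sketch of that idea only; the packet structs and the Decision enum are hypothetical stand-ins, not SRS types, and the real function also sends responses and fills in the stream name.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for the SRS packet hierarchy.
struct Packet { virtual ~Packet() {} };
struct CreateStreamPacket : Packet {};
struct FmleStartPacket : Packet { std::string stream_name; };
struct PlayPacket : Packet { std::string stream_name; };
struct CallPacket : Packet { std::string command_name; };

enum Decision {
    KeepReading,               // nothing decided yet, read the next message
    IdentifyAfterCreateStream, // play or flash publish, decided by later messages
    FmlePublish,               // releaseStream/FCPublish/FCUnpublish seen
    Play                       // play seen
};

// Checks the packet types in the same order as the loop above.
inline Decision classify(const Packet* pkt)
{
    if (dynamic_cast<const CreateStreamPacket*>(pkt)) return IdentifyAfterCreateStream;
    if (dynamic_cast<const FmleStartPacket*>(pkt)) return FmlePublish;
    if (dynamic_cast<const PlayPacket*>(pkt)) return Play;
    // A generic call such as _checkbw is answered and the loop continues;
    // any other packet is ignored.
    return KeepReading;
}
```

    For the OBS flow at the top of this article, the first command after onBWDone is releaseStream, so the second check matches and the client is treated as an FMLE-style publisher.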
    

    2.1 SrsProtocol::recv_message

    int SrsProtocol::recv_message(SrsCommonMessage** pmsg)
    {
        *pmsg = NULL;
        
        int ret = ERROR_SUCCESS;
        
        while (true) {
            SrsCommonMessage* msg = NULL;
            
            /* Read one message from the socket */
            if ((ret = recv_interlaced_message(&msg)) != ERROR_SUCCESS) {
                if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                    srs_error("recv interlaced message failed. ret=%d", ret);
                }
                srs_freep(msg);
                return ret;
            }
            srs_verbose("entire msg received");
            
            if (!msg) {
                srs_info("got empty message without error.");
                continue;
            }
            
            if (msg->size <= 0 || msg->header.payload_length <= 0) {
                srs_trace("ignore empty message(type=%d, size=%d, time=%"PRId64", sid=%d).",
                    msg->header.message_type, msg->header.payload_length,
                    msg->header.timestamp, msg->header.stream_id);
                srs_freep(msg);
                continue;
            }
            
            /* For control messages such as set chunk size, update the protocol context;
             * other messages, such as audio/video or AMF, are left untouched here */
            if ((ret = on_recv_message(msg)) != ERROR_SUCCESS) {
                srs_error("hook the received msg failed. ret=%d", ret);
                srs_freep(msg);
                return ret;
            }
            
            srs_verbose("got a msg, cid=%d, type=%d, size=%d, time=%"PRId64, 
                msg->header.perfer_cid, msg->header.message_type, msg->header.payload_length, 
                msg->header.timestamp);
            *pmsg = msg;
            break;
        }
        
        return ret;
    }
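
    The loop above keeps reading because one read may yield only part of a message: chunks of different chunk streams can be interleaved on the wire, and a message is handed back only once all of its payload has arrived. The sketch below illustrates that behavior with hypothetical names (Partial, Reassembler, feed); it works on in-memory strings and leaves out headers, timestamps, and error handling.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Per-chunk-stream reassembly state (hypothetical, far simpler than SrsChunkStream).
struct Partial {
    std::size_t expected; // payload length announced for the current message
    std::string data;     // payload bytes collected so far
    Partial() : expected(0) {}
};

class Reassembler {
public:
    // Feeds one chunk of the message carried on chunk stream `cid`.
    // Returns true and fills `message` once the message is complete;
    // returns false while more chunks are still needed.
    bool feed(int cid, std::size_t payload_length,
              const std::string& chunk, std::string& message) {
        Partial& p = partial_[cid];
        if (p.data.empty()) {
            p.expected = payload_length; // first chunk of a new message
        }
        p.data += chunk;
        if (p.data.size() < p.expected) {
            return false;
        }
        message = p.data;
        p.data.clear();
        return true;
    }

private:
    std::map<int, Partial> partial_;
};
```

    A caller would invoke feed() once per chunk and continue looping while it returns false, much like recv_message continues when it gets an empty message without an error.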
    

    2.1.1 SrsProtocol::recv_interlaced_message

    int SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
    {
        int ret = ERROR_SUCCESS;
        
        // chunk stream basic header.
        char fmt = 0;
        int cid = 0;
        /* Read the chunk basic header */
        if ((ret = read_basic_header(fmt, cid)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("read basic header failed. ret=%d", ret);
            }
            return ret;
        }
        srs_verbose("read basic header success. fmt=%d, cid=%d", fmt, cid);
        
        // the cid must not be negative.
        srs_assert(cid >= 0);
        
        /* Get the chunk stream cache entry. One RTMP message may span several chunks,
         * so the entry accumulates the chunks' data until
         * a complete message has been received */
        // get the cached chunk stream.
        SrsChunkStream* chunk = NULL;
        
        /* SrsChunkStream** cs_cache: 
         * The SrsProtocol constructor allocates SRS_PERF_CHUNK_STREAM_CACHE (16)
         * SrsChunkStream elements for the cs_cache array. When a chunk's cid is below
         * that size, the matching SrsChunkStream is taken straight from the array */
        // use chunk stream cache to get the chunk info.
        // @see https://github.com/ossrs/srs/issues/249
        if (cid < SRS_PERF_CHUNK_STREAM_CACHE) {
            // chunk stream cache hit.
            srs_verbose("cs-cache hit, cid=%d", cid);
            // already init, use it directly
            chunk = cs_cache[cid];
            srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
                        "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), 
                chunk->header.message_type, chunk->header.payload_length,
                chunk->header.timestamp, chunk->header.stream_id);
        } else {
            // chunk stream cache miss, use map.
            if (chunk_streams.find(cid) == chunk_streams.end()) {
                chunk = chunk_streams[cid] = new SrsChunkStream(cid);
                // set the perfer cid of chunk,
                // which will copy to the message received.
                chunk->header.perfer_cid = cid;
                srs_verbose("cache new chunk stream: fmt=%d, cid=%d", fmt, cid);
            } else {
                chunk = chunk_streams[cid];
                srs_verbose("cached chunk stream: fmt=%d, cid=%d, size=%d, "
                            "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                    chunk->fmt, chunk->cid, (chunk->msg? chunk->msg->size : 0), 
                    chunk->header.message_type, chunk->header.payload_length,
                    chunk->header.timestamp, chunk->header.stream_id);
            }
        }
        
        /* Read the chunk message header according to fmt */
        // chunk stream message header
        if ((ret = read_message_header(chunk, fmt)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("read message header failed. ret=%d", ret);
            }
            return ret;
        }
        srs_verbose("read message header success. "
                "fmt=%d, ext_time=%d, size=%d, "
                "message(type=%d, size=%d, time=%"PRId64", sid=%d)", 
                fmt, chunk->extended_timestamp, (chunk->msg? chunk->msg->size : 0), 
                chunk->header.message_type, chunk->header.payload_length, 
                chunk->header.timestamp, chunk->header.stream_id);
        
        // read msg payload from chunk stream.
        SrsCommonMessage* msg = NULL;
        if ((ret = read_message_payload(chunk, &msg)) != ERROR_SUCCESS) {
            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {
                srs_error("read message payload failed. ret=%d", ret);
            }
            return ret;
        }
        
        // not got an entire RTMP message, try next chunk.
        if (!msg) {
            srs_verbose("get partial message success. size=%d, "
                        "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                    (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), 
                    chunk->header.message_type, chunk->header.payload_length,
                    chunk->header.timestamp, chunk->header.stream_id);
            return ret;
        }
        
        *pmsg = msg;
        srs_info("get entire message success. size=%d, "
                 "message(type=%d, size=%d, time=%"PRId64", sid=%d)",
                (msg? msg->size : (chunk->msg? chunk->msg->size : 0)), 
                chunk->header.message_type, chunk->header.payload_length,
                chunk->header.timestamp, chunk->header.stream_id);
                
        return ret;
    }
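
    Two steps of the function above lend themselves to a small illustration: parsing the chunk basic header into fmt and cid, and the two-tier lookup that serves small cids from an array and larger ones from a map. The sketch below follows the RTMP chunk basic header layout (1, 2, or 3 bytes) and uses hypothetical names (BasicHeader, parse_basic_header, ChunkState, ChunkStreamTable); it parses from a buffer rather than a socket and is not the SRS code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

struct BasicHeader {
    int fmt;          // chunk format 0..3, the top 2 bits of the first byte
    int cid;          // chunk stream id
    std::size_t size; // bytes consumed: 1, 2 or 3; 0 if the buffer is too short
};

inline BasicHeader parse_basic_header(const std::vector<uint8_t>& buf)
{
    BasicHeader h = {0, 0, 0};
    if (buf.empty()) return h;

    h.fmt = (buf[0] >> 6) & 0x03;
    int low = buf[0] & 0x3f;

    if (low >= 2) {            // 1-byte form: cid 2..63
        h.cid = low;
        h.size = 1;
    } else if (low == 0) {     // 2-byte form: cid = 64 + second byte
        if (buf.size() < 2) return h;
        h.cid = 64 + buf[1];
        h.size = 2;
    } else {                   // 3-byte form: cid = 64 + second + third * 256
        if (buf.size() < 3) return h;
        h.cid = 64 + buf[1] + buf[2] * 256;
        h.size = 3;
    }
    return h;
}

// Hypothetical per-stream state; the real SrsChunkStream holds much more.
struct ChunkState {
    int cid;
    ChunkState() : cid(-1) {}
};

class ChunkStreamTable {
public:
    static const int kCacheSize = 16; // same size the comment above gives for SRS_PERF_CHUNK_STREAM_CACHE

    ChunkStreamTable() {
        for (int i = 0; i < kCacheSize; i++) cache_[i].cid = i;
    }

    // Small cids hit the preallocated array; larger ones fall back to the map.
    ChunkState& get(int cid) {
        if (cid >= 0 && cid < kCacheSize) return cache_[cid];
        ChunkState& cs = overflow_[cid]; // created on first use
        cs.cid = cid;
        return cs;
    }

    std::size_t overflow_size() const { return overflow_.size(); }

private:
    ChunkState cache_[kCacheSize];
    std::map<int, ChunkState> overflow_;
};
```

    The array path avoids a map lookup for the low chunk stream ids that clients commonly use, which is presumably why SRS keeps both structures.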
    

    2.2 SrsProtocol::decode_message

    int SrsProtocol::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
    {
        *ppacket = NULL;
        
        int ret = ERROR_SUCCESS;
        
        srs_assert(msg != NULL);
        srs_assert(msg->payload != NULL);
        srs_assert(msg->size > 0);
        
        SrsStream stream;
        
        /* Initialize a SrsStream over the message payload */
        // initialize the decode stream for all message,
        // it's ok because the initialization is fast and without memory copy.
        if ((ret = stream.initialize(msg->payload, msg->size)) != ERROR_SUCCESS) {
            srs_error("initialize stream failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("decode stream initialized success");
        
        // decode the packet.
        SrsPacket* packet = NULL;
        if ((ret = do_decode_message(msg->header, &stream, &packet)) != ERROR_SUCCESS) {
            srs_freep(packet);
            return ret;
        }
        
        // set to output ppacket only when success.
        *ppacket = packet;
        
        return ret;
    }
    

    2.2.1 SrsProtocol::do_decode_message

    int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, 
        SrsPacket** ppacket)
    {
        int ret = ERROR_SUCCESS;
        
        SrsPacket* packet = NULL;
        
        // decode specified packet type
        if (header.is_amf0_command() || header.is_amf3_command() 
            || header.is_amf0_data() || header.is_amf3_data()) {
            srs_verbose("start to decode AMF0/AMF3 command message.");
            
            // skip 1bytes to decode the amf3 command.
            if (header.is_amf3_command() && stream->require(1)) {
                srs_verbose("skip 1bytes to decode AMF3 command");
                stream->skip(1);
            }
            
            // amf0 command message.
            // need to read the command name.
            std::string command;
            /* Read the message's command name */
            if ((ret = srs_amf0_read_string(stream, command)) != ERROR_SUCCESS) {
                srs_error("decode AMF0/AMF3 command name failed. ret=%d", ret);
                return ret;
            }
            srs_verbose("AMF0/AMF3 command message, command_name=%s", command.c_str());
            
            // result/error packet
            if (command == RTMP_AMF0_COMMAND_RESULT || command == RTMP_AMF0_COMMAND_ERROR) {
                double transactionId = 0.0;
                if ((ret = srs_amf0_read_number(stream, transactionId)) != ERROR_SUCCESS) {
                    srs_error("decode AMF0/AMF3 transactionId failed. ret=%d", ret);
                    return ret;
                }
                srs_verbose("AMF0/AMF3 command id, transactionId=%.2f", transactionId);
                
                // reset stream, for header read completed.
                stream->skip(-1 * stream->pos());
                if (header.is_amf3_command()) {
                    stream->skip(1);
                }
                
                // find the call name
                if (requests.find(transactionId) == requests.end()) {
                    ret = ERROR_RTMP_NO_REQUEST;
                    srs_error("decode AMF0/AMF3 request failed. ret=%d", ret);
                    return ret;
                }
                
                std::string request_name = requests[transactionId];
                srs_verbose("AMF0/AMF3 request parsed. request_name=%s", request_name.c_str());
                
                if (request_name == RTMP_AMF0_COMMAND_CONNECT) {
                    srs_info("decode the AMF0/AMF3 response command(%s message).", 
                             request_name.c_str());
                    *ppacket = packet = new SrsConnectAppResPacket();
                    return packet->decode(stream);
                } else if (request_name == RTMP_AMF0_COMMAND_CREATE_STREAM) {
                    srs_info("decode the AMF0/AMF3 response command(%s message).", 
                             request_name.c_str());
                    *ppacket = packet = new SrsCreateStreamResPacket(0, 0);
                    return packet->decode(stream);
                } else if (request_name == RTMP_AMF0_COMMAND_RELEASE_STREAM
                    || request_name == RTMP_AMF0_COMMAND_FC_PUBLISH
                    || request_name == RTMP_AMF0_COMMAND_UNPUBLISH) {
                    srs_info("decode the AMF0/AMF3 response command(%s message).", 
                             request_name.c_str());
                    *ppacket = packet = new SrsFMLEStartResPacket(0);
                    return packet->decode(stream);
                } else {
                    ret = ERROR_RTMP_NO_REQUEST;
                    srs_error("decode AMF0/AMF3 request failed. "
                        "request_name=%s, transactionId=%.2f, ret=%d", 
                        request_name.c_str(), transactionId, ret);
                    return ret;
                }
            }
            
            // reset to zero(amf3 to 1) to restart decode.
            stream->skip(-1 * stream->pos());
            if (header.is_amf3_command()) {
                stream->skip(1);
            }
            
            /* Construct the packet class that matches the command name, then decode into it */
            // decode command object.
            if (command == RTMP_AMF0_COMMAND_CONNECT) {
                srs_info("decode the AMF0/AMF3 command(connect vhost/app message).");
                *ppacket = packet = new SrsConnectAppPacket();
                return packet->decode(stream);
            } else if (command == RTMP_AMF0_COMMAND_CREATE_STREAM) {
                srs_info("decode the AMF0/AMF3 command(createStream message).");
                *ppacket = packet = new SrsCreateStreamPacket();
                return packet->decode(stream);
            } else if (command == RTMP_AMF0_COMMAND_PLAY) {
                srs_info("decode the AMF0/AMF3 command(play message).");
                *ppacket = packet = new SrsPlayPacket();
                return packet->decode(stream);
            } else if(command == RTMP_AMF0_COMMAND_PAUSE) {
                srs_info("decode the AMF0/AMF3 command(pause message).");
                *ppacket = packet = new SrsPausePacket();
                return packet->decode(stream);
            } else if(command == RTMP_AMF0_COMMAND_RELEASE_STREAM) {
                srs_info("decode the AMF0/AMF3 command(FMLE releaseStream message).");
                *ppacket = packet = new SrsFMLEStartPacket();
                return packet->decode(stream);
            } else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
                srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
                *ppacket = packet = new SrsFMLEStartPacket();
                return packet->decode(stream);
            } else if(command == RTMP_AMF0_COMMAND_PUBLISH) {
                srs_info("decode the AMF0/AMF3 command(publish message).");
                *ppacket = packet = new SrsPublishPacket();
                return packet->decode(stream);
            } else if(command == RTMP_AMF0_COMMAND_UNPUBLISH) {
                srs_info("decode the AMF0/AMF3 command(unpublish message).");
                *ppacket = packet = new SrsFMLEStartPacket();
                return packet->decode(stream);
            } else if(command == SRS_CONSTS_RTMP_SET_DATAFRAME || 
                      command == SRS_CONSTS_RTMP_ON_METADATA) {
                srs_info("decode the AMF0/AMF3 data(onMetaData message).");
                *ppacket = packet = new SrsOnMetaDataPacket();
                return packet->decode(stream);
            } else if(command == SRS_BW_CHECK_FINISHED
                || command == SRS_BW_CHECK_PLAYING
                || command == SRS_BW_CHECK_PUBLISHING
                || command == SRS_BW_CHECK_STARTING_PLAY
                || command == SRS_BW_CHECK_STARTING_PUBLISH
                || command == SRS_BW_CHECK_START_PLAY
                || command == SRS_BW_CHECK_START_PUBLISH
                || command == SRS_BW_CHECK_STOPPED_PLAY
                || command == SRS_BW_CHECK_STOP_PLAY
                || command == SRS_BW_CHECK_STOP_PUBLISH
                || command == SRS_BW_CHECK_STOPPED_PUBLISH
                || command == SRS_BW_CHECK_FINAL)
            {
                srs_info("decode the AMF0/AMF3 band width check message.");
                *ppacket = packet = new SrsBandwidthPacket();
                return packet->decode(stream);
            } else if (command == RTMP_AMF0_COMMAND_CLOSE_STREAM) {
                srs_info("decode the AMF0/AMF3 closeStream message.");
                *ppacket = packet = new SrsCloseStreamPacket();
                return packet->decode(stream);
            } else if (header.is_amf0_command() || header.is_amf3_command()) {
                srs_info("decode the AMF0/AMF3 call message.");
                *ppacket = packet = new SrsCallPacket();
                return packet->decode(stream);
            }
            
            // default packet to drop message.
            srs_info("drop the AMF0/AMF3 command message, command_name=%s", command.c_str());
            *ppacket = packet = new SrsPacket();
            return ret;
        } else if (header.is_user_control_message()) {
            srs_verbose("start to decode user control message.");
            *ppacket = packet = new SrsUserControlPacket();
            return packet->decode(stream);
        } else if(header.is_window_ackledgement_size()) {
            srs_verbose("start to decode set ack window size message.");
            *ppacket = packet = new SrsSetWindowAckSizePacket();
            return packet->decode(stream);
        } else if(header.is_set_chunk_size()) {
            srs_verbose("start to decode set chunk size message.");
            *ppacket = packet = new SrsSetChunkSizePacket();
            return packet->decode(stream);
        } else {
            if (!header.is_set_peer_bandwidth() && !header.is_ackledgement()) {
                srs_trace("drop unknown message, type=%d", header.message_type);
            }
        }
        
        return ret;
    }
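
    The _result/_error branch above relies on a table that maps a transaction id back to the name of the request it answers, since the response itself only carries the id. Here is a minimal sketch of such a table. PendingRequests, on_send, and resolve are hypothetical names, and recording the name at send time is an assumption about how the table gets filled; the excerpt above only shows the lookup side.

```cpp
#include <cassert>
#include <map>
#include <string>

class PendingRequests {
public:
    // Remembers the command name under its transaction id
    // (assumed to happen when the request is sent).
    void on_send(double transaction_id, const std::string& command) {
        requests_[transaction_id] = command;
    }

    // Finds which request a _result/_error with this id answers.
    // Returns false when no request with that id was recorded.
    bool resolve(double transaction_id, std::string& command) const {
        std::map<double, std::string>::const_iterator it = requests_.find(transaction_id);
        if (it == requests_.end()) {
            return false;
        }
        command = it->second;
        return true;
    }

private:
    std::map<double, std::string> requests_;
};
```

    A failed lookup corresponds to the ERROR_RTMP_NO_REQUEST path in the function above: without the original request name the decoder cannot tell which response packet class to build.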
    

    From the flow at the start, after sending onBWDone the server next receives the client's releaseStream command. releaseStream, PublishStream, FCPublish, and FCUnpublish are all represented by the SrsFMLEStartPacket class.

    2.2.2 Constructing SrsFMLEStartPacket

    /**
     * FMLE start publish: ReleaseStream/PublishStream/FCPublish/FCUnpublish
     */
    SrsFMLEStartPacket::SrsFMLEStartPacket()
    {
        /* Command name: releaseStream */
        command_name = RTMP_AMF0_COMMAND_RELEASE_STREAM;
        /* the transaction ID to get the response. */
        transaction_id = 0;
        /**
         * If there exists any command info this is set, else this is set to null type.
         * @remark, never be NULL, an AMF0 null instance.
         */
        command_object = SrsAmf0Any::null();
    }
    

    2.2.3 SrsFMLEStartPacket::decode

    This decodes the payload of the releaseStream message.

    int SrsFMLEStartPacket::decode(SrsStream* stream)
    {
        int ret = ERROR_SUCCESS;
        
        /* Read the message's command name */
        if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
            srs_error("amf0 decode FMLE start command_name failed. ret=%d", ret);
            return ret;
        }
        if (command_name.empty()
            || (command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM 
            && command_name != RTMP_AMF0_COMMAND_FC_PUBLISH
            && command_name != RTMP_AMF0_COMMAND_UNPUBLISH))
        {
            ret = ERROR_RTMP_AMF0_DECODE;
            srs_error("amf0 decode FMLE start command_name failed. "
                "command_name=%s, ret=%d", command_name.c_str(), ret);
            return ret;
        }
        
        if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
            srs_error("amf0 decode FMLE start transaction_id failed. ret=%d", ret);
            return ret;
        }
        
        if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
            srs_error("amf0 decode FMLE start command_object failed. ret=%d", ret);
            return ret;
        }
        
        if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) {
            srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret);
            return ret;
        }
        
        srs_info("amf0 decode FMLE start packet success");
        
        return ret;
    }
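
    To make the four reads above concrete, the sketch below decodes the same field sequence (string, number, null, string) from raw AMF0 bytes. The marker bytes follow the AMF0 format: 0x02 for a string with a 2-byte big-endian length, 0x00 for a number stored as an 8-byte big-endian double, and 0x05 for null. Amf0Reader, FmleStart, and decode_fmle_start are hypothetical helpers written for this illustration, not the SRS SrsStream or srs_amf0_* functions, and the code assumes the host double is a 64-bit IEEE 754 value.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Minimal AMF0 reader over an in-memory buffer.
class Amf0Reader {
public:
    explicit Amf0Reader(const std::vector<uint8_t>& buf) : buf_(buf), pos_(0) {}

    // String: marker 0x02, 2-byte big-endian length, then the bytes.
    bool read_string(std::string& out) {
        if (!require(3) || buf_[pos_] != 0x02) return false;
        std::size_t len = (std::size_t(buf_[pos_ + 1]) << 8) | buf_[pos_ + 2];
        if (!require(3 + len)) return false;
        const uint8_t* p = buf_.data() + pos_ + 3;
        out.assign(p, p + len);
        pos_ += 3 + len;
        return true;
    }

    // Number: marker 0x00, then an 8-byte big-endian IEEE 754 double.
    bool read_number(double& out) {
        if (!require(9) || buf_[pos_] != 0x00) return false;
        uint64_t bits = 0;
        for (int i = 0; i < 8; i++) bits = (bits << 8) | buf_[pos_ + 1 + i];
        std::memcpy(&out, &bits, sizeof(out));
        pos_ += 9;
        return true;
    }

    // Null: the single marker byte 0x05.
    bool read_null() {
        if (!require(1) || buf_[pos_] != 0x05) return false;
        pos_ += 1;
        return true;
    }

private:
    bool require(std::size_t n) const { return buf_.size() - pos_ >= n; }

    const std::vector<uint8_t>& buf_;
    std::size_t pos_;
};

struct FmleStart {
    std::string command_name;
    double transaction_id;
    std::string stream_name;
};

// Same field order as decode() above: name, transaction id, null, stream name.
inline bool decode_fmle_start(const std::vector<uint8_t>& payload, FmleStart& out)
{
    Amf0Reader r(payload);
    return r.read_string(out.command_name)
        && r.read_number(out.transaction_id)
        && r.read_null()
        && r.read_string(out.stream_name);
}
```

    For a payload carrying releaseStream with transaction id 2 and stream name live, the decoded struct holds exactly those three values; a truncated payload makes one of the reads fail, as the error branches in decode() do.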
    

    recv:releaseStream('live')

    2.3 SrsRtmpServer::identify_fmle_publish_client

    This function is called when the received message decodes as releaseStream.

    int SrsRtmpServer::identify_fmle_publish_client(SrsFMLEStartPacket* req, 
        SrsRtmpConnType& type, string& stream_name)
    {
        int ret = ERROR_SUCCESS;
        
        /* The client type is identified as publish */
        type = SrsRtmpConnFMLEPublish;
        /* The stream name the client publishes */
        stream_name = req->stream_name;
        
        /* Below is the response to the releaseStream message */
        // releaseStream response
        if (true) {
            SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(req->transaction_id);
            if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
                srs_error("send releaseStream response message failed. ret=%d", ret);
                return ret;
            }
            srs_info("send releaseStream response message success.");
        }
        
        return ret;
    }
    

    This function sends the response to releaseStream. The packet sent is shown in the figure below:

    send: response for releaseStream

    2.3.1 Constructing SrsFMLEStartResPacket

    /**
     * response for SrsFMLEStartPacket.
     */
    SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
    {
        /* Name of the response message: _result */
        command_name = RTMP_AMF0_COMMAND_RESULT;
        transaction_id = _transaction_id;
        /**
         * If there exists any command info this is set, else this is set to null type.
         * @remark, never be NULL, an AMF0 null instance.
         */
        command_object = SrsAmf0Any::null();
        /**
         * the optional args, set to undefined.
         * @remark, never be NULL, an AMF0 undefined instance.
         */
        args = SrsAmf0Any::undefined();
    }
    

    2.3.2 SrsFMLEStartResPacket::encode_packet

    Builds the payload of the releaseStream response message.

    int SrsFMLEStartResPacket::encode_packet(SrsStream* stream)
    {
        int ret = ERROR_SUCCESS;
        
        if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
            srs_error("encode command_name failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode command_name success.");
        
        if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
            srs_error("encode transaction_id failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode transaction_id success.");
        
        if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
            srs_error("encode command_object failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode command_object success.");
        
        if ((ret = srs_amf0_write_undefined(stream)) != ERROR_SUCCESS) {
            srs_error("encode args failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode args success.");
        
        srs_info("encode FMLE start response packet success.");
        
        return ret;
    }
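
    To make the byte layout concrete, here is a minimal standalone sketch of what the four writes above produce on the wire. It does not use SRS code: `amf0_write_string`, `amf0_write_number` and `encode_fmle_start_result` are hypothetical helpers written for this illustration, and the marker values are those defined by AMF0.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// AMF0 type markers.
enum : uint8_t { kAmf0Number = 0x00, kAmf0String = 0x02, kAmf0Null = 0x05, kAmf0Undefined = 0x06 };

using Bytes = std::vector<uint8_t>;

// String: marker, 16-bit big-endian length, then the raw bytes.
void amf0_write_string(Bytes& out, const std::string& s) {
    assert(s.size() <= 0xFFFF);  // a short AMF0 string cannot be longer
    out.push_back(kAmf0String);
    out.push_back(static_cast<uint8_t>(s.size() >> 8));
    out.push_back(static_cast<uint8_t>(s.size() & 0xFF));
    out.insert(out.end(), s.begin(), s.end());
}

// Number: marker, then an IEEE-754 double in big-endian byte order.
void amf0_write_number(Bytes& out, double v) {
    uint64_t bits = 0;
    std::memcpy(&bits, &v, sizeof(bits));
    out.push_back(kAmf0Number);
    for (int shift = 56; shift >= 0; shift -= 8) {
        out.push_back(static_cast<uint8_t>((bits >> shift) & 0xFF));
    }
}

// Hypothetical helper: payload of a releaseStream/FCPublish _result response.
Bytes encode_fmle_start_result(double transaction_id) {
    Bytes out;
    amf0_write_string(out, "_result");
    amf0_write_number(out, transaction_id);
    out.push_back(kAmf0Null);       // command_object
    out.push_back(kAmf0Undefined);  // args
    return out;
}
```

    With a transaction ID of 2.0 the payload is 21 bytes long: 10 for the string `_result`, 9 for the number, and one each for null and undefined.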
    

    3. SrsSource::fetch_or_create

    /**
     * create source when fetch from cache failed.
     * @param r the client request.
     * @param h the event handler for source.
     * @param pps the matches source, if success never be NULL.
     */
    int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
    {
        int ret = ERROR_SUCCESS;
        
        SrsSource* source = NULL;
        /* look up the pool by the stream_url composed of vhost/app/stream;
         * fetch() returns NULL when no matching SrsSource exists */
        if ((source = fetch(r)) != NULL) {
            *pps = source;
            return ret;
        }
        
        /* build the stream_url from vhost/app/stream */
        string stream_url = r->get_stream_url();
        string vhost = r->vhost;
        
        /* the pool must not already contain an entry for stream_url */
        // should always not exists for create a source.
        srs_assert(pool.find(stream_url) == pool.end());
        
        /* construct a new SrsSource */
        source = new SrsSource();
        if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
            srs_freep(source);
            return ret;
        }
        
        /* put the newly created source into the pool */
        pool[stream_url] = source;
        srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());
        
        /* return the newly created source through pps */
        *pps = source;
        
        return ret;
    }
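
    The fetch-then-create logic above boils down to a keyed cache lookup. The sketch below shows the same pattern in isolation. `Source` and `SourcePool` are made up for this example (the real pool stores SrsSource pointers and also calls initialize()), and the URL strings used with it are purely illustrative.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for SrsSource: it only remembers its stream URL.
struct Source {
    std::string url;
};

class SourcePool {
public:
    // Returns the cached source, or nullptr when the URL is unknown.
    Source* fetch(const std::string& url) const {
        auto it = pool_.find(url);
        return it == pool_.end() ? nullptr : it->second.get();
    }

    // Returns the cached source, creating and registering it on a miss.
    Source* fetch_or_create(const std::string& url) {
        if (Source* hit = fetch(url)) return hit;
        assert(pool_.find(url) == pool_.end());  // a miss means no entry yet
        std::unique_ptr<Source> created(new Source{url});
        Source* raw = created.get();
        pool_[url] = std::move(created);
        return raw;
    }

    std::size_t size() const { return pool_.size(); }

private:
    std::map<std::string, std::unique_ptr<Source>> pool_;
};
```

    Calling `fetch_or_create` twice with the same URL returns the same pointer, so a publisher and later players of one stream end up sharing a single source object.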
    

    3.1 SrsSource::fetch

    /** 
     * get the exists source, NULL when not exists.
     * update the request and return the exists source.
     */
    SrsSource* SrsSource::fetch(SrsRequest* r)
    {
        SrsSource* source = NULL;
        
        /* get the stream identify, vhost/app/stream. */
        string stream_url = r->get_stream_url();
        if (pool.find(stream_url) == pool.end()) {
            return NULL;
        }
        
        source = pool[stream_url];
        
        /* we always update the request of resource,
         * for origin auth is on, the token in request maybe invalid,
         * and we only need to update the token of request, it's simple. */
        source->_req->update_auth(r);
        
        return source;
    }
    

    3.2 Constructing SrsSource

    Constructs a live stream source.

    /**
     * the time jitter algorithm:
     * 1. full, to ensure stream start at zero, and ensure stream monotonically increasing.
     * 2. zero, only ensure stream start at zero, ignore timestamp jitter.
     * 3. off, disable the time jitter algorithm, like atc.
     */
    enum SrsRtmpJitterAlgorithm
    {
        SrsRtmpJitterAlgorithmFULL = 0x01,
        SrsRtmpJitterAlgorithmZERO,
        SrsRtmpJitterAlgorithmOFF
    };
    
    SrsSource::SrsSource()
    {
        /* _req: deep copy of client request. */
        _req = NULL;
        /* the time jitter algorithm for vhost */
        jitter_algorithm = SrsRtmpJitterAlgorithmOFF;
        /* whether use interlaced/mixed algorithm to correct timestamp;
         * disabled at construction */
        mix_correct = false;
        mix_queue = new SrsMixQueue();
        
    #ifdef SRS_AUTO_HLS
        /* construct an HLS handler */
        hls = new SrsHls();
    #endif
    #ifdef SRS_AUTO_DVR
        /* construct a DVR handler */
        dvr = new SrsDvr();
    #endif
    #ifdef SRS_AUTO_TRANSCODE
        /* construct a transcoding handler */
        encoder = new SrsEncoder();
    #endif
    #ifdef SRS_AUTO_HDS
        hds = new SrsHds(this);
    #endif
        
        /**
         * cache_sh_video: the cached video sequence header.
         * cache_sh_audio: the cached audio sequence header.
         */
        cache_metadata = cache_sh_video = cache_sh_audio = NULL;
        
        /* can publish, true when is not streaming */
        _can_publish = true;
        /**
         * source id,
         * for publish, it's the publish client id.
         * for edge, it's the edge ingest id.
         * when source id changed, for example, the edge reconnect,
         * invoke the on_source_id_changed() to let all clients know.
         *
         * _pre_source_id: previous source id.
         */
        _pre_source_id = _source_id = -1;
        /**
         * last die time, when all consumers quit and no publisher,
         * we will remove the source when source die.
         */
        die_at = -1;
        
        /* edge control service */
        play_edge = new SrsPlayEdge();
        publish_edge = new SrsPublishEdge();
        /* gop cache for client fast startup. */
        gop_cache = new SrsGopCache();
        /* for aggregate message */
        aggregate_stream = new SrsStream();
        
        /* whether stream is monotonically increase. */
        is_monotonicaly_increase = false;
        last_packet_time = 0;
        
        _srs_config->subscribe(this);
        /**
         * atc whether atc(use absolute time and donot adjust time),
         * directly use msg time and donot adjust if atc is true,
         * otherwise, adjust msg time to start from 0 to make flash happy.
         *
         * TODO: FIXME: to support reload atc.
         */
        atc = false;
    }
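
    As a rough illustration of the three jitter modes described in the enum comment above, the following sketch corrects a sequence of timestamps. It is not the SRS implementation: `JitterCorrector`, the 250 ms jump threshold and the 10 ms fallback delta are assumptions chosen for this example.

```cpp
#include <cassert>
#include <cstdint>

enum class Jitter { Full, Zero, Off };

// Illustrative thresholds; assumptions of this sketch, not values from the article.
const int64_t kMaxJitterMs = 250;
const int64_t kDefaultFrameMs = 10;

class JitterCorrector {
public:
    // Returns the corrected timestamp (ms) for one incoming packet timestamp.
    int64_t correct(int64_t ts, Jitter mode) {
        assert(ts >= 0);
        if (mode == Jitter::Off) return ts;      // keep the original time untouched
        if (mode == Jitter::Zero) {
            if (base_ < 0) base_ = ts;           // the first packet defines zero
            return ts - base_;                   // later jitter is not corrected
        }
        // Full: rebuild a non-decreasing timeline from clamped deltas.
        int64_t delta = (last_in_ < 0) ? 0 : ts - last_in_;
        if (delta < 0 || delta > kMaxJitterMs) delta = kDefaultFrameMs;
        last_out_ = (last_out_ < 0) ? 0 : last_out_ + delta;
        last_in_ = ts;
        return last_out_;
    }

private:
    int64_t base_ = -1;      // first timestamp seen in Zero mode
    int64_t last_in_ = -1;   // previous input timestamp in Full mode
    int64_t last_out_ = -1;  // previous output timestamp in Full mode
};
```

    In this sketch, Full mode replaces a backwards step or a large jump with the fallback delta, Zero mode only shifts the stream so it starts at 0, and Off mode passes timestamps through unchanged.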
    

    3.3 SrsSource::initialize

    int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
    {
        int ret = ERROR_SUCCESS;
        
        srs_assert(h);
        srs_assert(!_req);
        
        handler = h;
        /* deep copy: everything in r is copied into _req */
        _req = r->copy();
        /* get_atc() returns false when the vhost does not set atc;
         * with atc false, msg timestamps are adjusted to start from 0 */
        atc = _srs_config->get_atc(_req->vhost);
        
        /* not analyzed here */
    #ifdef SRS_AUTO_HLS
        if ((ret = hls->initialize(this)) != ERROR_SUCCESS) {
            return ret;
        }
    #endif
    
    #ifdef SRS_AUTO_DVR
        if ((ret = dvr->initialize(this, _req)) != ERROR_SUCCESS) {
            return ret;
        }
    #endif
    
        if ((ret = play_edge->initialize(this, _req)) != ERROR_SUCCESS) {
            return ret;
        }
        if ((ret = publish_edge->initialize(this, _req)) != ERROR_SUCCESS) {
            return ret;
        }
        
        /* falls back to the default of 30 when the vhost does not set queue_length */
        double queue_size = _srs_config->get_queue_length(_req->vhost);
        publish_edge->set_queue_size(queue_size);
        
        jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(_req->vhost);
        mix_correct = _srs_config->get_mix_correct(_req->vhost);
        
        return ret;
        
    }
    

    3.4 SrsStatistic::on_client

    /**
     * when got a client to publish/play stream,
     * @param id, the client srs id.
     * @param req, the client request object.
     * @param conn, the physical abstract connection object.
     * @param type, the type of connection.
     */
    int SrsStatistic::on_client(int id, SrsRequest* req, SrsConnection* conn, 
        SrsRtmpConnType type)
    {
        int ret = ERROR_SUCCESS;
        
        SrsStatisticVhost* vhost = create_vhost(req);
        SrsStatisticStream* stream = create_stream(vhost, req);
        
        // create client if not exists
        SrsStatisticClient* client = NULL;
        if (clients.find(id) == clients.end()) {
            client = new SrsStatisticClient();
            client->id = id;
            client->stream = stream;
            clients[id] = client;
        } else {
            client = clients[id];
        }
        
        // got client
        client->conn = conn;
        client->req = req;
        client->type = type;
        stream->nb_clients++;
        vhost->nb_clients++;
        
        return ret;
    }
    

    3.4.1 SrsStatistic::create_vhost

    SrsStatisticVhost* SrsStatistic::create_vhost(SrsRequest* req)
    {
        SrsStatisticVhost* vhost = NULL;
        
        /**
         * rvhosts: 
         * key: vhost url, value: vhost Object.
         * @remark a fast index for vhost.
         */
        // create vhost if not exists.
        if (rvhosts.find(req->vhost) == rvhosts.end()) {
            vhost = new SrsStatisticVhost();
            vhost->vhost = req->vhost;
            rvhosts[req->vhost] = vhost;
            /* vhosts - key: vhost id, value: vhost object. */
            vhosts[vhost->id] = vhost;
            return vhost;
        }
        
        vhost = rvhosts[req->vhost];
        
        return vhost;
    }
    

    3.4.2 SrsStatistic::create_stream

    SrsStatisticStream* SrsStatistic::create_stream(SrsStatisticVhost* vhost, SrsRequest* req)
    {
        std::string url = req->get_stream_url();
        
        SrsStatisticStream* stream = NULL;
        
        // create stream if not exists.
        if (rstreams.find(url) == rstreams.end()) {
            stream = new SrsStatisticStream();
            stream->vhost = vhost;
            stream->stream = req->stream;
            stream->app = req->app;
            stream->url = url;
            rstreams[url] = stream;
            streams[stream->id] = stream;
            return stream;
        }
        
        stream = rstreams[url];
        
        return stream;
    }
    

    4. SrsSource::set_cache

    void SrsSource::set_cache(bool enabled)
    {
        /* SrsGopCache* gop_cache: gop cache for client fast startup. */
        gop_cache->set(enabled);
    }
    

    4.1 SrsGopCache::set

    /**
     * to enabled or disable the gop cache.
     */
    void SrsGopCache::set(bool enabled)
    {
        /* if disabled the gop cache, the client will wait for the next 
         * keyframe for h264, and will be black-screen. */
        enabled_gop_cache = enabled;
        
        if (!enabled) {
            srs_info("disable gop cache, clear %d packets.", (int)gop_cache.size());
            clear();
            return;
        }
        
        srs_info("enable gop cache");
    }
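
    The effect of enabling or disabling the cache can be sketched with a toy model. `Packet` and `GopCache` below are simplified stand-ins invented for this example, and the caching rule (restart at every video keyframe, ignore packets until the first keyframe) is an assumption about the general technique rather than a copy of SrsGopCache.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, simplified media packet: only the fields this sketch needs.
struct Packet {
    bool is_video;
    bool is_keyframe;
};

class GopCache {
public:
    // Mirrors the shape of set(): disabling also drops whatever was cached.
    void set(bool enabled) {
        enabled_ = enabled;
        if (!enabled_) packets_.clear();
    }

    // Keeps the packets of the current GOP, starting at its video keyframe.
    void cache(const Packet& p) {
        if (!enabled_) return;
        const bool key = p.is_video && p.is_keyframe;
        if (key) packets_.clear();              // a new GOP starts here
        if (packets_.empty() && !key) return;   // nothing useful before a keyframe
        packets_.push_back(p);
    }

    // First cached packet; only valid when the cache is not empty.
    const Packet& first() const {
        assert(!packets_.empty());
        return packets_.front();
    }

    std::size_t size() const { return packets_.size(); }

private:
    bool enabled_ = true;
    std::vector<Packet> packets_;
};
```

    A new player can be sent the cached packets immediately and start decoding from the keyframe. With the cache disabled there is nothing to send, so the player has to wait for the next keyframe from the publisher.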
    

    5. SrsRtmpServer::start_fmle_publish

    /**
     * when client type is publish, response with packets:
     * releaseStream response
     * FCPublish
     * FCPublish response
     * createStream response
     * onFCPublish(NetStream.Publish.Start)
     * onStatus(NetStream.Publish.Start)
     */
    int SrsRtmpServer::start_fmle_publish(int stream_id)
    {
        int ret = ERROR_SUCCESS;
        
        // FCPublish
        double fc_publish_tid = 0;
        if (true) {
            SrsCommonMessage* msg = NULL;
            SrsFMLEStartPacket* pkt = NULL;
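            /* expect one of these messages: releaseStream/FCPublish/FCUnpublish; anything
             * else is discarded, and the call returns only once one of them arrives.
             * Per the flow shown at the start, the message received here should be FCPublish */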
            if ((ret = expect_message<SrsFMLEStartPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
                srs_error("recv FCPublish message failed. ret=%d", ret);
                return ret;
            }
            srs_info("recv FCPublish request message success.");
            
            SrsAutoFree(SrsCommonMessage, msg);
            SrsAutoFree(SrsFMLEStartPacket, pkt);
            
            fc_publish_tid = pkt->transaction_id;
        }
        // FCPublish response
        if (true) {
            SrsFMLEStartResPacket* pkt = new SrsFMLEStartResPacket(fc_publish_tid);
            if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
                srs_error("send FCPublish response message failed. ret=%d", ret);
                return ret;
            }
            srs_info("send FCPublish response message success.");
        }
        
        // createStream
        double create_stream_tid = 0;
        if (true) {
            SrsCommonMessage* msg = NULL;
            SrsCreateStreamPacket* pkt = NULL;
            if ((ret = expect_message<SrsCreateStreamPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
                srs_error("recv createStream message failed. ret=%d", ret);
                return ret;
            }
            srs_info("recv createStream request message success.");
            
            SrsAutoFree(SrsCommonMessage, msg);
            SrsAutoFree(SrsCreateStreamPacket, pkt);
            
            create_stream_tid = pkt->transaction_id;
        }
        // createStream response
        if (true) {
            SrsCreateStreamResPacket* pkt = new SrsCreateStreamResPacket(create_stream_tid, 
                                                                         stream_id);
            if ((ret = protocol->send_and_free_packet(pkt, 0)) != ERROR_SUCCESS) {
                srs_error("send createStream response message failed. ret=%d", ret);
                return ret;
            }
            srs_info("send createStream response message success.");
        }
        
        // publish
        if (true) {
            SrsCommonMessage* msg = NULL;
            SrsPublishPacket* pkt = NULL;
            if ((ret = expect_message<SrsPublishPacket>(&msg, &pkt)) != ERROR_SUCCESS) {
                srs_error("recv publish message failed. ret=%d", ret);
                return ret;
            }
            srs_info("recv publish request message success.");
            
            SrsAutoFree(SrsCommonMessage, msg);
            SrsAutoFree(SrsPublishPacket, pkt);
        }
        // publish response onFCPublish(NetStream.Publish.Start)
        if (true) {
            SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
            
            pkt->command_name = RTMP_AMF0_COMMAND_ON_FC_PUBLISH;
            pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart));
            pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream."));
            
            if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
                srs_error("send onFCPublish(NetStream.Publish.Start) message failed. ret=%d", 
                          ret);
                return ret;
            }
            srs_info("send onFCPublish(NetStream.Publish.Start) message success.");
        }
        // publish response onStatus(NetStream.Publish.Start)
        if (true) {
            SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
            
            pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
            pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodePublishStart));
            pkt->data->set(StatusDescription, SrsAmf0Any::str("Started publishing stream."));
            pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));
            
            if ((ret = protocol->send_and_free_packet(pkt, stream_id)) != ERROR_SUCCESS) {
                srs_error("send onStatus(NetStream.Publish.Start) message failed. ret=%d", 
                          ret);
                return ret;
            }
            srs_info("send onStatus(NetStream.Publish.Start) message success.");
        }
        
        srs_info("FMLE publish success.");
        
        return ret;
    }
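
    The request/response order enforced by start_fmle_publish can be simulated without any networking. In the sketch below, `expect_command` is a hypothetical simplification of expect_message: it matches on the command name only and drops everything else, which is why an interleaved `_checkbw` does not disturb the sequence. `Inbox` and `run_fmle_publish` are likewise names made up for this example.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical model: the client's commands in arrival order, by name only.
using Inbox = std::deque<std::string>;

// Pops commands until one named `wanted` arrives; everything else is dropped.
// Returns false if the inbox runs dry first.
bool expect_command(Inbox& inbox, const std::string& wanted) {
    assert(!wanted.empty());
    while (!inbox.empty()) {
        std::string got = inbox.front();
        inbox.pop_front();
        if (got == wanted) return true;
    }
    return false;
}

// Walks the publish handshake and records what the server would send.
std::vector<std::string> run_fmle_publish(Inbox inbox) {
    std::vector<std::string> sent;
    if (!expect_command(inbox, "FCPublish")) return sent;
    sent.push_back("_result(FCPublish)");
    if (!expect_command(inbox, "createStream")) return sent;
    sent.push_back("_result(createStream)");
    if (!expect_command(inbox, "publish")) return sent;
    sent.push_back("onFCPublish");
    sent.push_back("onStatus");
    return sent;
}
```

    Feeding it the commands FCPublish, createStream, _checkbw, publish yields all four server messages, while an inbox that stops after FCPublish yields only the first `_result`.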
    

    5.1 FCPublish

    5.1.1 Receiving FCPublish

    After FCPublish is received, it is decoded by the code below.

    SrsProtocol::do_decode_message:

    int SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsStream* stream, 
        SrsPacket** ppacket)
    {
        ...
        
        /* FCPublish */
        else if(command == RTMP_AMF0_COMMAND_FC_PUBLISH) {
            srs_info("decode the AMF0/AMF3 command(FMLE FCPublish message).");
            *ppacket = packet = new SrsFMLEStartPacket();
            return packet->decode(stream);
        } 
        
        ...
    }
    

    SrsFMLEStartPacket::decode

    int SrsFMLEStartPacket::decode(SrsStream* stream)
    {
        int ret = ERROR_SUCCESS;
        
        /* read the command name of the message, i.e. "FCPublish" */
        if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
            srs_error("amf0 decode FMLE start command_name failed. ret=%d", ret);
            return ret;
        }
        if (command_name.empty() 
            || (command_name != RTMP_AMF0_COMMAND_RELEASE_STREAM 
            && command_name != RTMP_AMF0_COMMAND_FC_PUBLISH
            && command_name != RTMP_AMF0_COMMAND_UNPUBLISH)
        ) {
            ret = ERROR_RTMP_AMF0_DECODE;
            srs_error("amf0 decode FMLE start command_name failed. "
                "command_name=%s, ret=%d", command_name.c_str(), ret);
            return ret;
        }
        
        if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
            srs_error("amf0 decode FMLE start transaction_id failed. ret=%d", ret);
            return ret;
        }
        
        if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
            srs_error("amf0 decode FMLE start command_object failed. ret=%d", ret);
            return ret;
        }
        
        if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) {
            srs_error("amf0 decode FMLE start stream_name failed. ret=%d", ret);
            return ret;
        }
        
        srs_info("amf0 decode FMLE start packet success");
        
        return ret;
    }
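
    For reference, the sketch below decodes the same four fields from raw bytes. `Reader`, `FmleStart` and `decode_fmle_start` are hypothetical names used only here. The marker values (0x02 string, 0x00 number, 0x05 null) are those of AMF0, and the accepted command names assume that the three macros expand to "releaseStream", "FCPublish" and "FCUnpublish".

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

using Bytes = std::vector<uint8_t>;

// Minimal cursor over a received payload; every read reports failure via bool.
struct Reader {
    const Bytes& buf;
    std::size_t pos;

    bool read_u8(uint8_t& v) {
        if (pos >= buf.size()) return false;
        v = buf[pos++];
        return true;
    }
    bool read_string(std::string& s) {  // marker 0x02 + u16 length + bytes
        uint8_t m = 0, hi = 0, lo = 0;
        if (!read_u8(m) || m != 0x02 || !read_u8(hi) || !read_u8(lo)) return false;
        std::size_t n = (static_cast<std::size_t>(hi) << 8) | lo;
        if (buf.size() - pos < n) return false;
        s.assign(reinterpret_cast<const char*>(buf.data() + pos), n);
        pos += n;
        assert(pos <= buf.size());
        return true;
    }
    bool read_number(double& d) {  // marker 0x00 + big-endian IEEE-754 double
        uint8_t m = 0;
        if (!read_u8(m) || m != 0x00 || buf.size() - pos < 8) return false;
        uint64_t bits = 0;
        for (int i = 0; i < 8; ++i) bits = (bits << 8) | buf[pos++];
        std::memcpy(&d, &bits, sizeof(d));
        return true;
    }
    bool read_null() {  // marker 0x05
        uint8_t m = 0;
        return read_u8(m) && m == 0x05;
    }
};

struct FmleStart {
    std::string command;
    double tid;
    std::string stream;
};

// Hypothetical decoder mirroring the field order of SrsFMLEStartPacket::decode.
bool decode_fmle_start(const Bytes& payload, FmleStart& out) {
    Reader r{payload, 0};
    if (!r.read_string(out.command)) return false;
    if (out.command != "releaseStream" && out.command != "FCPublish" &&
        out.command != "FCUnpublish") {
        return false;
    }
    return r.read_number(out.tid) && r.read_null() && r.read_string(out.stream);
}
```

    Like the original, the decoder rejects any command name outside the three FMLE start commands before it reads the remaining fields.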
    

    5.1.2 FCPublish response

    The FCPublish response is built with the SrsFMLEStartResPacket class, whose constructor is shown below:

    SrsFMLEStartResPacket constructor

    /**
     * response for SrsFMLEStartPacket.
     */
    SrsFMLEStartResPacket::SrsFMLEStartResPacket(double _transaction_id)
    {
        /* _result */
        command_name = RTMP_AMF0_COMMAND_RESULT;
        transaction_id = _transaction_id;
        command_object = SrsAmf0Any::null();
        args = SrsAmf0Any::undefined();
    }
    

    SrsFMLEStartResPacket::encode_packet

    /**
     * subpacket can override to encode the payload to stream.
     * @remark never invoke the super.encode_packet, it always failed.
     */
    int SrsFMLEStartResPacket::encode_packet(SrsStream* stream)
    {
        int ret = ERROR_SUCCESS;
        
        if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
            srs_error("encode command_name failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode command_name success.");
        
        if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
            srs_error("encode transaction_id failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode transaction_id success.");
        
        if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
            srs_error("encode command_object failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode command_object success.");
        
        if ((ret = srs_amf0_write_undefined(stream)) != ERROR_SUCCESS) {
            srs_error("encode args failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode args success.");
        
        
        srs_info("encode FMLE start response packet success.");
        
        return ret;
    }
    

    send: FCPublish response

    5.2 createStream

    5.2.1 Receiving createStream

    The createStream message is represented by the SrsCreateStreamPacket class, whose constructor is shown below.

    SrsCreateStreamPacket constructor

    /**
     * createStream
     * The client sends this command to the server to create a logical
     * channel for message communication. The publishing of audio, video, and
     * metadata is carried out over stream channel created using the 
     * createStream command.
     */
    SrsCreateStreamPacket::SrsCreateStreamPacket()
    {
        /* createStream */
        command_name = RTMP_AMF0_COMMAND_CREATE_STREAM;
        /**
         * Transaction ID of the command.
         */
        transaction_id = 2;
        /**
         * If there exists any command info this is set, else this is set to null type.
         * @remark, never be NULL, an AMF0 null instance.
         */
        command_object = SrsAmf0Any::null();
    }
    

    After createStream is received, the message is decoded as follows:

    SrsCreateStreamPacket::decode

    int SrsCreateStreamPacket::decode(SrsStream* stream)
    {
        int ret = ERROR_SUCCESS;
    
        if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
            srs_error("amf0 decode createStream command_name failed. ret=%d", ret);
            return ret;
        }
        if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_CREATE_STREAM) {
            ret = ERROR_RTMP_AMF0_DECODE;
            srs_error("amf0 decode createStream command_name failed. "
                "command_name=%s, ret=%d", command_name.c_str(), ret);
            return ret;
        }
        
        if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
            srs_error("amf0 decode createStream transaction_id failed. ret=%d", ret);
            return ret;
        }
        
        if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
            srs_error("amf0 decode createStream command_object failed. ret=%d", ret);
            return ret;
        }
        
        srs_info("amf0 decode createStream packet success");
        
        return ret;
    }
    

    5.2.2 createStream response

    The createStream response message is built with the SrsCreateStreamResPacket class, whose constructor is shown below:

    SrsCreateStreamResPacket constructor

    SrsCreateStreamResPacket::SrsCreateStreamResPacket(double _transaction_id, 
            double _stream_id)
    {
        /* _result */
        command_name = RTMP_AMF0_COMMAND_RESULT;
        /**
         * ID of the command that response belongs to.
         */
        transaction_id = _transaction_id;
        /**
         * If there exists any command info this is set, else this is set to null type.
         * @remark, never be NULL, an AMF0 null instance.
         */
        command_object = SrsAmf0Any::null();
        /* The return value is either a stream ID or an error information object. */
        stream_id = _stream_id;
    }
    

    Next, the payload of the createStream response message is encoded (that is, packed).

    SrsCreateStreamResPacket::encode_packet

    int SrsCreateStreamResPacket::encode_packet(SrsStream* stream)
    {
        int ret = ERROR_SUCCESS;
        
        if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
            srs_error("encode command_name failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode command_name success.");
        
        if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
            srs_error("encode transaction_id failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode transaction_id success.");
        
        if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
            srs_error("encode command_object failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode command_object success.");
        
        if ((ret = srs_amf0_write_number(stream, stream_id)) != ERROR_SUCCESS) {
            srs_error("encode stream_id failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode stream_id success.");
        
        
        srs_info("encode createStream response packet success.");
        
        return ret;
    }
    

    send: createStream response

    5.3 publish

    5.3.1 Receiving publish

    The publish message is represented by the SrsPublishPacket class, whose constructor is shown below:

    SrsPublishPacket constructor

    /**
     * FMLE/flash publish
     * Publish
     * The client sends the publish command to publish a named stream to the
     * server. Using this name, any client can play this stream and receive
     * the published audio, video, and data messages.
     */
    SrsPublishPacket::SrsPublishPacket()
    {
        /* Name of the command, set to "publish". */
        command_name = RTMP_AMF0_COMMAND_PUBLISH;
        /* Transaction ID set to 0. */
        transaction_id = 0;
        /**
         * Command information object does not exist. Set to null type.
         * @remark, never be NULL, an AMF0 null instance.
         */
        command_object = SrsAmf0Any::null();
        /**
         * Type of publishing. Set to "live", "record", or "append".
         *   record: The stream is published and the data is recorded to a new file. The file
         *           is stored on the server in a subdirectory within the directory that
         *           contains the server application. If the file already exists, it is
         *           overwritten.
         *   append: The stream is published and the data is appended to a file. If no file
         *           is found, it is created.
         *   live: Live data is published without recording it in a file.
         * @remark, SRS only support live.
         * @remark, optional, default to live.
         */
        type = "live";
    }
    

    recv: publish

    The publish message is decoded by the code below.

    SrsPublishPacket::decode

    int SrsPublishPacket::decode(SrsStream* stream)
    {
        int ret = ERROR_SUCCESS;
        
        if ((ret = srs_amf0_read_string(stream, command_name)) != ERROR_SUCCESS) {
            srs_error("amf0 decode publish command_name failed. ret=%d", ret);
            return ret;
        }
        if (command_name.empty() || command_name != RTMP_AMF0_COMMAND_PUBLISH) {
            ret = ERROR_RTMP_AMF0_DECODE;
            srs_error("amf0 decode publish command_name failed. "
                "command_name=%s, ret=%d", command_name.c_str(), ret);
            return ret;
        }
        
        if ((ret = srs_amf0_read_number(stream, transaction_id)) != ERROR_SUCCESS) {
            srs_error("amf0 decode publish transaction_id failed. ret=%d", ret);
            return ret;
        }
        
        if ((ret = srs_amf0_read_null(stream)) != ERROR_SUCCESS) {
            srs_error("amf0 decode publish command_object failed. ret=%d", ret);
            return ret;
        }
        
        /* read the name of the stream being published */
        if ((ret = srs_amf0_read_string(stream, stream_name)) != ERROR_SUCCESS) {
            srs_error("amf0 decode publish stream_name failed. ret=%d", ret);
            return ret;
        }
        
        /* read the publish type; SRS only supports live */
        if (!stream->empty() && (ret = srs_amf0_read_string(stream, type)) != ERROR_SUCCESS) {
            srs_error("amf0 decode publish type failed. ret=%d", ret);
            return ret;
        }
        
        srs_info("amf0 decode publish packet success");
        
        return ret;
    }
    

    5.3.2 publish response onFCPublish(NetStream.Publish.Start)

    The onFCPublish response to publish is built with SrsOnStatusCallPacket, whose constructor is shown below.

    SrsOnStatusCallPacket constructor

    SrsOnStatusCallPacket::SrsOnStatusCallPacket()
    {
        /* Name of command. Set to "onStatus" */
        command_name = RTMP_AMF0_COMMAND_ON_STATUS;
        /* Transaction ID set to 0. */
        transaction_id = 0;
        /**
         * Command information does not exist. Set to null type.
         * @remark, never be NULL, an AMF0 null instance.
         */
        args = SrsAmf0Any::null();
        /**
         * Name-value pairs that describe the response from the server. 
         * 'code','level', 'description' are names of few among such information.
         * @remark, never be NULL, an AMF0 object instance.
         */
        data = SrsAmf0Any::object();
    }
    

    Note: for the onFCPublish response, start_fmle_publish overrides the default command name onStatus with onFCPublish. The captured packet is shown below:

    send: onFCPublish

    The payload of the onFCPublish message is encoded as follows.

    SrsOnStatusCallPacket::encode_packet

    int SrsOnStatusCallPacket::encode_packet(SrsStream* stream)
    {
        int ret = ERROR_SUCCESS;
        
        if ((ret = srs_amf0_write_string(stream, command_name)) != ERROR_SUCCESS) {
            srs_error("encode command_name failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode command_name success.");
        
        if ((ret = srs_amf0_write_number(stream, transaction_id)) != ERROR_SUCCESS) {
            srs_error("encode transaction_id failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode transaction_id success.");
        
        if ((ret = srs_amf0_write_null(stream)) != ERROR_SUCCESS) {
            srs_error("encode args failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode args success.");
        
        if ((ret = data->write(stream)) != ERROR_SUCCESS) {
            srs_error("encode data failed. ret=%d", ret);
            return ret;
        }
        srs_verbose("encode data success.");
        
        srs_info("encode onStatus(Call) packet success.");
        
        return ret;
    }
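
    The only part of this payload that differs structurally from the earlier responses is the data object. The sketch below, which assumes string-only properties and uses the hypothetical helpers `write_utf8` and `encode_on_status`, shows how an AMF0 object is laid out: marker 0x03, name/value pairs, then the end marker 0x00 0x00 0x09.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

using Bytes = std::vector<uint8_t>;
using Props = std::vector<std::pair<std::string, std::string>>;

// Writes a 16-bit big-endian length followed by the raw bytes (no type marker).
void write_utf8(Bytes& out, const std::string& s) {
    assert(s.size() <= 0xFFFF);
    out.push_back(static_cast<uint8_t>(s.size() >> 8));
    out.push_back(static_cast<uint8_t>(s.size() & 0xFF));
    out.insert(out.end(), s.begin(), s.end());
}

// Hypothetical helper: command name, number 0, null, then an object whose
// properties are all strings.
Bytes encode_on_status(const std::string& command, const Props& data) {
    Bytes out;
    out.push_back(0x02);                                // string marker
    write_utf8(out, command);
    out.push_back(0x00);                                // number marker
    for (int i = 0; i < 8; ++i) out.push_back(0x00);    // 0.0 is eight zero bytes
    out.push_back(0x05);                                // null
    out.push_back(0x03);                                // object marker
    for (const auto& kv : data) {
        write_utf8(out, kv.first);                      // property names carry no marker
        out.push_back(0x02);                            // each value is an AMF0 string
        write_utf8(out, kv.second);
    }
    out.push_back(0x00);                                // empty name ...
    out.push_back(0x00);
    out.push_back(0x09);                                // ... plus object-end marker
    return out;
}
```

    Passing "onFCPublish" instead of "onStatus" as the command name gives the onFCPublish variant; the rest of the layout is identical.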
    

    5.3.3 publish response onStatus(NetStream.Publish.Start)

    This response is also built with the SrsOnStatusCallPacket class, and its command name is onStatus. The packet capture is shown below:

    send: onStatus

    6. SrsRtmpConn::publishing

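    Once the server has successfully responded to the publish message sent by obs, it enters SrsRtmpConn::publishing and starts handling the media data pushed by obs. For the detailed analysis, see the article "SRS之SrsRtmpConn::publishing详解" (SrsRtmpConn::publishing explained).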

  • Original article: https://www.cnblogs.com/jimodetiantang/p/9081876.html