简介
SrsLiveSource:代表RTMP源
SrsRtcSource:代表RTC源
两个协议的转换,就是怎么从SrsLiveSource转到SrsRtcSource,这里需要一个桥梁SrsRtcFromRtmpBridger

创建Source
rtmp推流的时候就会创建SrsLiveSource和SrsRtcSource;
SrsRtmpConn::stream_service_cycle()

srs_error_t SrsRtmpConn::stream_service_cycle() { srs_error_t err = srs_success; SrsRequest* req = info->req; if ((err = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != srs_success) { return srs_error_wrap(err, "rtmp: identify client"); } srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param); req->strip(); srs_trace("client identified, type=%s, vhost=%s, app=%s, stream=%s, param=%s, duration=%dms", srs_client_type_string(info->type).c_str(), req->vhost.c_str(), req->app.c_str(), req->stream.c_str(), req->param.c_str(), srsu2msi(req->duration)); // discovery vhost, resolve the vhost from config SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost); if (parsed_vhost) { req->vhost = parsed_vhost->arg0(); } if (req->schema.empty() || req->vhost.empty() || req->port == 0 || req->app.empty()) { return srs_error_new(ERROR_RTMP_REQ_TCURL, "discovery tcUrl failed, tcUrl=%s, schema=%s, vhost=%s, port=%d, app=%s", req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port, req->app.c_str()); } // check vhost, allow default vhost. if ((err = check_vhost(true)) != srs_success) { return srs_error_wrap(err, "check vhost"); } srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, stream=%s, param=%s, args=%s", req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port, req->app.c_str(), req->stream.c_str(), req->param.c_str(), (req->args? "(obj)":"null")); // do token traverse before serve it. // @see https://github.com/ossrs/srs/pull/239 if (true) { info->edge = _srs_config->get_vhost_is_edge(req->vhost); bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost); if (info->edge && edge_traverse) { if ((err = check_edge_token_traverse_auth()) != srs_success) { return srs_error_wrap(err, "rtmp: check token traverse"); } } } // security check if ((err = security->check(info->type, ip, req)) != srs_success) { return srs_error_wrap(err, "rtmp: security check"); } // Never allow the empty stream name, for HLS may write to a file with empty name. // @see https://github.com/ossrs/srs/issues/834 if (req->stream.empty()) { return srs_error_new(ERROR_RTMP_STREAM_NAME_EMPTY, "rtmp: empty stream"); } // client is identified, set the timeout to service timeout. rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT); rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); // find a source to serve. SrsLiveSource* source = NULL; if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) { return srs_error_wrap(err, "rtmp: fetch source"); } srs_assert(source != NULL); // update the statistic when source disconveried. SrsStatistic* stat = SrsStatistic::instance(); if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) { return srs_error_wrap(err, "rtmp: stat client"); } bool enabled_cache = _srs_config->get_gop_cache(req->vhost); srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%s/%s", req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id().c_str(), source->pre_source_id().c_str()); source->set_cache(enabled_cache); switch (info->type) { case SrsRtmpConnPlay: { // response connection start play if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) { return srs_error_wrap(err, "rtmp: start play"); } if ((err = http_hooks_on_play()) != srs_success) { return srs_error_wrap(err, "rtmp: callback on play"); } err = playing(source); http_hooks_on_stop(); return err; } case SrsRtmpConnFMLEPublish: { if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) { return srs_error_wrap(err, "rtmp: start FMLE publish"); } return publishing(source); } case SrsRtmpConnHaivisionPublish: { if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) { return srs_error_wrap(err, "rtmp: start HAIVISION publish"); } return publishing(source); } case SrsRtmpConnFlashPublish: { if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) { return srs_error_wrap(err, "rtmp: start FLASH publish"); } return publishing(source); } default: { return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type); } } return err; }
这里推流的业务处理,是会创建SrsLiveSource,然后我们看看publishing
SrsRtmpConn::publishing(SrsLiveSource* source)

srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source) { srs_error_t err = srs_success; SrsRequest* req = info->req; if (_srs_config->get_refer_enabled(req->vhost)) { if ((err = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != srs_success) { return srs_error_wrap(err, "rtmp: referer check"); } } if ((err = http_hooks_on_publish()) != srs_success) { return srs_error_wrap(err, "rtmp: callback on publish"); } // TODO: FIXME: Should refine the state of publishing. if ((err = acquire_publish(source)) == srs_success) { // use isolate thread to recv, // @see: https://github.com/ossrs/srs/issues/237 SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id()); err = do_publishing(source, &rtrd); rtrd.stop(); } // whatever the acquire publish, always release publish. // when the acquire error in the midlle-way, the publish state changed, // but failed, so we must cleanup it. // @see https://github.com/ossrs/srs/issues/474 // @remark when stream is busy, should never release it. if (srs_error_code(err) != ERROR_SYSTEM_STREAM_BUSY) { release_publish(source); } http_hooks_on_unpublish(); return err; }
这里先看看acquire_publish处理

srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) { srs_error_t err = srs_success; SrsRequest* req = info->req; // Check whether RTMP stream is busy. if (!source->can_publish(info->edge)) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtmp: stream %s is busy", req->get_stream_url().c_str()); } // Check whether RTC stream is busy. #ifdef SRS_RTC SrsRtcSource *rtc = NULL; bool rtc_server_enabled = _srs_config->get_rtc_server_enabled(); bool rtc_enabled = _srs_config->get_rtc_enabled(req->vhost); if (rtc_server_enabled && rtc_enabled && !info->edge) { if ((err = _srs_rtc_sources->fetch_or_create(req, &rtc)) != srs_success) { return srs_error_wrap(err, "create source"); } if (!rtc->can_publish()) { return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "rtc stream %s busy", req->get_stream_url().c_str()); } } #endif // Bridge to RTC streaming. #if defined(SRS_RTC) && defined(SRS_FFMPEG_FIT) if (rtc) { SrsRtcFromRtmpBridger *bridger = new SrsRtcFromRtmpBridger(rtc); if ((err = bridger->initialize(req)) != srs_success) { srs_freep(bridger); return srs_error_wrap(err, "bridger init"); } source->set_bridger(bridger); } #endif // Start publisher now. if (info->edge) { return source->on_edge_start_publish(); } else { return source->on_publish(); } }
可以看出,如果配置了RTC,就会创建SrsRtcSource和SrsRtcFromRtmpBridger。
获取RTMP数据
SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)

srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg) { srs_error_t err = srs_success; // for edge, directly proxy message to origin. if (info->edge) { if ((err = source->on_edge_proxy_publish(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: proxy publish"); } return err; } // process audio packet if (msg->header.is_audio()) { if ((err = source->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: consume audio"); } return err; } // process video packet if (msg->header.is_video()) { if ((err = source->on_video(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: consume video"); } return err; } // process aggregate packet if (msg->header.is_aggregate()) { if ((err = source->on_aggregate(msg)) != srs_success) { return srs_error_wrap(err, "rtmp: consume aggregate"); } return err; } // process onMetaData if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) { SrsPacket* pkt = NULL; if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) { return srs_error_wrap(err, "rtmp: decode message"); } SrsAutoFree(SrsPacket, pkt); if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) { SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt); if ((err = source->on_meta_data(msg, metadata)) != srs_success) { return srs_error_wrap(err, "rtmp: consume metadata"); } return err; } return err; } return err; }
这里处理收到的源数据,下面分析一下音频的处理

srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio) { srs_error_t err = srs_success; // monotically increase detect. if (!mix_correct && is_monotonically_increase) { if (last_packet_time > 0 && shared_audio->header.timestamp < last_packet_time) { is_monotonically_increase = false; srs_warn("AUDIO: stream not monotonically increase, please open mix_correct."); } } last_packet_time = shared_audio->header.timestamp; // convert shared_audio to msg, user should not use shared_audio again. // the payload is transfer to msg, and set to NULL in shared_audio. SrsSharedPtrMessage msg; if ((err = msg.create(shared_audio)) != srs_success) { return srs_error_wrap(err, "create message"); } // directly process the audio message. if (!mix_correct) { return on_audio_imp(&msg); } // insert msg to the queue. mix_queue->push(msg.copy()); // fetch someone from mix queue. SrsSharedPtrMessage* m = mix_queue->pop(); if (!m) { return err; } // consume the monotonically increase message. if (m->is_audio()) { err = on_audio_imp(m); } else { err = on_video_imp(m); } srs_freep(m); return err; }
SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg)

srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg) { srs_error_t err = srs_success; bool is_aac_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size); bool is_sequence_header = is_aac_sequence_header; // whether consumer should drop for the duplicated sequence header. bool drop_for_reduce = false; if (is_sequence_header && meta->previous_ash() && _srs_config->get_reduce_sequence_header(req->vhost)) { if (meta->previous_ash()->size == msg->size) { drop_for_reduce = srs_bytes_equals(meta->previous_ash()->payload, msg->payload, msg->size); srs_warn("drop for reduce sh audio, size=%d", msg->size); } } // Copy to hub to all utilities. if ((err = hub->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "consume audio"); } // For bridger to consume the message. if (bridger_ && (err = bridger_->on_audio(msg)) != srs_success) { return srs_error_wrap(err, "bridger consume audio"); } // copy to all consumer if (!drop_for_reduce) { for (int i = 0; i < (int)consumers.size(); i++) { SrsLiveConsumer* consumer = consumers.at(i); if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) { return srs_error_wrap(err, "consume message"); } } } // cache the sequence header of aac, or first packet of mp3. // for example, the mp3 is used for hls to write the "right" audio codec. // TODO: FIXME: to refine the stream info system. if (is_aac_sequence_header || !meta->ash()) { if ((err = meta->update_ash(msg)) != srs_success) { return srs_error_wrap(err, "meta consume audio"); } } // when sequence header, donot push to gop cache and adjust the timestamp. if (is_sequence_header) { return err; } // cache the last gop packets if ((err = gop_cache->cache(msg)) != srs_success) { return srs_error_wrap(err, "gop cache consume audio"); } // if atc, update the sequence header to abs time. if (atc) { if (meta->ash()) { meta->ash()->timestamp = msg->timestamp; } if (meta->data()) { meta->data()->timestamp = msg->timestamp; } } return err; }
这里把数据转发给SrsRtcFromRtmpBridger,上面都比较简单,下面着重分析一下SrsRtcFromRtmpBridger收到数据以后的语音转码