Pipeline是媒体处理的核心流程逻辑。
Pipeline里面定义了两个主要的概念:Service和Handler。
Service负责处理那些不仅要看当前数据包,还要分析之前的数据包的那些业务,比如丢包重传;Handler处理当前的数据包的情形,比如生成填充字节。
在Pipeline里面,Handler和Service是配合起来一起工作的,他们通过一套框架将之关联起来。
先看看使用上:
void MediaStream::initializePipeline() { handler_manager_ = std::make_shared<HandlerManager>(shared_from_this()); pipeline_->addService(shared_from_this()); pipeline_->addService(handler_manager_); pipeline_->addService(rtcp_processor_); pipeline_->addService(stats_); pipeline_->addService(quality_manager_); pipeline_->addService(packet_buffer_); pipeline_->addFront(std::make_shared<PacketReader>(this)); pipeline_->addFront(std::make_shared<RtcpProcessorHandler>()); pipeline_->addFront(std::make_shared<FecReceiverHandler>()); pipeline_->addFront(std::make_shared<LayerBitrateCalculationHandler>()); pipeline_->addFront(std::make_shared<QualityFilterHandler>()); pipeline_->addFront(std::make_shared<IncomingStatsHandler>()); pipeline_->addFront(std::make_shared<RtpTrackMuteHandler>()); pipeline_->addFront(std::make_shared<RtpSlideShowHandler>()); pipeline_->addFront(std::make_shared<RtpPaddingGeneratorHandler>()); pipeline_->addFront(std::make_shared<PliPacerHandler>()); pipeline_->addFront(std::make_shared<BandwidthEstimationHandler>()); pipeline_->addFront(std::make_shared<RtpPaddingRemovalHandler>()); pipeline_->addFront(std::make_shared<RtcpFeedbackGenerationHandler>()); pipeline_->addFront(std::make_shared<RtpRetransmissionHandler>()); pipeline_->addFront(std::make_shared<SRPacketHandler>()); pipeline_->addFront(std::make_shared<SenderBandwidthEstimationHandler>()); pipeline_->addFront(std::make_shared<LayerDetectorHandler>()); pipeline_->addFront(std::make_shared<OutgoingStatsHandler>()); pipeline_->addFront(std::make_shared<PacketCodecParser>()); pipeline_->addFront(std::make_shared<PacketWriter>(this)); pipeline_->finalize(); pipeline_initialized_ = true; }
在初始化时,pipeline调用了addService和addFront接口,将Service和Handler添加到pipeline中去。在初始化里面,我们可以看到其支持了哪些处理。
在实际使用中,接收到的数据,调用pipeline的read接口,就完成了解析为裸数据的事儿;调用write接口,就完成了fec等处理数据的事儿。
pipeline的数据,read的源需要是srtp解密后的数据,处理后为rtp裸数据;write的源为rtp裸数据,处理后的数据经过srtp加密输出到网络。(网络使用的是DtlsTransport接口对接的)
这些功能先不去管它,这里先弄清楚他们的架构和工作方式。
阅读这块儿的代码真是不容易,使用了很多模板类,为了方便理解,菜鸟哥根据代码,把所有的模板类替换为了实际的基类,来进行理解。
先看看pipeline的Service部分的继承体系以及数据结构:
再结合PipelineBase的addService实现,看一下Service是干啥用的
template <class S> void PipelineBase::addService(std::shared_ptr<S> service) { typedef typename ServiceContextType<S>::type Context; service_ctxs_.push_back(std::make_shared<Context>(shared_from_this(), std::move(service))); }
template <class Service> struct ServiceContextType { typedef ServiceContextImpl<Service> type; };
addService其实就是传递一个Service的子类对象,这个子类对象是用来给Context的构造函数传递参数的;Context就是ServiceContextImpl,也就是说addService里面的参数,就是为了创建一个ServiceContextImpl对象,这个对象创建出来以后,被存储在pipelinebase的service_ctxs_成员中。在addService接口中,还将pipeline自身,作为参数,传递给了ServiceContextImpl。通过代码看看这些参数怎么用
explicit ServiceContextImpl( std::weak_ptr<PipelineBase> pipeline, std::weak_ptr<S> service) { this->impl_ = this; this->initialize(pipeline, std::move(service)); } void initialize( std::weak_ptr<PipelineBase> pipeline, std::weak_ptr<S> service) { pipeline_weak_ = pipeline; pipeline_raw_ = pipeline.lock().get(); service_ = std::move(service); }
std::weak_ptr<S> getService() {
return service_;
}
ServiceContextImpl在构造时存储了PipelineBase和Service,这样外面再使用时,可以通过getService来获取到Service的实例。
这个获取操作很重要,看一下pipeline的notifyUpdate方法,看实际的处理handler(RtcpProcessorHandler)
void RtcpProcessorHandler::notifyUpdate() { auto pipeline = getContext()->getPipelineShared(); if (pipeline && !stream_) { stream_ = pipeline->getService<MediaStream>().get(); processor_ = pipeline->getService<RtcpProcessor>(); stats_ = pipeline->getService<Stats>(); } }
这个地方好神奇,通过调用getService方法,模板传递不同的类型,则能够获取到不同的对象实例。看一下getService方法
template <class S> std::shared_ptr<S> PipelineBase::getService() { auto ctx = getServiceContext<S>(); return ctx ? ctx->getService().lock() : std::shared_ptr<S>(); } template <class S> typename ServiceContextType<S>::type* PipelineBase::getServiceContext() { for (auto pipeline_service_ctx : service_ctxs_) { auto ctx = dynamic_cast<typename ServiceContextType<S>::type*>(pipeline_service_ctx.get()); if (ctx) { return ctx; } } return nullptr; }
在getServiceContext方法里面,遍历了pipeline的service_ctxs_,并对每一个ctx进行dynamic_cast转换,能够成功,就返回,不能成功就继续。这个地方真是灵活使用,奇思妙想。
到这里,就形成了一个共享的方式,所有的handler,都可以获得到所有的service的子类实例,在实现过程中就极大的提升了灵活性,每个service独立做自己的事儿,并且由handler直接进行数据驱动,简直太符合这个媒体处理的需要了。
总结:Service的核心意义是共享,即每个handler都可以通过类型来获取到所有的Service子类实例,进行使用,而不必要为每个Handler定义不同的接口来传递Service对象。Service也为了多个Handler公用数据而提供服务。