zoukankan      html  css  js  c++  java
  • licode学习之erizo篇--Pipeline_handle

    erizo的pipeline的handle,是媒体数据处理的基本操作,handle分为3类:IN,OUT,BOTH

    IN:数据进入handle,handle需要read数据并传递给下一级

    OUT:数据进入handle,handle需要write数据并传递给下一级

    BOTH:可以同时进行read和write

    在宏观语义上,IN的目标是输出rtp裸数据;OUT的目标是封装rtp裸数据

    pipeline的handle的继承体系如下:

    handle有三种:InboundHandler、Handler、OutboundHandler;他们分别对应一个Context

    InboundHandler==>InboundContextImpl

    Handler==>ContextImpl

    OutboundHandler==>OutboundContextImpl

    他们的工作方式差不多,就挑一个InboundHandler来进行说明。pipeline有addFront方法,该方法是注册Handler的,先看看这个方法的相关实现:

    template <class H>
    PipelineBase& PipelineBase::addFront(std::shared_ptr<H> handler) {
      typedef typename ContextType<H>::type Context;
      return addHelper(
          std::make_shared<Context>(shared_from_this(), std::move(handler)),
          true);
    }
    template <class Context>
    PipelineBase& PipelineBase::addHelper(
        std::shared_ptr<Context>&& ctx,  // NOLINT
        bool front) {
      ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
      if (Context::dir == HandlerDir_BOTH || Context::dir == HandlerDir_IN) {
        inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
      }
      if (Context::dir == HandlerDir_BOTH || Context::dir == HandlerDir_OUT) {
        outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
      }
      return *this;
    }

    addFront的功能,就是创建一个Context的对象,并将之传递给addHelper为参数。Context的构造还被传递了Handler自己的指针,让Context能够知道handler

    addHelper将Context存储起来,并且判断类型,分别放到inCtxs_和outCtxs_里面,待用

    Context的实际类型是啥呢?

    template <class Handler>
    struct ContextType {
      typedef typename std::conditional<
        Handler::dir == HandlerDir_BOTH,
        ContextImpl<Handler>,
        typename std::conditional<
          Handler::dir == HandlerDir_IN,
          InboundContextImpl<Handler>,
          OutboundContextImpl<Handler>
        >::type>::type
      type;
    };

    Context的实际类型,要依托于Handler::dir成员的值,这个值是一个常量,是每个Handler都有的。

    HandlerDir_IN:Context为InboundContextImpl

    HandlerDir_BOTH:Context为ContextImpl

    HandlerDir_其他:Context为OutboundContextImpl

    所以,pipeline的addFront方法,实际上是创建HandlerContext的实例,并且将之存储的过程。

    pipeline的数据链路建立,是怎么样的呢

    void Pipeline::finalize() {
      front_ = nullptr;
      if (!inCtxs_.empty()) {
        front_ = dynamic_cast<InboundLink*>(inCtxs_.front());
        for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
          inCtxs_[i]->setNextIn(inCtxs_[i+1]);
        }
        inCtxs_.back()->setNextIn(nullptr);
      }
    
      back_ = nullptr;
      if (!outCtxs_.empty()) {
        back_ = dynamic_cast<OutboundLink*>(outCtxs_.back());
        for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
          outCtxs_[i]->setNextOut(outCtxs_[i-1]);
        }
        outCtxs_.front()->setNextOut(nullptr);
      }
    
      if (!front_) {
        // detail::logWarningIfNotUnit<R>(
        //     "No inbound handler in Pipeline, inbound operations will throw "
        //     "std::invalid_argument");
      }
      if (!back_) {
        // detail::logWarningIfNotUnit<W>(
        //     "No outbound handler in Pipeline, outbound operations will throw "
        //     "std::invalid_argument");
      }
    
      for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
        (*it)->attachPipeline();
      }
    
      for (auto it = service_ctxs_.rbegin(); it != service_ctxs_.rend(); it++) {
        (*it)->attachPipeline();
      }
    
      notifyUpdate();
    }

    pipeline里面的finalize方法为In和Out的Context设置任务链,并且设置头结点(front_,back_)之后,每个HandlerContext就知道自己的下一级任务是什么了

    还需要了解一下触发方式:

    MediaStream的OnTransportData获得packet数据

    void MediaStream::onTransportData(std::shared_ptr<DataPacket> incoming_packet, Transport *transport) {
      if ((audio_sink_ == nullptr && video_sink_ == nullptr && fb_sink_ == nullptr)) {
        return;
      }
    
      std::shared_ptr<DataPacket> packet = std::make_shared<DataPacket>(*incoming_packet);
    
      if (transport->mediaType == AUDIO_TYPE) {
        packet->type = AUDIO_PACKET;
      } else if (transport->mediaType == VIDEO_TYPE) {
        packet->type = VIDEO_PACKET;
      }
      auto stream_ptr = shared_from_this();
    
      worker_->task([stream_ptr, packet]{
        if (!stream_ptr->pipeline_initialized_) {
          ELOG_DEBUG("%s message: Pipeline not initialized yet.", stream_ptr->toLog());
          return;
        }
    
        char* buf = packet->data;
        RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
        RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
        if (!chead->isRtcp()) {
          uint32_t recvSSRC = head->getSSRC();
          if (stream_ptr->isVideoSourceSSRC(recvSSRC)) {
            packet->type = VIDEO_PACKET;
          } else if (stream_ptr->isAudioSourceSSRC(recvSSRC)) {
            packet->type = AUDIO_PACKET;
          }
        }
    
        if (stream_ptr->pipeline_) {
          stream_ptr->pipeline_->read(std::move(packet));
        }
      });
    }

    在里面调用了pipeline的read方法,还需要知道read里面做什么了

    void Pipeline::read(std::shared_ptr<DataPacket> packet) {
      if (!front_) {
        return;
      }
      front_->read(std::move(packet));
    }

    调用了front的read,front是一个HandlerContext对象,并且是处理链的头结点

      void read(std::shared_ptr<DataPacket> packet) override {
        auto guard = this->pipelineWeak_.lock();
        this->handler_->read(this, std::move(packet));
      }

    在HandlerContext里面调用了handler_的read方法,并且把自己作为参数也同时传递给了handler。

    之后找一个真正的handler对象,看一下它的read实现:

    void LayerDetectorHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) {
      RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);
      if (!chead->isRtcp() && enabled_ && packet->type == VIDEO_PACKET) {
        if (packet->codec == "VP8") {
          parseLayerInfoFromVP8(packet);
        } else if (packet->codec == "VP9") {
          parseLayerInfoFromVP9(packet);
        } else if (packet->codec == "H264") {
          parseLayerInfoFromH264(packet);
        }
      }
      ctx->fireRead(std::move(packet));
    }

    在handler的read里面,再次调用了ctx的fireRead方法,并且把packet进行传递

      void fireRead(std::shared_ptr<DataPacket> packet) override {
        auto guard = this->pipelineWeak_.lock();
        if (this->nextIn_) {
          this->nextIn_->read(std::move(packet));
        }
      }

    在fireRead中,调用了nextIn_的read方法,nextIn_是下一个HandlerContext。

    这样就形成了一个任务链。

    ContextRead-->HanderRead-->Context fireRead-->Context next read-->ContextRead.........

    形成一个read的任务链。

    write的基本原理也是相似的。

    总结:erizo的pipeline的handler是负责实际数据处理的,通过处理链路,将之串联起来

  • 相关阅读:
    poj1179 Polygon
    poj2677 Tour
    MariaDB10多实例--mysqld_multi
    MariaDB10源码安装
    linux下php+freetds连接SQL server2012
    MariaDB yum安装
    mongoDB yum安装
    pxe 引导clonezilla live万能备份与还原
    (转) pppd 中文man页面
    Unix-like DisplayManager/LoginManager/WindowManager
  • 原文地址:https://www.cnblogs.com/limedia/p/licode_erizo_pipeline_handler.html
Copyright © 2011-2022 走看看