zoukankan      html  css  js  c++  java
  • peer-connection

    webrtc点对点会话建立过程:https://blog.csdn.net/zhuiyuanqingya/article/details/84108763

    本地Candidate收集

    本地的IceCandidate收集过程起始于PeerConnection::SetLocalDescription,其中会启动收集

      // MaybeStartGathering needs to be called after posting
      // MSG_SET_SESSIONDESCRIPTION_SUCCESS, so that we don't signal any candidates
      // before signaling that SetLocalDescription completed.
      transport_controller_->MaybeStartGathering();
    

    MaybeStartGathering具体实现是:

    // Kicks off ICE candidate gathering for every DTLS transport owned by this
    // controller. The work is always performed on the network thread: when
    // called from any other thread, the call is re-dispatched there.
    void JsepTransportController::MaybeStartGathering() {
      // Not on the network thread: re-enter this same function on it through
      // Invoke, then return without doing any work on the calling thread.
      // NOTE(review): Invoke is presumably blocking, which is what makes the
      // by-reference capture safe -- confirm against rtc::Thread.
      if (!network_thread_->IsCurrent()) {
        network_thread_->Invoke<void>(RTC_FROM_HERE,
                                      [&] { MaybeStartGathering(); });
        return;
      }
    
      // On the network thread: forward the request to the ICE transport that
      // sits underneath each DTLS transport.
      for (auto& dtls : GetDtlsTransports()) {
        dtls->ice_transport()->MaybeStartGathering();
      }
    }
    

    在这里会在network_thread_启动收集函数,然后对每个DtlsTransports启动收集过程,转到P2PTransportChannel::MaybeStartGathering() :

        // Excerpt (the enclosing function is not shown here): either adopt an
        // existing pooled allocator session, or create a fresh session and start
        // port gathering on it.
        if (pooled_session) {
          // Ownership of the pooled session moves into allocator_sessions_; keep
          // a raw pointer to the just-added element for the callbacks below.
          AddAllocatorSession(std::move(pooled_session));
          PortAllocatorSession* raw_pooled_session =
              allocator_sessions_.back().get();
          // Process the pooled session's existing candidates/ports, if they exist.
          OnCandidatesReady(raw_pooled_session,
                            raw_pooled_session->ReadyCandidates());
          for (PortInterface* port : allocator_sessions_.back()->ReadyPorts()) {
            OnPortReady(raw_pooled_session, port);
          }
          // If the pooled session already reports allocation as done, deliver
          // the completion callback immediately as well.
          if (allocator_sessions_.back()->CandidatesAllocationDone()) {
            OnCandidatesAllocationDone(raw_pooled_session);
          }
        } else {
          // No pooled session: create one from the allocator using this
          // channel's transport name, component and ICE ufrag/pwd, then start it.
          AddAllocatorSession(allocator_->CreateSession(
              transport_name(), component(), ice_parameters_.ufrag,
              ice_parameters_.pwd));
          allocator_sessions_.back()->StartGettingPorts();
        }
    

    然后启动port收集BasicPortAllocatorSession::StartGettingPorts()

    // Moves the session into the GATHERING state and schedules the start of
    // port configuration by posting MSG_CONFIG_START to the network thread.
    // Must run on the network thread (checked by RTC_DCHECK_RUN_ON).
    void BasicPortAllocatorSession::StartGettingPorts() {
      RTC_DCHECK_RUN_ON(network_thread_);
      state_ = SessionState::GATHERING;
      // No socket factory set yet: create a BasicPacketSocketFactory owned by
      // this session and use it.
      if (!socket_factory_) {
        owned_socket_factory_.reset(
            new rtc::BasicPacketSocketFactory(network_thread_));
        socket_factory_ = owned_socket_factory_.get();
      }
    
      // The message is addressed to `this`, so it is handled later in
      // BasicPortAllocatorSession::OnMessage rather than inline here.
      network_thread_->Post(RTC_FROM_HERE, this, MSG_CONFIG_START);
    
      RTC_LOG(LS_INFO) << "Start getting ports with turn_port_prune_policy "
                       << turn_port_prune_policy_;
    }
    

    此时的post消息是放到消息队列中MessageQueue::Post,会在this的BasicPortAllocatorSession::OnMessage中监听消息并进行处理:MSG_CONFIG_START->MSG_CONFIG_READY->MSG_ALLOCATE->MSG_SEQUENCEOBJECTS_CREATED

    // Message handler for this session: dispatches each posted message id to
    // the matching step of the allocation state machine. Any id not listed
    // below hits RTC_NOTREACHED().
    //
    // `message` carries the id in message_id and, for MSG_CONFIG_READY, a
    // PortConfiguration* in pdata.
    void BasicPortAllocatorSession::OnMessage(rtc::Message* message) {
      switch (message->message_id) {
        case MSG_CONFIG_START:
          GetPortConfigurations();
          break;
        case MSG_CONFIG_READY:
          // pdata is cast to PortConfiguration* without a runtime type check.
          OnConfigReady(static_cast<PortConfiguration*>(message->pdata));
          break;
        case MSG_ALLOCATE:
          OnAllocate();
          break;
        case MSG_SEQUENCEOBJECTS_CREATED:
          OnAllocationSequenceObjectsCreated();
          break;
        case MSG_CONFIG_STOP:
          OnConfigStop();
          break;
        default:
          RTC_NOTREACHED();
      }
    }
    

    OnAllocate 中启动端口分配:遍历所有网络设备(Network 对象),为每个设备创建 AllocationSequence 对象,依次调用其 Init 和 Start 函数来分配 port。在 Init 中会创建 udp_socket_(CreateUdpSocket);启用 bundle 时,收集 STUN candidate 会和普通 UDP 端口共用这一个 socket。

    // Marks the sequence as running and schedules its first allocation phase by
    // posting MSG_ALLOCATION_PHASE to the session's network thread; the message
    // targets `this`, so it is handled in AllocationSequence::OnMessage.
    void AllocationSequence::Start() {
      state_ = kRunning;
      session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE);
      // Take a snapshot of the best IP, so that when DisableEquivalentPhases is
      // called next time, we enable all phases if the best IP has since changed.
      previous_best_ip_ = network_->GetBestIP();
    }
    

    发送post消息后会进入AllocationSequence对象本身的OnMessage:

    // Runs one allocation phase per MSG_ALLOCATION_PHASE message and then either
    // schedules the next phase or signals that this sequence is finished.
    //
    // Phases are executed in the order UDP -> relay -> TCP, selected by phase_.
    // Must be called on the session's network thread with a
    // MSG_ALLOCATION_PHASE message (both enforced by RTC_DCHECK).
    void AllocationSequence::OnMessage(rtc::Message* msg) {
      RTC_DCHECK(rtc::Thread::Current() == session_->network_thread());
      RTC_DCHECK(msg->message_id == MSG_ALLOCATION_PHASE);
    
      // Human-readable phase names for logging, indexed by phase_.
      const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};
    
      // Perform all of the phases in the current step.
      RTC_LOG(LS_INFO) << network_->ToString()
                       << ": Allocation Phase=" << PHASE_NAMES[phase_];
    
      switch (phase_) {
        case PHASE_UDP:
          // The UDP phase creates both the UDP ports and the STUN ports.
          CreateUDPPorts();
          CreateStunPorts();
          break;
    
        case PHASE_RELAY:
          CreateRelayPorts();
          break;
    
        case PHASE_TCP:
          // TCP is the last phase: marking the sequence completed here makes
          // the check below take the "finished" branch.
          CreateTCPPorts();
          state_ = kCompleted;
          break;
    
        default:
          RTC_NOTREACHED();
      }
    
      if (state() == kRunning) {
        // Still running: advance to the next phase and re-post the phase
        // message after the allocator's configured step delay.
        ++phase_;
        session_->network_thread()->PostDelayed(RTC_FROM_HERE,
                                                session_->allocator()->step_delay(),
                                                this, MSG_ALLOCATION_PHASE);
      } else {
        // If all phases in AllocationSequence are completed, no allocation
        // steps needed further. Canceling  pending signal.
        session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
        SignalPortAllocationComplete(this);
      }
    }
    

    AllocationSequence 的 phase_ 成员在对象创建时初始化为 0,等于 PHASE_UDP,所以首先会进入 PHASE_UDP 的处理过程;处理完成后若仍处于 kRunning 状态,会递增 phase_ 并通过 session_->network_thread()->PostDelayed 再次投递 MSG_ALLOCATION_PHASE,从而进入下一个阶段,即 PHASE_RELAY。

    UDP phase 会收集两种类型的 candidate:host 和 srflx。会先收集host,然后srflx,但是PORTALLOCATOR_ENABLE_SHARED_SOCKET设置为true时会共享一个socket。

    // Creates the UDP port for this sequence's network and hands it to the
    // session. Does nothing when PORTALLOCATOR_DISABLE_UDP is set.
    //
    // With PORTALLOCATOR_ENABLE_SHARED_SOCKET set and udp_socket_ available, the
    // port is built on top of that existing socket; otherwise the port is
    // created from the allocator's min/max port range. In shared-socket mode the
    // same UDPPort is also given the STUN server addresses (unless STUN is
    // disabled), so it produces the STUN candidate as well.
    void AllocationSequence::CreateUDPPorts() {
      if (IsFlagSet(PORTALLOCATOR_DISABLE_UDP)) {
        RTC_LOG(LS_VERBOSE) << "AllocationSequence: UDP ports disabled, skipping.";
        return;
      }
    
      // TODO(mallinath) - Remove UDPPort creating socket after shared socket
      // is enabled completely.
      std::unique_ptr<UDPPort> port;
      bool emit_local_candidate_for_anyaddress =
          !IsFlagSet(PORTALLOCATOR_DISABLE_DEFAULT_LOCAL_CANDIDATE);
      if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && udp_socket_) {
        // Shared-socket path: reuse the already-created udp_socket_.
        port = UDPPort::Create(
            session_->network_thread(), session_->socket_factory(), network_,
            udp_socket_.get(), session_->username(), session_->password(),
            session_->allocator()->origin(), emit_local_candidate_for_anyaddress,
            session_->allocator()->stun_candidate_keepalive_interval());
      } else {
        // Non-shared path: pass the allocator's port range to UDPPort::Create
        // instead of an existing socket.
        port = UDPPort::Create(
            session_->network_thread(), session_->socket_factory(), network_,
            session_->allocator()->min_port(), session_->allocator()->max_port(),
            session_->username(), session_->password(),
            session_->allocator()->origin(), emit_local_candidate_for_anyaddress,
            session_->allocator()->stun_candidate_keepalive_interval());
      }
    
      // Creation may have failed (null port); in that case nothing is added.
      if (port) {
        // If shared socket is enabled, STUN candidate will be allocated by the
        // UDPPort.
        if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {
          // Keep a raw, non-owning pointer and subscribe to the port's
          // SignalDestroyed so OnPortDestroyed runs when the port goes away.
          udp_port_ = port.get();
          port->SignalDestroyed.connect(this, &AllocationSequence::OnPortDestroyed);
    
          // If STUN is not disabled, setting stun server address to port.
          if (!IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) {
            if (config_ && !config_->StunServers().empty()) {
              RTC_LOG(LS_INFO)
                  << "AllocationSequence: UDPPort will be handling the "
                     "STUN candidate generation.";
              port->set_server_addresses(config_->StunServers());
            }
          }
        }
    
        // Release the raw pointer to the session; `true` asks the session to
        // call PrepareAddress() on the port right away.
        session_->AddAllocatedPort(port.release(), this, true);
      }
    }
    

    之后进入BasicPortAllocatorSession::AddAllocatedPort,这个时候会关联信号和槽并将此port存入队列中:

    // Registers a newly created port with this session: configures it from the
    // session's settings, records it in ports_, subscribes to its signals, and
    // optionally starts address preparation.
    //
    // `port`            the port to register; a null port is ignored.
    // `seq`             the AllocationSequence that created the port.
    // `prepare_address` when true, port->PrepareAddress() is called at the end.
    //
    // Must run on the network thread (checked by RTC_DCHECK_RUN_ON).
    void BasicPortAllocatorSession::AddAllocatedPort(Port* port,
                                                     AllocationSequence* seq,
                                                     bool prepare_address) {
      RTC_DCHECK_RUN_ON(network_thread_);
      if (!port)
        return;
    
      RTC_LOG(LS_INFO) << "Adding allocated port for " << content_name();
      // Copy the session-level identity (content name, component, generation)
      // onto the port.
      port->set_content_name(content_name());
      port->set_component(component());
      port->set_generation(generation());
      // Only configure a proxy on the port when the allocator has one set.
      if (allocator_->proxy().type != rtc::PROXY_NONE)
        port->set_proxy(allocator_->user_agent(), allocator_->proxy());
      port->set_send_retransmit_count_attribute(
          (flags() & PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0);
      // Store the port (paired with its sequence) in the session's port list.
      PortData data(port, seq);
      ports_.push_back(data);
      // Connect the port's signals to this session's slots.
      port->SignalCandidateReady.connect(
          this, &BasicPortAllocatorSession::OnCandidateReady);
      port->SignalCandidateError.connect(
          this, &BasicPortAllocatorSession::OnCandidateError);
      port->SignalPortComplete.connect(this,
                                       &BasicPortAllocatorSession::OnPortComplete);
      port->SignalDestroyed.connect(this,
                                    &BasicPortAllocatorSession::OnPortDestroyed);
      port->SignalPortError.connect(this, &BasicPortAllocatorSession::OnPortError);
      RTC_LOG(LS_INFO) << port->ToString() << ": Added port to allocator";
    
      // Signals are connected before PrepareAddress() so that any candidate it
      // produces is delivered to the slots above.
      if (prepare_address)
        port->PrepareAddress();
    }
    

    之后进入UDPPort::PrepareAddress,然后转到UDPPort::OnLocalAddressReady

    // Called once the socket's local address is known. Publishes that address
    // as the host (LOCAL_PORT_TYPE) candidate and then triggers preparation of
    // the STUN candidate.
    //
    // `socket`  the socket whose address became ready (not used in this body).
    // `address` the local address reported for that socket.
    void UDPPort::OnLocalAddressReady(rtc::AsyncPacketSocket* socket,
                                      const rtc::SocketAddress& address) {
      // When adapter enumeration is disabled and binding to the any address, the
      // default local address will be issued as a candidate instead if
      // |emit_local_for_anyaddress| is true. This is to allow connectivity for
      // applications which absolutely requires a HOST candidate.
      rtc::SocketAddress addr = address;
    
      // If MaybeSetDefaultLocalAddress fails, we keep the "any" IP so that at
      // least the port is listening.
      MaybeSetDefaultLocalAddress(&addr);
    
      // Add the host candidate; the same address is passed for the first two
      // address arguments and an empty SocketAddress for the third.
      AddAddress(addr, addr, rtc::SocketAddress(), UDP_PROTOCOL_NAME, "", "",
                 LOCAL_PORT_TYPE, ICE_TYPE_PREFERENCE_HOST, 0, "", false);
      MaybePrepareStunCandidate();
    }
    

    会调用Port::AddAddress->Port::FinishAddingAddress在这里会触发SignalCandidateReady,转到回调函数

    // Final step of adding a candidate to this port: stores the candidate,
    // emits SignalCandidateReady for it, and then forwards `is_final` to
    // PostAddAddress.
    //
    // `c`        the candidate to record and announce.
    // `is_final` passed through unchanged to PostAddAddress.
    void Port::FinishAddingAddress(const Candidate& c, bool is_final) {
      candidates_.push_back(c);
      // Listeners are notified after the candidate is already in candidates_.
      SignalCandidateReady(this, c);
    
      PostAddAddress(is_final);
    }
    
    // Slot for Port::SignalCandidateReady (abridged excerpt: the "......" lines
    // stand for code omitted by the article, including where `data` is set).
    // The visible part announces a non-pruned port via SignalPortReady and
    // forwards the candidate via SignalCandidatesReady when it passes the
    // filter.
    void BasicPortAllocatorSession::OnCandidateReady(Port* port,
                                                     const Candidate& c) {
        ......
    // If the current port is not pruned yet, SignalPortReady.
        if (!data->pruned()) {
          RTC_LOG(LS_INFO) << port->ToString() << ": Port ready.";
          SignalPortReady(this, port);
          port->KeepAliveUntilPruned();
        }
      }
    
      // Forward the candidate only if the port data is ready and the candidate
      // passes the filter; it is sanitized by the allocator before being
      // signalled. Otherwise it is dropped with a log line.
      if (data->ready() && CheckCandidateFilter(c)) {
        std::vector<Candidate> candidates;
        candidates.push_back(allocator_->SanitizeCandidate(c));
        SignalCandidatesReady(this, candidates);
      } else {
        RTC_LOG(LS_INFO) << "Discarding candidate because it doesn't match filter.";
      }
    ........
    }
    

    信号和槽关联流转:

    BasicPortAllocatorSession::OnCandidateReady
                         ↓SignalCandidatesReady
    P2PTransportChannel::OnCandidatesReady
                         ↓SignalCandidateGathered
    JsepTransportController::OnTransportCandidateGathered_n
                         ↓SignalIceCandidatesGathered
    PeerConnection::OnTransportControllerCandidatesGathered
                         ↓
    PeerConnection::OnIceCandidate
    
    SignalPortReady
    P2PTransportChannel::OnPortReady
                         ↓
    P2PTransportChannel::CreateConnection
                         ↓
    P2PTransportChannel::SortConnectionsAndUpdateState
                         ↓
    
    // Delivers a gathered ICE candidate to the application's observer.
    // Nothing is reported or delivered once the PeerConnection is closed.
    //
    // `candidate` is owned by this function; the observer receives only a raw
    // pointer, and the candidate is destroyed when this function returns.
    void PeerConnection::OnIceCandidate(
        std::unique_ptr<IceCandidateInterface> candidate) {
      if (IsClosed()) {
        return;
      }
      ReportIceCandidateCollected(candidate->candidate());
      Observer()->OnIceCandidate(candidate.get());// the client-side observer is notified here that an ICE candidate was gathered
    }
    

    复用 socket 的情况下, AllocationSequence::CreateStunPorts函数会直接返回,因为早在 AllocationSequence::CreateUDPPorts函数的执行过程中,就已经执行了 STUN Binding request 的发送逻辑。

    发送 STUN Binding request:

    UDPPort::OnLocalAddressReady(CreateStunPorts)
                ↓
    UDPPort::MaybePrepareStunCandidate
                ↓
    UDPPort::SendStunBindingRequests
                ↓
    UDPPort::SendStunBindingRequest
                ↓
    StunRequestManager::Send
                ↓
    StunRequestManager::SendDelayed
                ↓MSG_STUN_SEND
    StunRequest::OnMessage
                ↓SignalSendPacket
    UDPPort::OnSendPacket
                ↓
    AsyncUDPSocket::SendTo
                ↓
    PhysicalSocket::SendTo
                ↓
    

    收到 STUN Binding response:

    PhysicalSocketServer::Wait
                ↓
    SocketDispatcher::OnEvent
                ↓
    AsyncUDPSocket::OnReadEvent
                ↓SignalReadPacket
    AllocationSequence::OnReadPacket
                ↓
    UDPPort::HandleIncomingPacket
                ↓
    UDPPort::OnReadPacket
                ↓
    StunRequestManager::CheckResponse
                ↓
    StunBindingRequest::OnResponse
                ↓
    UDPPort::OnStunBindingRequestSucceeded
                ↓
    Port::AddAddress
                ↓
    Port::FinishAddingAddress
                ↓
    BasicPortAllocatorSession::OnCandidateReady
                ↓SignalCandidatesReady
    

    StunRequest 类是对 STUN request 的定义和封装,基类里实现了 request 超时管理、重发的逻辑,各种特定类型的逻辑由子类实现,例如 StunBindingRequest 和 TurnAllocateRequest

    StunRequestManager 则实现了 response 和 request 匹配的逻辑:manager 按 transaction id => request 的 hash 保存了所有的 request,收到 response 后,根据 transaction id 即可找到对应的 request,进而可以执行 request 对象的回调。

    candidate 收集状态:

    • P2PTransportChannel 创建时, gathering_state_
      为 kIceGatheringNew;

    • P2PTransportChannel::MaybeStartGathering
      里开始收集 candidate 时,如果当前未处于 Gathering 状态,则切换到 kIceGatheringGathering 状态;

    • 在收到 OnCandidatesAllocationDone 回调时,切换到kIceGatheringComplete状态;

      • BasicPortAllocatorSession::CandidatesAllocationDone为 true 时,就会触发这个回调;
      • 这意味着创建了 AllocationSequence,且所有的 AllocationSequence 都不处于 kRunning 状态,且所有的 port 都不处于 STATE_INPROGRESS
        状态;
      • AllocationSequence 创建时处于 kInit 状态, AllocationSequence::Start
        里切换为 kRunning 状态,TCP phase 结束后切换为 kCompleted 状态,如果调用了 AllocationSequence::Stop,则会切换到 kStopped 状态;
      • port 创建时就处于 STATE_INPROGRESS 状态,当被 prune、发生错误时,分别切换到 STATE_PRUNED、STATE_ERROR 状态;TurnPort 和 TcpPort 收集到 candidate 后调用 Port::AddAddress 时,就会切换到 STATE_COMPLETE 状态;RelayPort(GTURN)和 UdpPort 也会在收集到 candidate 后切换到 STATE_COMPLETE 状态;StunPort 则会在收集完 candidate(即向所有 STUN server 完成了 binding request)之后切换到 STATE_COMPLETE 状态。
    BasicPortAllocatorSession::OnAllocationSequenceObjectsCreated 也会检测 candidate 收集是否已经完成。
    
    
    AllocationSequence::OnMessage                     Port::FinishAddingAddress
                ↓SignalPortAllocationComplete                   ↓
    OnPortAllocationComplete                      Port::PostAddAddress(bool is_final)
                ↓                                            ↓SignalPortComplete
                ↓                        BasicPortAllocatorSession::OnPortComplete
                ↓                            ↓
    BasicPortAllocatorSession::MaybeSignalCandidatesAllocationDone
                ↓
    BasicPortAllocatorSession::CandidatesAllocationDone
                ↓true
    SignalCandidatesAllocationDone
                ↓
    OnCandidatesAllocationDone
    
    

    远端Candidates设置

    PeerConnection::SetRemoteDescription    UpdateStats(kStatsOutputLevelStandard)
                ↓
    PeerConnection::UseCandidatesInSessionDescription
                ↓
    PeerConnection::UseCandidate
                ↓SetIceConnectionState(PeerConnectionInterface::kIceConnectionChecking)
    JsepTransportController::AddRemoteCandidates
                ↓
    JsepTransport::AddRemoteCandidates
                ↓
    P2PTransportChannel::AddRemoteCandidate
                ↓
    P2PTransportChannel::FinishAddingRemoteCandidate
                ↓
    P2PTransportChannel::CreateConnections
                ↓
    P2PTransportChannel::CreateConnection
                ↓
    UDPPort::CreateConnection
                ↓
    Port::AddOrReplaceConnection   SignalConnectionCreated(这个信号未关联,无用)
                ↓
    P2PTransportChannel::AddConnection
                ↓
    P2PTransportChannel::RememberRemoteCandidate (维护remote_candidates_列表)
    
    P2PTransportChannel::FinishAddingRemoteCandidate
                ↓
    P2PTransportChannel::SortConnectionsAndUpdateState
                ↓
    P2PTransportChannel::MaybeStartPinging
                ↓
    P2PTransportChannel::CheckAndPing    FindNextPingableConnection
                ↓
    P2PTransportChannel::PingConnection
                ↓
    Connection::Ping
                ↓
    StunRequestManager::Send  IceCandidatePairState::IN_PROGRESS
                ↓
    StunRequestManager::SendDelayed
                ↓
               ...这个和STUN Binding response类似 
                ↓
    StunRequestManager::CheckResponse
                ↓
    ConnectionRequest::OnResponse
                ↓
    Connection::OnConnectionRequestResponse
                ↓
    Connection::ReceivedPingResponse
                ↓
    Connection::set_write_state STATE_WRITABLE
                ↓SignalStateChange
    P2PTransportChannel::OnConnectionStateChange
                ↓
    RequestSortAndStateUpdate
                ↓
    P2PTransportChannel::SortConnectionsAndUpdateState
    
    

    P2PTransportChannel::SwitchSelectedConnection
                        ↓ sig slot (SignalReadyToSend)
    DtlsTransport::OnReadyToSend
                        ↓ sig slot (SignalReadyToSend)
    RtpTransport::OnReadyToSend
    

    AddConnection会关联端口的发送和接收

    // Registers a new connection with this channel: tracks it, applies the
    // channel's ICE configuration to it, and subscribes to its signals so that
    // received packets and state changes reach the channel.
    //
    // `connection` is stored as a raw pointer in connections_ and
    // unpinged_connections_. Must run on the network thread (checked by
    // RTC_DCHECK_RUN_ON).
    void P2PTransportChannel::AddConnection(Connection* connection) {
      RTC_DCHECK_RUN_ON(network_thread_);
      // Track the connection; it also starts out in the not-yet-pinged set.
      connections_.push_back(connection);
      unpinged_connections_.insert(connection);
      // Apply the remote ICE mode and the timeouts/thresholds from config_.
      connection->set_remote_ice_mode(remote_ice_mode_);
      connection->set_receiving_timeout(config_.receiving_timeout);
      connection->set_unwritable_timeout(config_.ice_unwritable_timeout);
      connection->set_unwritable_min_checks(config_.ice_unwritable_min_checks);
      connection->set_inactive_timeout(config_.ice_inactive_timeout);
      // Route the connection's signals to the channel's handlers.
      connection->SignalReadPacket.connect(this,
                                           &P2PTransportChannel::OnReadPacket);
      connection->SignalReadyToSend.connect(this,
                                            &P2PTransportChannel::OnReadyToSend);
      connection->SignalStateChange.connect(
          this, &P2PTransportChannel::OnConnectionStateChange);
      connection->SignalDestroyed.connect(
          this, &P2PTransportChannel::OnConnectionDestroyed);
      connection->SignalNominated.connect(this, &P2PTransportChannel::OnNominated);
    
      // Record that this channel has had at least one connection.
      had_connection_ = true;
    
      // Attach the ICE event log and log this candidate pair as added.
      connection->set_ice_event_log(&ice_event_log_);
      LogCandidatePairConfig(connection,
                             webrtc::IceCandidatePairConfigType::kAdded);
    }
    
    
  • 相关阅读:
    接口测试工具-Jmeter使用笔记(五:正则表达式提取器)
    接口测试工具-Jmeter使用笔记(四:响应断言)
    接口测试工具-Jmeter使用笔记(三:管理请求服务器信息和Headers参数)
    接口测试工具-Jmeter使用笔记(二:GET/POST请求参数填写)
    接口测试工具-Jmeter使用笔记(一:运行一个HTTP请求)
    Centos6.5+Redmine
    【linux】扒站命令之利用wget快速扒站利用wget快速扒站
    【javascript】javascript设计模式mixin模式
    【vue】混合模式
    【小程序爬坑之路】获取地理位置信息
  • 原文地址:https://www.cnblogs.com/bloglearning/p/12128510.html
Copyright © 2011-2022 走看看