1. 前言
本文是webrtc中拥塞控制的上文,主要是分析webrtc中的拥塞控制的码率探测,预估和调整的部分,介绍了整体框架和原理以及相关的类;
webrtc版本:M91
2.正文
2.1 整体框架
webrtc中的部分码控结构如下图所示,从socket层接收到数据后,到transport解析rtcp包处理得到feedback,通过call将feedback转发到对应sendstream上的rtcp处理模块,最终通过RtpTransportControllerSend将feedback转发到GoogCcNetworkController进行码率预估后,把预估的码率(target bitrate), 探测策略(probe config), congestion windows给pacer,pacer转发给pacingContrller去使用进行发送码率控制
其中以GoogCcNetworkController作为整个码率预估及调整的核心 ,涉及的类和流程如下图所示,红框中的类在GoogCcNetworkController下
ProbeBitrateEstimator : 根据feedback计算探测码率,PacingController中会将包按照cluster进行划分,transport-CC报文能得到包所属的cluster以及发送和接收信息,通过发送和接收的数据大小比判断是否到达链路上限从而进行带宽探测
AcknowledgedBitrateEstimator : 估算当前的吞吐量
BitrateEstimator :使用滑动窗口 + 卡尔曼滤波计算当前发送吞吐量
DelayBasedBwe : 基于延迟预估码率
TrendlineEstimator : 使用线性回归计算当前网络拥堵情况
AimdRateControl : 通过TrendLine预测出来的网络状态对码率进行aimd方式调整
SendSideBandwidthEstimation : 基于丢包计算预估码率,结合延迟预估码率,得到最终的目标码率
ProbeController : 探测控制器,通过目标码率判断下次是否探测,探测码率大小
CongestionWindowPushbackController : 基于当前的rtt设置一个时间窗口,同时基于当前的码率设置当前时间窗口下的数据量,通过判断当前窗口的使用量,如果使用量过大的时候,降低编码时使用的目标码率,加速窗口消退,减少延迟
AlrDetector : 应用(码率)受限检测,检测当前的发送码率是否和目标码率由于编码器等原因相差过大受限了,受限情况下会触发带宽预测过程的特殊处理
NetworkStateEstimator 、 NetworkStatePredictor : 此两者属于待开发类,只是在代码中有,但是还没开发完,没用上.
接下来会以GoogCcNetworkController的码率预估过程为例, 详细介绍webrtc中带宽控制的架构和过程。
2.2 GoogCcNetworkController
GoogCcNetworkController是码率预估的核心类, 如2.1中所示的webrtc中的部分码控结构上
,可以看到其所属于**class RtpTransportControllerSend **
2.2.1 GoogCcNetworkController创建时刻
在底层网络可用的时候,会触发RtpTransportControllerSend::OnNetworkAvailability()
回调
void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
RTC_LOG(LS_VERBOSE) << "SignalNetworkState "
<< (network_available ? "Up" : "Down");
NetworkAvailability msg;
msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
msg.network_available = network_available;
task_queue_.PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (network_available_ == msg.network_available)
return;
network_available_ = msg.network_available;
if (network_available_) {
pacer()->Resume();
} else {
pacer()->Pause();
}
pacer()->UpdateOutstandingData(DataSize::Zero());
if (controller_) {
control_handler_->SetNetworkAvailability(network_available_);
PostUpdates(controller_->OnNetworkAvailability(msg));
UpdateControlState();
} else {
// 未创建controller,创建
MaybeCreateControllers();
}
});
for (auto& rtp_sender : video_rtp_senders_) {
rtp_sender->OnNetworkAvailability(network_available);
}
}
其检测到未创建controller_时,会调用 RtpTransportControllerSend::MaybeCreateControllers()
创建
void RtpTransportControllerSend::MaybeCreateControllers() {
RTC_DCHECK(!controller_);
RTC_DCHECK(!control_handler_);
if (!network_available_ || !observer_)
return;
control_handler_ = std::make_unique<congestioncontrolhandler>();
initial_config_.constraints.at_time =
Timestamp::Millis(clock_->TimeInMilliseconds());
initial_config_.stream_based_config = streams_config_;
// TODO(srte): Use fallback controller if no feedback is available.
// 创建GoogCcNetworkController
if (controller_factory_override_) {
RTC_LOG(LS_INFO) << "Creating overridden congestion controller";
controller_ = controller_factory_override_->Create(initial_config_);
process_interval_ = controller_factory_override_->GetProcessInterval();
} else {
RTC_LOG(LS_INFO) << "Creating fallback congestion controller";
controller_ = controller_factory_fallback_->Create(initial_config_);
process_interval_ = controller_factory_fallback_->GetProcessInterval();
}
// 间隔更新GoogCcNetworkController
UpdateControllerWithTimeInterval();
StartProcessPeriodicTasks();
}
创建后即刻就调用 UpdateControllerWithTimeInterval()
和 StartProcessPeriodicTasks()
:
void RtpTransportControllerSend::UpdateControllerWithTimeInterval() {
RTC_DCHECK(controller_);
ProcessInterval msg;
msg.at_time = Timestamp::Millis(clock_->TimeInMilliseconds());
if (add_pacing_to_cwin_)
msg.pacer_queue = pacer()->QueueSizeData();
// 对码率进行检测和更新,将结果转发给pacer
PostUpdates(controller_->OnProcessInterval(msg));
}
UpdateControllerWithTimeInterval()
中:
-
调用
GoogCcNetworkController::OnProcessInterval()
做间隔的码率检测和更新 -
调用
PostUpdates()
将最新的码率给转发到pacer
void RtpTransportControllerSend::StartProcessPeriodicTasks() {
if (!pacer_queue_update_task_.Running()) {
pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_.Get(), kPacerQueueUpdateInterval, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
TimeDelta expected_queue_time = pacer()->ExpectedQueueTime();
control_handler_->SetPacerQueue(expected_queue_time);
UpdateControlState();
return kPacerQueueUpdateInterval;
});
}
controller_task_.Stop();
if (process_interval_.IsFinite()) {
// 定时检测更新码率
controller_task_ = RepeatingTaskHandle::DelayedStart(
task_queue_.Get(), process_interval_, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
UpdateControllerWithTimeInterval();
return process_interval_;
});
}
}
StartProcessPeriodicTasks()
中:
-
对control_handler_进行了更新,control_handler 是一个将controller计算相关码率信息路由回调给其它模块的一个类(后续在仔细分析),调用UpdateControlState()更新,将信息回调给其它
-
创建了一个controller_task_去定时的做
UpdateControllerWithTimeInterval()
接下来会通过介绍cc-controller下最重要的几个函数来介绍码率控制的核心过程,其分别是OnProcessInterval()
和OnTransportPacketsFeedback()
,前者根据时间流逝定时更新码率, 后者需要借助于cc-feedback的到来才能更新码率, 这两个函数涉及到的类都很广,如果把里面的类一次性介绍到底的话,文章的逻辑结构性会很差,所以把其中涉及到的类都提出来点到为止,详细的会放在后面去独立介绍,可自行查阅。
2.2.2 定时检测-OnProcessInterval()
GoogCcNetworkController::OnProcessInterval()
是cc-controller的核心函数之一,会定时的触发,用来做带宽检测和更新:
NetworkControlUpdate GoogCcNetworkController::OnProcessInterval(
ProcessInterval msg) {
NetworkControlUpdate update;
if (initial_config_) {
// 重设loss_based和delay_based码率探测器和probe的初始码率
// 获得码率探测簇配置(probe_cluster_config)
update.probe_cluster_configs =
ResetConstraints(initial_config_->constraints);
// 获取当前pacing 的发送码率, padding, time_windows等
update.pacer_config = GetPacingRates(msg.at_time);
// probe探测完成后,允许其因为alr需要快速恢复码率而继续做probe
if (initial_config_->stream_based_config.requests_alr_probing) {
probe_controller_->EnablePeriodicAlrProbing(
*initial_config_->stream_based_config.requests_alr_probing);
}
absl::optional<datarate> total_bitrate =
initial_config_->stream_based_config.max_total_allocated_bitrate;
if (total_bitrate) {
// 为probe设置最大的分配码率(MaxTotalAllocatedBitrate)作为探测的上边界
// 并生成响应的probe_cluster_config去进行探测
auto probes = probe_controller_->OnMaxTotalAllocatedBitrate(
total_bitrate->bps(), msg.at_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
max_total_allocated_bitrate_ = *total_bitrate;
}
// 释放initial_config_,下次进来就不通过init_config做初始化了
initial_config_.reset();
}
// 更新拥塞窗口中的pacing数据长度
if (congestion_window_pushback_controller_ && msg.pacer_queue) {
congestion_window_pushback_controller_->UpdatePacingQueue(
msg.pacer_queue->bytes());
}
// 更新码率
bandwidth_estimation_->UpdateEstimate(msg.at_time);
// 检测当前是否处于alr
absl::optional<int64_t> start_time_ms =
alr_detector_->GetApplicationLimitedRegionStartTime();
// 如果处于alr,告诉probe_controller处于alr,可以进行探测,进行快恢复
probe_controller_->SetAlrStartTimeMs(start_time_ms);
// 检测当前是否因alr状态而需要做probe了,获取probe_cluster_config
auto probes = probe_controller_->Process(msg.at_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
if (rate_control_settings_.UseCongestionWindow() &&
last_packet_received_time_.IsFinite() && !feedback_max_rtts_.empty()) {
// 根据rtt和target_rate 更新当前拥塞控制窗口大小
UpdateCongestionWindowSize();
}
if (congestion_window_pushback_controller_ && current_data_window_) {
// 重新设置拥塞控制窗口大小
congestion_window_pushback_controller_->SetDataWindow(
*current_data_window_);
} else {
update.congestion_window = current_data_window_;
}
// 获取更新后的码率,probe等,同时对alr, probe_controller中的码率进行更新
MaybeTriggerOnNetworkChanged(&update, msg.at_time);
return update;
}
GoogCcNetworkController::OnProcessInterval()
中:
-
在第一次调用该函数时,使用initial_config_设置DelayBasedBwe, SendSideBandwidthEstimation, ProbeController中的初始码率,ProbeController设置完码率之后会返回一个probe_cluster_config(探测簇), probe_cluster_config会返回给pacing_controller,pacing_controller在发包的时候使用其中的码率去发包以配合码率探测。
-
为ProbeController设置最大分配码率(MaxTotalAllocatedBitrate),这个值在ProbeController中会被用来做探测的上边界,一旦探测的码率到达这个值,就停止普通探测。
-
过了初始化后,SendSideBandwidthEstimation(也就是bandwidth_estimation_)会基于时间更新码率,其内部虽然是依靠cc-feedback提供丢包率来预估码率,当没有feedback也会基于时间预估当前的rtt去更新码率。
-
从AlrDetector获取当前是否处于alr状态,AlrDetector在每次发送数据时(OnSentPacket)都会检测实际发送码率是否与目标码率相差太多悬殊,从而判断是否(受限于编码器等原因而导致)无法达到目标码率,从而设定处于alr状态,alr状态非常有用,带宽预测的核心是需要向链路中发送足够的包去观察链路情况,如果探测到处于alr状态无法达到这个要求,就需要一些额外手段去处理。
-
设置ProbeController处于alr状态。ProbeController内完整了初始的在正常探测后就不再探测了,但如果处于alr状态或者网络变化的状态,是需要对网络进行探测以便于网络的快恢复;
-
从ProbeController获取probe_cluster_config,以进行需要可能的探测
-
根据rtt和congestion重新计算拥塞窗口控制器中的的数据大小(CongestionWindowPushbackController)
-
bandwidth_estimation_可能对码率进行了更新,调用
MaybeTriggerOnNetworkChanged()
将更新的码率同步到alr,probe_controller中,同时将码率,probe_config等放到update中返回
2.2.3 cc-feedback
2.2.3.1 cc-feedback报文
在介绍cc-controler中另一个重要的函数OnTransportPacketsFeedback()
前,因其在收到cc-feedback时触发。所以先介绍cc-feedback,cc-feedback协议的设计和详情可见R2. transport-cc-feedback草案或R5. WebRTC研究:Transport-cc之RTP及RTCP, 都介绍的非常详细易懂。
简单从报文介绍一下我们能从cc-feedback拿到什么:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|V=2|P| FMT=15 | PT=205 | length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
0 | SSRC of packet sender |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
4 | SSRC of media source |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
8 | base sequence number | packet status count |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
12 | reference time | fb pkt. count |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
16 | packet chunk | packet chunk |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
. .
. .
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| packet chunk | recv delta | recv delta |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
. .
. .
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| recv delta | recv delta | zero padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
cc-feedback的PT=205, FMT=15, 从base sequence number开始就是cc-feedback的报文主体:
base sequence number:TransportFeedback包中记录的第一个RTP包的transport sequence number
packet status count: 表示这个TransportFeedback包记录了多少个RTP包信息
reference time: 基准时间,以64ms为单位,可以和下面的recv delta求和得到包的接收时间
fb pkt. count: 当前feedback的序列号,用于检测cc-feedback是否丢包
后面会跟着两个数组,代表着transport number以base sequence number为基准递增的包的相关信息
packet chunk: 当前包的到达状态(到达丢失),
recv delta: 接收时间delta,要和reference time求和才能得到真正的接收时间。
可以看到cc-feedback中能得到包的接收状态和时间。
2.2.3.2 transprot-sequence-number
对于cc-feedback,说明一下webrtc的整体处理过程。
webrtc为每个rtp packet添加了一个transport-cc number的rtp extension用来标识每个包的传输序列号,见官方草案描述:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0xBE | 0xDE | length=1 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| ID | L=1 |transport-wide sequence number | zero padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
添加该number的主要是分离媒体(使用sequence number)和网络处理(使用transport number)。
在RTPSenderVideo::SendVideo()中使用AllocatePacket()为每帧的数据生成rtp packet的时,默认会为当前packet保留一些rtp-extension, 其中就包括了TransportSequenceNumber。
std::unique_ptr<rtppackettosend> RTPSender::AllocatePacket() const {
...
// Reserve extensions, if registered, RtpSender set in SendToNetwork.
packet->ReserveExtension<absolutesendtime>();
packet->ReserveExtension<transmissionoffset>();
packet->ReserveExtension<transportsequencenumber>();//<----
...
}
- extension register
(接下来这段介绍的是extension 的register过程,不感兴趣的可以不看)
正如上面的AllocatePacket()
中的注释所言,保存这些extension,如果这些extension注册了,那么RtpSender中会对这些extension进行设值; extension Register的过程要从RtpVideoSender溯源,其初始化时将将传入的rtp_config.extension设置到了每个stream的rtp_rtcp中
RtpVideoSender::RtpVideoSender(....){ // 实在太长了,省略一些参数,而不是一个变参构造函数
...
// RTP/RTCP initialization.
for (size_t i = 0; i < rtp_config_.extensions.size(); ++i) {
const std::string& extension = rtp_config_.extensions[i].uri;
// 将rtp_config中的所有extension设置到stream对应的rtp_rtcp module下
int id = rtp_config_.extensions[i].id;
RTC_DCHECK(RtpExtension::IsSupportedForVideo(extension));
for (const RtpStreamSender& stream : rtp_streams_) {
// rtp_rtcp module注册这些extension
stream.rtp_rtcp->RegisterRtpHeaderExtension(extension, id);
}
}
...
}
rtp_rtcp将其转发到packet_generator(实则RTPSender)
void ModuleRtpRtcpImpl2::RegisterRtpHeaderExtension(absl::string_view uri,
int id) {
// 转发到packet_generator
bool registered =
rtp_sender_->packet_generator.RegisterRtpHeaderExtension(uri, id);
RTC_CHECK(registered);
}
RTPSender注册该extension, 然后会看到一个很重要的变量supports_bwe_extension_会被HasBweExtension()检测更新,根据是否已注册了所有的bwe extension设置为true,这个变量决定能否使用padding功能(带宽探测时,当前数据量达不到目标发送码率,用一些历史包或者空数据做带宽填充)
bool RTPSender::RegisterRtpHeaderExtension(absl::string_view uri, int id) {
MutexLock lock(&send_mutex_);
bool registered = rtp_header_extension_map_.RegisterByUri(id, uri);// 注册该extension
supports_bwe_extension_ = HasBweExtension(rtp_header_extension_map_);
UpdateHeaderSizes();
return registered;
}
// bwe所需extension
bool HasBweExtension(const RtpHeaderExtensionMap& extensions_map) {
return extensions_map.IsRegistered(kRtpExtensionTransportSequenceNumber) ||
extensions_map.IsRegistered(kRtpExtensionTransportSequenceNumber02) ||
extensions_map.IsRegistered(kRtpExtensionAbsoluteSendTime) ||
extensions_map.IsRegistered(kRtpExtensionTransmissionTimeOffset);
}
//padding的支持需要bwe extension
bool RTPSender::SupportsPadding() const {
MutexLock lock(&send_mutex_);
return sending_media_ && supports_bwe_extension_;
}
extension register介绍到此为止,看完了整个过程也没有很明确的找到按照注释所言--"发现注册了这个extension,然后才对它这个extension设值"的处理, 但是还是提及到了一些重要的probing的东西
2.2.3.3 feedback packet的生成
最后在paced发送packet的过程中,当packet到达PacketRouter时,将会检测其是否有TransportSequenceNumber, 如果有则将transport_sequence_number设置到到packet的头部
void PacketRouter::SendPacket(std::unique_ptr<rtppackettosend> packet,
const PacedPacketInfo& cluster_info) {
...
MutexLock lock(&modules_mutex_);
// 设置transpoort sequence number
if (packet->HasExtension<transportsequencenumber>()) {
packet->SetExtension<transportsequencenumber>((++transport_seq_) & 0xFFFF);
}
...
}
之后,当packet经过RtpSenderEgress模块的时,在RtpSenderEgress::SendPacket()中会提取其transport_sequence_number生成feedback包,整个流程如下:
void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
...
if (auto packet_id = packet->GetExtension<transportsequencenumber>()) {
options.packet_id = *packet_id;
options.included_in_feedback = true;
options.included_in_allocation = true;
// 添加该packet到feedback
AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
}
...
}
构造packet_info,通知feedback_ovserver添加该包
void RtpSenderEgress::AddPacketToTransportFeedback(
uint16_t packet_id,
const RtpPacketToSend& packet,
const PacedPacketInfo& pacing_info) {
if (transport_feedback_observer_) {
size_t packet_size = packet.payload_size() + packet.padding_size();
if (send_side_bwe_with_overhead_) {
packet_size = packet.size();
}
// 构造packet_info
RtpPacketSendInfo packet_info;
packet_info.ssrc = ssrc_;
packet_info.transport_sequence_number = packet_id;
packet_info.rtp_sequence_number = packet.SequenceNumber();
packet_info.length = packet_size;
packet_info.pacing_info = pacing_info;
packet_info.packet_type = packet.packet_type();
// 通知feedback_ovserver添加该包
transport_feedback_observer_->OnAddPacket(packet_info);
}
}
告知RtpTransportControllerSend有包发送了, 调用transport_feedbadck_adapter_为其生成feedback包
void RtpTransportControllerSend::OnAddPacket(
const RtpPacketSendInfo& packet_info) {
feedback_demuxer_.AddPacket(packet_info);
Timestamp creation_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.PostTask([this, packet_info, creation_time]() {
RTC_DCHECK_RUN_ON(&task_queue_);
// 往adapter_添加feedback
transport_feedback_adapter_.AddPacket(
packet_info,
send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_ : 0,
creation_time);
});
}
TransportFeedbackAdapter生成feedback packet,将其存入history_中
void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
size_t overhead_bytes,
Timestamp creation_time) {
// 生成feedback包
PacketFeedback packet;
packet.creation_time = creation_time;
packet.sent.sequence_number =
seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number);
packet.sent.size = DataSize::Bytes(packet_info.length + overhead_bytes);
packet.sent.audio = packet_info.packet_type == RtpPacketMediaType::kAudio;
packet.network_route = network_route_;
packet.sent.pacing_info = packet_info.pacing_info;
while (!history_.empty() &&
creation_time - history_.begin()->second.creation_time >
kSendTimeHistoryWindow) {
// TODO(sprang): Warn if erasing (too many) old items?
if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_)
in_flight_.RemoveInFlightPacketBytes(history_.begin()->second);
history_.erase(history_.begin());
}
// 以transport_sequence_number和packet为key-valiue,存入history_中
history_.insert(std::make_pair(packet.sent.sequence_number, packet));
}
2.2.3.4 feedback packet再赋值
在收到cc-feedback的rtcp包的时候,会经过层层转发到RTCPReceiver,
void RTCPReceiver::IncomingPacket(rtc::ArrayView<const uint8_t=""> packet) {
if (packet.empty()) {
RTC_LOG(LS_WARNING) << "Incoming empty RTCP packet";
return;
}
PacketInformation packet_information;
// 解析rtcp
if (!ParseCompoundPacket(packet, &packet_information))
return;
// 转发
TriggerCallbacksFromRtcpPacket(packet_information);
}
RTCPReceiver::IncomingPacket()中:
-
使用ParseCompoundPacket()对报文进行解析, ParseCompoundPacket()是一个非常精华的函数,可以再里面找到所有有关的RTCP包的解析(RR,SR,SDES, NACK, CC-FeedBack, Pli, Fir等),其内部会调用HandleTransportFeedback()将cc-feedback解析成transport_feedback,放到packet-information中
void RTCPReceiver::HandleTransportFeedback( const CommonHeader& rtcp_block, PacketInformation* packet_information) { // 解析rtcp_block 生成transport_feedback std::unique_ptr<rtcp::transportfeedback> transport_feedback( new rtcp::TransportFeedback()); if (!transport_feedback->Parse(rtcp_block)) { ++num_skipped_packets_; return; } packet_information->packet_type_flags |= kRtcpTransportFeedback; packet_information->transport_feedback = std::move(transport_feedback); }
-
然后调用TriggerCallbacksFromRtcpPacket()去转发该RTCP包.
TriggerCallbacksFromRtcpPacket()中会将解析出来的transport_feedback转发到RtpTransportControllerSend
void RTCPReceiver::TriggerCallbacksFromRtcpPacket(
const PacketInformation& packet_information) {
...
if (transport_feedback_observer_ &&
(packet_information.packet_type_flags & kRtcpTransportFeedback)) {
uint32_t media_source_ssrc =
packet_information.transport_feedback->media_ssrc();
if (media_source_ssrc == local_ssrc ||
registered_ssrcs.find(media_source_ssrc) != registered_ssrcs.end()) {
// 将包转给RtpTransportControllerSend处理
transport_feedback_observer_->OnTransportFeedback(
*packet_information.transport_feedback);
}
}
...
}
RtpTransportControllerSend将transport_feedback交给TransportFeedbackAdapter解析,获得feedback_msg
void RtpTransportControllerSend::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
feedback_demuxer_.OnTransportFeedback(feedback);
auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.PostTask([this, feedback, feedback_time]() {
RTC_DCHECK_RUN_ON(&task_queue_);
// 解析cc-feedback包获得feedback_msg
absl::optional<transportpacketsfeedback> feedback_msg =
transport_feedback_adapter_.ProcessTransportFeedback(feedback,
feedback_time);
if (feedback_msg && controller_) {
PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
}
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
});
}
ProcessTransportFeedback()中可以清楚的看到feedback_msg的结构
absl::optional<transportpacketsfeedback>
TransportFeedbackAdapter::ProcessTransportFeedback(
const rtcp::TransportFeedback& feedback,
Timestamp feedback_receive_time) {
if (feedback.GetPacketStatusCount() == 0) {
RTC_LOG(LS_INFO) << "Empty transport feedback packet received.";
return absl::nullopt;
}
TransportPacketsFeedback msg;
msg.feedback_time = feedback_receive_time;
msg.prior_in_flight = in_flight_.GetOutstandingData(network_route_);
// feedback packet 再赋值
msg.packet_feedbacks =
ProcessTransportFeedbackInner(feedback, feedback_receive_time);
if (msg.packet_feedbacks.empty())
return absl::nullopt;
auto it = history_.find(last_ack_seq_num_);
if (it != history_.end()) {
msg.first_unacked_send_time = it->second.sent.send_time;
}
msg.data_in_flight = in_flight_.GetOutstandingData(network_route_);
return msg;
}
其中就有我们最关心的packet_feedbacks, 其是通过调用ProcessTransportFeedbackInner()生成的:
std::vector<packetresult>
TransportFeedbackAdapter::ProcessTransportFeedbackInner(
const rtcp::TransportFeedback& feedback,
Timestamp feedback_receive_time) {
// Add timestamp deltas to a local time base selected on first packet arrival.
// This won't be the true time base, but makes it easier to manually inspect
// time stamps.
// 此处有一个很细节的地方,为了使得timestamp能够被够好的检视,没有直接使用cc-feedback
// 中的reference time(基准时间),而是用了本地的feedback到达时间(feedback_receive_time)作为基准时间(current_offset_)
// 在后续不断的feedback包到达的时候,将cc-feedback之间的reference time的delta,累加到current_offset_中。
if (last_timestamp_.IsInfinite()) {
current_offset_ = feedback_receive_time;
} else {
// TODO(srte): We shouldn't need to do rounding here.
// 计算当前的Base time和之前的Base time的差
const TimeDelta delta = feedback.GetBaseDelta(last_timestamp_)
.RoundDownTo(TimeDelta::Millis(1));
// Protect against assigning current_offset_ negative value.
if (delta < Timestamp::Zero() - current_offset_) {
// current_offset_负数情况下,直接将current_offset_置为feedback_receive_time
RTC_LOG(LS_WARNING) << "Unexpected feedback timestamp received.";
current_offset_ = feedback_receive_time;
} else {
// current_offset 正常,则直接等于即可
current_offset_ += delta;
}
}
last_timestamp_ = feedback.GetBaseTime();
std::vector<packetresult> packet_result_vector;
packet_result_vector.reserve(feedback.GetPacketStatusCount());
size_t failed_lookups = 0;
size_t ignored = 0;
TimeDelta packet_offset = TimeDelta::Zero();
for (const auto& packet : feedback.GetAllPackets()) {
int64_t seq_num = seq_num_unwrapper_.Unwrap(packet.sequence_number());
if (seq_num > last_ack_seq_num_) {
// Starts at history_.begin() if last_ack_seq_num_ < 0, since any valid
// sequence number is >= 0.
for (auto it = history_.upper_bound(last_ack_seq_num_);
it != history_.upper_bound(seq_num); ++it) {
in_flight_.RemoveInFlightPacketBytes(it->second);
}
last_ack_seq_num_ = seq_num;
}
// 根据transport seqnumber从history,将包取出来进行再赋值
auto it = history_.find(seq_num);
if (it == history_.end()) {
++failed_lookups;
continue;
}
if (it->second.sent.send_time.IsInfinite()) {
// TODO(srte): Fix the tests that makes this happen and make this a
// DCHECK.
RTC_DLOG(LS_ERROR)
<< "Received feedback before packet was indicated as sent";
continue;
}
PacketFeedback packet_feedback = it->second;
if (packet.received()) {
packet_offset += packet.delta();
// receive_time = base + delta
packet_feedback.receive_time =
current_offset_ + packet_offset.RoundDownTo(TimeDelta::Millis(1));
// Note: Lost packets are not removed from history because they might be
// reported as received by a later feedback.
history_.erase(it);
}
if (packet_feedback.network_route == network_route_) {
PacketResult result;
result.sent_packet = packet_feedback.sent;
result.receive_time = packet_feedback.receive_time;
packet_result_vector.push_back(result);
} else {
++ignored;
}
}
....
return packet_result_vector;
}
ProcessTransportFeedbackInner()是transport packet再赋值的核心函数,其主要
- 根据transport seqnumber将packet从history_获取出来,然后对其接收时间和接收状态的再赋值
- receive_time有个很细节的地方,并没有直接使用cc-feedback中的reference time(基础时间偏移),而是把第一个收到cc-feedback的时刻(feedback_receive_time)作为最初基准时间偏移(current_offset_),在后续的cc-feedback报文到达后,计算之前cc-feedback的feedback_receive_time和当前的delta,累加到current_offset _上作为后续的基础时间偏移,根据注释所言,是为了能够更好的检视包的到达时间。
至此,feed packet就完成了再复制,接下来会被传到cc模块中去更新码率预估;
2.2.3.5 cc-feedback 总结
cc-feedback的过程其实是发送和接收rtcp的过程,整个过程中涉及到比较多的类,很有总结的价值:
分成左右两边,右边是生成feedback packet的过程,最初由RTPSenderVideo注册cc-extension到RTPSender, RTPSender支持cc-extension后支持PacingController的padding,并生成带有TransportSequenceNumber的包,转发到PacketRouter后设置TransportSequenceNumber, 在网络发送的过程中,经由RTPSenderEgress 将包的信息传到RTPTransportController,让其调用TransportFeedbackAdapter生成feedback packet; 而左边则是RTCPReceiver收到cc-feedback后解析生成transport-feedback,交给RTPTransportController对feedback packet进行再更新,将最终的feedback packet交给cc-controller中,用于带宽预估。
2.2.4 处理ccfeedback-OnTransportPacketsFeedback()
如2.2.3.5
中的图所示,当cc-feedback到来后对feedback packet进行更新后,就会将feedback packet转发到cc-controller:
处理的函数是GoogCcNetworkController::OnTransportPacketsFeedback(), 这是一个很长的函数,通过feedback去估算一个最终的码率,和进一步的probe等。
NetworkControlUpdate GoogCcNetworkController::OnTransportPacketsFeedback(
TransportPacketsFeedback report) {
if (report.packet_feedbacks.empty()) {
// TODO(bugs.webrtc.org/10125): Design a better mechanism to safe-guard
// against building very large network queues.
return NetworkControlUpdate();
}
if (congestion_window_pushback_controller_) {
// congestion_windows_pushback_controller 根据feedback更新
// 发送的数据
congestion_window_pushback_controller_->UpdateOutstandingData(
report.data_in_flight.bytes());
}
TimeDelta max_feedback_rtt = TimeDelta::MinusInfinity();
TimeDelta min_propagation_rtt = TimeDelta::PlusInfinity();
Timestamp max_recv_time = Timestamp::MinusInfinity();
// 遍历获取最大的包到达时间(feedback.receive_time)
std::vector<packetresult> feedbacks = report.ReceivedWithSendInfo();
for (const auto& feedback : feedbacks)
max_recv_time = std::max(max_recv_time, feedback.receive_time);
// 从feedback中统计rtt,更新到各个组件
// 遍历获取最大的feedback_rtt(包发出去到收到feed包)和propagation_rtt(包在网络中传输的rtt,不包含在服务端pending的时间)
for (const auto& feedback : feedbacks) {
TimeDelta feedback_rtt =
report.feedback_time - feedback.sent_packet.send_time;
TimeDelta min_pending_time = feedback.receive_time - max_recv_time; // ??
TimeDelta propagation_rtt = feedback_rtt - min_pending_time;
max_feedback_rtt = std::max(max_feedback_rtt, feedback_rtt);
min_propagation_rtt = std::min(min_propagation_rtt, propagation_rtt);
}
// 更新PropagationRtt
if (max_feedback_rtt.IsFinite()) {
feedback_max_rtts_.push_back(max_feedback_rtt.ms());
const size_t kMaxFeedbackRttWindow = 32;
// 滑动窗口feedback_max_rtts,长度为32
if (feedback_max_rtts_.size() > kMaxFeedbackRttWindow)
feedback_max_rtts_.pop_front();
// TODO(srte): Use time since last unacknowledged packet.
bandwidth_estimation_->UpdatePropagationRtt(report.feedback_time,
min_propagation_rtt);
}
// 更新loss和delay estimation的rtt,注意
// loss使用的是feedback_min_rtt
// delay使用的是feedback_max_rtt
if (packet_feedback_only_) {
// 计算平均feed_back_max_rtt
if (!feedback_max_rtts_.empty()) {
// 计算平均feedback_rtt
int64_t sum_rtt_ms = std::accumulate(feedback_max_rtts_.begin(),
feedback_max_rtts_.end(), 0);
int64_t mean_rtt_ms = sum_rtt_ms / feedback_max_rtts_.size();
// 更新bwe的rtt
if (delay_based_bwe_)
delay_based_bwe_->OnRttUpdate(TimeDelta::Millis(mean_rtt_ms));
}
// 计算feedback_min_rtt,更新bandwidth_estimation_ rtt
TimeDelta feedback_min_rtt = TimeDelta::PlusInfinity();
// 这块逻辑和上面计算feedback_max_rtt一样,写了重复代码
for (const auto& packet_feedback : feedbacks) {
TimeDelta pending_time = packet_feedback.receive_time - max_recv_time;
TimeDelta rtt = report.feedback_time -
packet_feedback.sent_packet.send_time - pending_time;
// Value used for predicting NACK round trip time in FEC controller.
feedback_min_rtt = std::min(rtt, feedback_min_rtt);
}
if (feedback_min_rtt.IsFinite()) {
bandwidth_estimation_->UpdateRtt(feedback_min_rtt, report.feedback_time);
}
// 更新丢包率
// 上次更新丢包后到现在应该收到的包的总数
expected_packets_since_last_loss_update_ +=
report.PacketsWithFeedback().size();
for (const auto& packet_feedback : report.PacketsWithFeedback()) {
if (packet_feedback.receive_time.IsInfinite())
lost_packets_since_last_loss_update_ += 1;
}
// feedback_time大于丢包更新时间了,更新丢包率
if (report.feedback_time > next_loss_update_) {
next_loss_update_ = report.feedback_time + kLossUpdateInterval;
bandwidth_estimation_->UpdatePacketsLost(
lost_packets_since_last_loss_update_,
expected_packets_since_last_loss_update_, report.feedback_time);
expected_packets_since_last_loss_update_ = 0;
lost_packets_since_last_loss_update_ = 0;
}
}
// 获取当前是否处于alr
absl::optional<int64_t> alr_start_time =
alr_detector_->GetApplicationLimitedRegionStartTime();
// 告知acknowledge和probe_controller,当前不再处于alr
if (previously_in_alr_ && !alr_start_time.has_value()) {
int64_t now_ms = report.feedback_time.ms();
acknowledged_bitrate_estimator_->SetAlrEndedTime(report.feedback_time);
probe_controller_->SetAlrEndedTimeMs(now_ms);
}
previously_in_alr_ = alr_start_time.has_value();
// 预估接收端吞吐量
acknowledged_bitrate_estimator_->IncomingPacketFeedbackVector(
report.SortedByReceiveTime());
auto acknowledged_bitrate = acknowledged_bitrate_estimator_->bitrate();
// 将其设置到bandwidth_estimation_中去更新链路容量(link_capacity)
bandwidth_estimation_->SetAcknowledgedRate(acknowledged_bitrate,
report.feedback_time);
bandwidth_estimation_->IncomingPacketFeedbackVector(report);
for (const auto& feedback : report.SortedByReceiveTime()) {
if (feedback.sent_packet.pacing_info.probe_cluster_id !=
PacedPacketInfo::kNotAProbe) {
// probe_estimator 根据返回的feedback更新带宽探测的计算
probe_bitrate_estimator_->HandleProbeAndEstimateBitrate(feedback);
}
}
if (network_estimator_) {
// 这一块暂时还在开发中,目前还未使用,不太清楚干什么
network_estimator_->OnTransportPacketsFeedback(report);
auto prev_estimate = estimate_;
estimate_ = network_estimator_->GetCurrentEstimate();
// TODO(srte): Make OnTransportPacketsFeedback signal whether the state
// changed to avoid the need for this check.
if (estimate_ && (!prev_estimate || estimate_->last_feed_time !=
prev_estimate->last_feed_time)) {
event_log_->Log(std::make_unique<rtceventremoteestimate>(
estimate_->link_capacity_lower, estimate_->link_capacity_upper));
}
}
// 获取上面循环更新probe_estimator的最终的结果
absl::optional<datarate> probe_bitrate =
probe_bitrate_estimator_->FetchAndResetLastEstimatedBitrate();
// 如果enable probe < network_estimate时 忽略probe的特性,则忽略probe_bitrate
if (ignore_probes_lower_than_network_estimate_ && probe_bitrate &&
estimate_ && *probe_bitrate < delay_based_bwe_->last_estimate() &&
*probe_bitrate < estimate_->link_capacity_lower) {
probe_bitrate.reset();
}
// 如果enable
// 将probe略小于throughput_estimate_(预估吞吐量)的特性
// 对probe现在acknowledged_bitrate(链路吞吐量)下
if (limit_probes_lower_than_throughput_estimate_ && probe_bitrate &&
acknowledged_bitrate) {
// Limit the backoff to something slightly below the acknowledged
// bitrate. ("Slightly below" because we want to drain the queues
// if we are actually overusing.)
// The acknowledged bitrate shouldn't normally be higher than the delay
// based estimate, but it could happen e.g. due to packet bursts or
// encoder overshoot. We use std::min to ensure that a probe result
// below the current BWE never causes an increase.
DataRate limit =
std::min(delay_based_bwe_->last_estimate(),
*acknowledged_bitrate * kProbeDropThroughputFraction);
probe_bitrate = std::max(*probe_bitrate, limit);
}
NetworkControlUpdate update;
bool recovered_from_overuse = false;
bool backoff_in_alr = false;
// 使用feedback进行bwe预测,获得基于延迟的码率估计
DelayBasedBwe::Result result;
result = delay_based_bwe_->IncomingPacketFeedbackVector(
report, acknowledged_bitrate, probe_bitrate, estimate_,
alr_start_time.has_value());
if (result.updated) {
// 预估码率更新了
if (result.probe) {
// bwe使用了探测码率进行重设
// bandwidth_estimation_也进行重设sendbitrate
bandwidth_estimation_->SetSendBitrate(result.target_bitrate,
report.feedback_time);
}
// Since SetSendBitrate now resets the delay-based estimate, we have to
// call UpdateDelayBasedEstimate after SetSendBitrate.
// 更新bandwidth_estimation_中基于延迟的估计码率
bandwidth_estimation_->UpdateDelayBasedEstimate(report.feedback_time,
result.target_bitrate);
// Update the estimate in the ProbeController, in case we want to probe.
// 将变化的码率通知到probe_controller, alr_detector, congestion_window等
MaybeTriggerOnNetworkChanged(&update, report.feedback_time);
}
recovered_from_overuse = result.recovered_from_overuse;
backoff_in_alr = result.backoff_in_alr;
if (recovered_from_overuse) {
// 从overuse中恢复了,重设alr start 时间
probe_controller_->SetAlrStartTimeMs(alr_start_time);
// 获取接下来要做带宽探测的参数,放到update中
auto probes = probe_controller_->RequestProbe(report.feedback_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
} else if (backoff_in_alr) {
// 如果在alr中做了码率回退,进行新一轮的探测?
// If we just backed off during ALR, request a new probe.
auto probes = probe_controller_->RequestProbe(report.feedback_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
}
// No valid RTT could be because send-side BWE isn't used, in which case
// we don't try to limit the outstanding packets.
if (rate_control_settings_.UseCongestionWindow() &&
max_feedback_rtt.IsFinite()) {
// TODO 这个window需要花时间看一看,直接一直没有看这个东西
UpdateCongestionWindowSize();
}
if (congestion_window_pushback_controller_ && current_data_window_) {
// 如果有congestion_window_pushback_controller_,将当前的窗口放在通知器下回推给编码器
congestion_window_pushback_controller_->SetDataWindow(
*current_data_window_);
} else {
// 否则,直接放在结果中
update.congestion_window = current_data_window_;
}
// 返回结果
return update;
}
其主要做了:
- 为所有feedback计算propagation_rtt和feedback_rtt(前者是packet单纯在网络上传输的时间,而后者包含了包到达服务器后pending的时间)。从中得到max_feedback_rtt, min_propagation_rtt, feedback_min_rtt.以min_propagation_rtt的计算为例:
for (const auto& feedback : feedbacks) {
TimeDelta feedback_rtt =
report.feedback_time - feedback.sent_packet.send_time;
TimeDelta min_pending_time = feedback.receive_time - max_recv_time; // ??
TimeDelta propagation_rtt = feedback_rtt - min_pending_time;
max_feedback_rtt = std::max(max_feedback_rtt, feedback_rtt);
min_propagation_rtt = std::min(min_propagation_rtt, propagation_rtt);
}
feedback_rtt和propagation_rtt按照下图的方式进行计算(但至今让我疑惑的是min_pending_time其表示packet在接收端最小的等待时间,计算应该是max_recv_time - feedback.receive_time,但实际上却反过来导致结果是一个负数,提了一个issue,还未得到解答.)
-
使用bandwidth_estimation->UpdatePropagationRtt()将min_propagation_rtt更新到其PropagationRtt中; 使用一个队列(feedback_max_rtts)保存最新的32个feedback_max_rtts,然后计算得到一个均值(mean_rtt_ms),使用delay_based_bwe->OnRttUpdate()更新到基于延迟的码率预估器中(delay_based_bwe)的rtt;使用bandwidth_estimation_->UpdateRtt()将feedback_min_rtt更新到其rtt。
-
计算丢包率,使用bandwidth_estimation->UpdatePacketsLost()将丢包率更新到bandwidth_estimation中,
bandwidth_estimation内部会根据丢包率,调整码率.
-
通过alr_detector检测当前是否处于alr状态,如果不是,则告知acknowledged_bitrate_estimator,probe_controller 当前不处于alr状态
-
使用acknowledged_bitrate_estimator根据feedback计算接收端吞吐量(acknowledged_bitrate)。将接收端吞吐量更新到bandwidth_estimation中
-
使用码率探测器(probe_bitrate_estimator)基于返回的feedback计算探测码率(probe_bitrate)。 将接收端吞吐量(acknowledged_bitrate),探测码率(probe_bitrate), feedback等放到基于延迟的码率预估器(delay_based_bwe)做码率预测
-
将delay_based_bwe预估的码率放入bandwidth_estimation中, 让bandwidth_estimation重新综合修正最终目标码率;
-
调用MaybeTriggerOnNetworkChanged()获得最终码率, 将新码率设置到alr_detector, probe_controller等, 根据最新码率和rtt更新拥塞窗口大小(congestion window size), 将最新的congestion window size设置到congestion_window_pushback_controller中.
2.2.5 更新网络与结果-MaybeTriggerOnNetworkChanged()
MaybeTriggerOnNetworkChanged()是在估算完最终的码率的时候,把新的码率更新到alr_detector,probe_controller中,并生成相关结果返回给pacing_controller使用:
void GoogCcNetworkController::MaybeTriggerOnNetworkChanged(
NetworkControlUpdate* update,
Timestamp at_time) {
// 从bandwidth_estimation 获取丢包率, rtt,目标码率
uint8_t fraction_loss = bandwidth_estimation_->fraction_loss();
TimeDelta round_trip_time = bandwidth_estimation_->round_trip_time();
DataRate loss_based_target_rate = bandwidth_estimation_->target_rate();
DataRate pushback_target_rate = loss_based_target_rate;
BWE_TEST_LOGGING_PLOT(1, "fraction_loss_%", at_time.ms(),
(fraction_loss * 100) / 256);
BWE_TEST_LOGGING_PLOT(1, "rtt_ms", at_time.ms(), round_trip_time.ms());
BWE_TEST_LOGGING_PLOT(1, "Target_bitrate_kbps", at_time.ms(),
loss_based_target_rate.kbps());
double cwnd_reduce_ratio = 0.0;
if (congestion_window_pushback_controller_) {
// 更新拥塞控制窗口中的目标码率,同时获取新的拥塞控制码率
int64_t pushback_rate =
congestion_window_pushback_controller_->UpdateTargetBitrate(
loss_based_target_rate.bps());
// 不能大于min_bitrate_configured_.bps<int>();
pushback_rate = std::max<int64_t>(bandwidth_estimation_->GetMinBitrate(),
pushback_rate);
pushback_target_rate = DataRate::BitsPerSec(pushback_rate);
if (rate_control_settings_.UseCongestionWindowDropFrameOnly()) {
// 如果rate_control仅使用了丢帧码率控制,通过预估码率和拥塞控制码率得到丢帧率
cwnd_reduce_ratio = static_cast<double>(loss_based_target_rate.bps() -
pushback_target_rate.bps()) /
loss_based_target_rate.bps();
}
}
// 保守码率会从(stable_target_rate) 会从link_capacity,loss_based_target_rate, pushback_target_rate中的最小的取得
DataRate stable_target_rate =
bandwidth_estimation_->GetEstimatedLinkCapacity();
if (loss_based_stable_rate_) {
stable_target_rate = std::min(stable_target_rate, loss_based_target_rate);
} else {
stable_target_rate = std::min(stable_target_rate, pushback_target_rate);
}
// 保存最新的
if ((loss_based_target_rate != last_loss_based_target_rate_) ||
(fraction_loss != last_estimated_fraction_loss_) ||
(round_trip_time != last_estimated_round_trip_time_) ||
(pushback_target_rate != last_pushback_target_rate_) ||
(stable_target_rate != last_stable_target_rate_)) {
last_loss_based_target_rate_ = loss_based_target_rate;
last_pushback_target_rate_ = pushback_target_rate;
last_estimated_fraction_loss_ = fraction_loss;
last_estimated_round_trip_time_ = round_trip_time;
last_stable_target_rate_ = stable_target_rate;
// 更新alr探测中的目标码率
alr_detector_->SetEstimatedBitrate(loss_based_target_rate.bps());
// 获取到下次bwe overuse 的时间
TimeDelta bwe_period = delay_based_bwe_->GetExpectedBwePeriod();
TargetTransferRate target_rate_msg;
target_rate_msg.at_time = at_time;
if (rate_control_settings_.UseCongestionWindowDropFrameOnly()) {
// 仅使用拥塞控制窗口仅丢帧(CongestionWindowDropFrameOnly),此时
// 动态调整编码器码率,直接使用预估码率作为目标码率
target_rate_msg.target_rate = loss_based_target_rate;
target_rate_msg.cwnd_reduce_ratio = cwnd_reduce_ratio;
} else {
// 否则的话,需要调整编码器码率,则将pushback_target_rate作为目标码率
target_rate_msg.target_rate = pushback_target_rate;
}
target_rate_msg.stable_target_rate = stable_target_rate;
target_rate_msg.network_estimate.at_time = at_time;
target_rate_msg.network_estimate.round_trip_time = round_trip_time;
target_rate_msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0f; // fraction_loss计算的时候乘了256
target_rate_msg.network_estimate.bwe_period = bwe_period;
update->target_rate = target_rate_msg;
// 将最新码率放入probe_controller中, 获得需要做的码率探测
auto probes = probe_controller_->SetEstimatedBitrate(
loss_based_target_rate.bps(), at_time.ms());
update->probe_cluster_configs.insert(update->probe_cluster_configs.end(),
probes.begin(), probes.end());
//获取pacing rate, 原理就是设置一个1s的窗口,把target_rate放过去
update->pacer_config = GetPacingRates(at_time);
RTC_LOG(LS_VERBOSE) << "bwe " << at_time.ms() << " pushback_target_bps="
<< last_pushback_target_rate_.bps()
<< " estimate_bps=" << loss_based_target_rate.bps();
}
}
GoogCcNetworkController::MaybeTriggerOnNetworkChanged()中:
-
首先从bandwidth_estimation获取rtt, 丢包率,目标码率
-
然后通过目标码率更新拥塞控制窗口的发送码率(pushback_target_rate)后,利用拥塞窗口的发送码率减去当前预估的目标码率,求得一个码率降低率(cwnd_reduce_ratio), 这个值是用来控制编码的
-
计算一个保守码率(stable_target_rate), 该保守码率从以下三者最小的取得:
链路容量(link_capacity): 基于目标码率和当前吞吐量(acknowledge)做指数平滑估计出来的链路容量
丢包码率(loss_based_target_rate): 基于丢包估算出来的链路码率,该码率并以延迟预估码率为上限
窗口控制码率(pushback_target_rate):基于loss_based_target_rate和当前窗口的使用程度缩放后的码率
-
将新的目标码率更新至Alr_Detector中
-
根据是否启用拥塞窗口仅丢帧控制码率的特性, 如果是,编码器码率的调整是直接丢帧的调整的,此时直接使用loss_based_target_rate作为目标码率,否则的话,使用窗口控制码率(pushback_target_rate)作为目标码率
-
将最新码率更新到probe_controller中, 同时获得可能需要做的码率探测
-
通过GetPacingRates()获取节律发送(pacer)控制
PacerConfig GoogCcNetworkController::GetPacingRates(Timestamp at_time) const { // Pacing rate is based on target rate before congestion window pushback, // because we don't want to build queues in the pacer when pushback occurs. // 此处的pacing rate使用的是last_loss_based_target_rate_, 这个值没有经过拥塞窗口的更新处理 // 但是没太看懂注释,"当退避产生的时候不想在pacer创建队列",因为pacer有两种,一种是有queue的 // 一种是无queue的,可能想要表达的是congestion push back不应用在有queue的队列上? DataRate pacing_rate = std::max(min_total_allocated_bitrate_, last_loss_based_target_rate_) * pacing_factor_; // padding_rate 主要的值还是max_padding_rate_,这是一个来自于外部(bitrateAllocation)计算的一个值 // 其次,它肯定不能大于窗口控制的码率(last_pushback_target_rate_) DataRate padding_rate = std::min(max_padding_rate_, last_pushback_target_rate_); PacerConfig msg; msg.at_time = at_time; msg.time_window = TimeDelta::Seconds(1);//1s msg.data_window = pacing_rate * msg.time_window; msg.pad_window = padding_rate * msg.time_window; return msg; }
cc-controller的介绍到此为止,肯定是看到云里雾里; 接下来会分小节,进一步介绍涉及到的AlrDetector, ProbeController , ProbeBitrateEstimator , AcknowledgedBitrateEstimator , DelayBasedBwe , SendSideBandwidthEstimation, CongestionWindowPushbackController,查阅完这些再回头看cc-controller码控过程就容易了。
2.3 码率受限探测器-AlrDetector
AlrDetector(Application Limit Region Detector)码率受限探测器, 正如其名, 是用来检测当前的发送码率是否因编码器等其它应用远低于目标码率的情况. 在每次发送数据的时候,cc-controller的OnSentPacket()将会被更新前发送的数据大小和时间, 这些信息会被传递到AlrDetector中进行更新, 同时检测当前是否处于alr状态
NetworkControlUpdate GoogCcNetworkController::OnSentPacket(
SentPacket sent_packet) {
// 将发送数据和时间更新到alr
alr_detector_->OnBytesSent(sent_packet.size.bytes(),
sent_packet.send_time.ms());
// 告知acknowledged_bitrate_estimator_是否处于alr状态
acknowledged_bitrate_estimator_->SetAlr(
alr_detector_->GetApplicationLimitedRegionStartTime().has_value());
...
}
AlrDetector::OnBytesSent()是进行更新检测的地方, 原理很简单, 内部设置了一个alr_budget, alr_budget的大小会随着流逝的时间增加(delta_ms()间隔时间 * target_rate(目标码率)), 并发送数据大小byte_send记录当前已使用的budget, 然后计算使用的budget是否达不到一个预设的比例, 判断当前发送码率是否过低, 从而开启alr状态
void AlrDetector::OnBytesSent(size_t bytes_sent, int64_t send_time_ms) {
if (!last_send_time_ms_.has_value()) {
last_send_time_ms_ = send_time_ms;
// Since the duration for sending the bytes is unknwon, return without
// updating alr state.
return;
}
int64_t delta_time_ms = send_time_ms - *last_send_time_ms_;
last_send_time_ms_ = send_time_ms;
// 使用budget
alr_budget_.UseBudget(bytes_sent);
// 更新budget
alr_budget_.IncreaseBudget(delta_time_ms);
bool state_changed = false;
if (alr_budget_.budget_ratio() > conf_.start_budget_level_ratio &&
!alr_started_time_ms_) {
// 使用的budget小于预设的一定比例,alr状态开启
alr_started_time_ms_.emplace(rtc::TimeMillis());
state_changed = true;
} else if (alr_budget_.budget_ratio() < conf_.stop_budget_level_ratio &&
alr_started_time_ms_) {
state_changed = true;
alr_started_time_ms_.reset();
}
if (event_log_ && state_changed) {
event_log_->Log(
std::make_unique<rtceventalrstate>(alr_started_time_ms_.has_value()));
}
}
2.4 Probe
probe(探测)涉及到的类有三个:
ProbeController : 用来做探测控制的, 检查是否需要探测
BitrateProber : 在实际的过程中执行探测,控制发送码率大小
ProbeBitrateEstimator :是通过cc-feedback的报文得到探测包的发送情况,从而预估码率
2.4.1 探测控制器-ProbeController
ProbeController在cc-controller创建的时候就被创建, 并执行Reset()做初始化, Reset()函数中最关心的变量是state, 它被设置成State::kInit
标识着还处于初始阶段未进行探测.
ProbeController::ProbeController(const WebRtcKeyValueConfig* key_value_config,
RtcEventLog* event_log)
: enable_periodic_alr_probing_(false),
in_rapid_recovery_experiment_(absl::StartsWith(
key_value_config->Lookup(kBweRapidRecoveryExperiment),
"Enabled")),
limit_probes_with_allocateable_rate_(!absl::StartsWith(
key_value_config->Lookup(kCappedProbingFieldTrialName),
"Disabled")),
event_log_(event_log),
config_(ProbeControllerConfig(key_value_config)) {
Reset(0);
}
void ProbeController::Reset(int64_t at_time_ms) {
network_available_ = true;
state_ = State::kInit; // !
min_bitrate_to_probe_further_bps_ = kExponentialProbingDisabled;
time_last_probing_initiated_ms_ = 0;
estimated_bitrate_bps_ = 0;
start_bitrate_bps_ = 0;
max_bitrate_bps_ = 0;
int64_t now_ms = at_time_ms;
last_bwe_drop_probing_time_ms_ = now_ms;
alr_end_time_ms_.reset();
mid_call_probing_waiting_for_result_ = false;
time_of_last_large_drop_ms_ = now_ms;
bitrate_before_last_large_drop_bps_ = 0;
max_total_allocated_bitrate_ = 0;
}
probe_controller随着cc-controller建立完成后, 首先会在GoogCcNetworkController::OnProcessInterval()
被做一些设置和初始化:
NetworkControlUpdate GoogCcNetworkController::OnProcessInterval(
ProcessInterval msg) {
NetworkControlUpdate update;
if (initial_config_) {
// 重设loss_based和delay_based探测器和probe的初始码率
// 获得码率探测簇配置(probe_cluster_config)
update.probe_cluster_configs =
ResetConstraints(initial_config_->constraints);
// probe_controller enable alr probing
if (initial_config_->stream_based_config.requests_alr_probing) {
probe_controller_->EnablePeriodicAlrProbing(
*initial_config_->stream_based_config.requests_alr_probing);
}
absl::optional<datarate> total_bitrate =
initial_config_->stream_based_config.max_total_allocated_bitrate;
if (total_bitrate) {
// 为probe设置最大的分配码率(MaxTotalAllocatedBitrate)作为探测的上边界
// 并生成响应的probe_cluster_config去进行探测
auto probes = probe_controller_->OnMaxTotalAllocatedBitrate(
total_bitrate->bps(), msg.at_time.ms());
...
}
...
}
...
// probe_controller_设置当前alr状态
absl::optional<int64_t> start_time_ms =
alr_detector_->GetApplicationLimitedRegionStartTime();
probe_controller_->SetAlrStartTimeMs(start_time_ms);
// probe_controller 定时检测是否需要进行process
auto probes = probe_controller_->Process(msg.at_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
// 获取更新后的码率,probe等,同时对alr, probe_controller中的码率进行更新
MaybeTriggerOnNetworkChanged(&update, msg.at_time);
return update;
}
在OnProcessInterval()中首先调用ResetConstraints()将config中的min_data_rate, max_data_rate, starting_rate放入probe_controller_中
std::vector<probeclusterconfig> GoogCcNetworkController::ResetConstraints(
TargetRateConstraints new_constraints) {
min_target_rate_ = new_constraints.min_data_rate.value_or(DataRate::Zero());
max_data_rate_ =
new_constraints.max_data_rate.value_or(DataRate::PlusInfinity());
starting_rate_ = new_constraints.starting_rate;
ClampConstraints();
bandwidth_estimation_->SetBitrates(starting_rate_, min_data_rate_,
max_data_rate_, new_constraints.at_time);
if (starting_rate_)
delay_based_bwe_->SetStartBitrate(*starting_rate_);
delay_based_bwe_->SetMinBitrate(min_data_rate_);
// 设置初始target_bitrate,获得最初的探测config
return probe_controller_->SetBitrates(
min_data_rate_.bps(), GetBpsOrDefault(starting_rate_, -1),
max_data_rate_.bps_or(-1), new_constraints.at_time.ms());
}
2.4.1.1 SetBitrates()
在ProbeController::SetBitrates()
设置的过程中就会开始做码率探测, 由于是首次调用,所以会进入State::kInit
的分支,执行InitiateExponentialProbing()
std::vector<probeclusterconfig> ProbeController::SetBitrates(
int64_t min_bitrate_bps,
int64_t start_bitrate_bps,
int64_t max_bitrate_bps,
int64_t at_time_ms) {
if (start_bitrate_bps > 0) {
start_bitrate_bps_ = start_bitrate_bps;
estimated_bitrate_bps_ = start_bitrate_bps;
} else if (start_bitrate_bps_ == 0) {
// 没有start_bitrate_bps_, 默认设为min_bitrate_bps
start_bitrate_bps_ = min_bitrate_bps;
}
// The reason we use the variable |old_max_bitrate_pbs| is because we
// need to set |max_bitrate_bps_| before we call InitiateProbing.
int64_t old_max_bitrate_bps = max_bitrate_bps_;
max_bitrate_bps_ = max_bitrate_bps;
switch (state_) {
case State::kInit:
// init从start_bitrate_bps_开始探测起
if (network_available_)
return InitiateExponentialProbing(at_time_ms);
break;
case State::kWaitingForProbingResult:
break;
case State::kProbingComplete:
// If the new max bitrate is higher than both the old max bitrate and the
// estimate then initiate probing.
if (estimated_bitrate_bps_ != 0 &&
old_max_bitrate_bps < max_bitrate_bps_ &&
estimated_bitrate_bps_ < max_bitrate_bps_) {
// The assumption is that if we jump more than 20% in the bandwidth
// estimate or if the bandwidth estimate is within 90% of the new
// max bitrate then the probing attempt was successful.
mid_call_probing_succcess_threshold_ =
std::min(estimated_bitrate_bps_ * 1.2, max_bitrate_bps_ * 0.9);
mid_call_probing_waiting_for_result_ = true;
mid_call_probing_bitrate_bps_ = max_bitrate_bps_;
RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.MidCallProbing.Initiated",
max_bitrate_bps_ / 1000);
return InitiateProbing(at_time_ms, {max_bitrate_bps_}, false);
}
break;
}
return std::vector<probeclusterconfig>();
}
InitiateExponentialProbing()中用了扩大两个系数first_exponential_probe_scale(3.0)
和second_exponential_probe_scale(3.0)
乘以start_bitrate_bps得到第一个和第二个要探测的码率, 然后将这两个码率入参传递到InitiateProbing()
去生成探测的配置
std::vector<probeclusterconfig> ProbeController::InitiateExponentialProbing(
int64_t at_time_ms) {
RTC_DCHECK(network_available_);
RTC_DCHECK(state_ == State::kInit);
RTC_DCHECK_GT(start_bitrate_bps_, 0);
// When probing at 1.8 Mbps ( 6x 300), this represents a threshold of
// 1.2 Mbps to continue probing.
// 设置probe的初始码率,初始的前两个探测值为:
// config_.first_exponential_probe_scale(3.0) * start_bitrate_bps_
// config_.second_exponential_probe_scale(6.0) *start_bitrate_bps_
std::vector<int64_t> probes = {static_cast<int64_t>(
config_.first_exponential_probe_scale * start_bitrate_bps_)};
if (config_.second_exponential_probe_scale) {
probes.push_back(config_.second_exponential_probe_scale.Value() *
start_bitrate_bps_);
}
return InitiateProbing(at_time_ms, probes, true);
}
2.4.1.2 生成探测码率- InitiateProbing()
InitiateProbing()
是ProbeController中最核心的函数, 其为传入的每个码率生成probe config, 这些config最终会在pacing controller的bitrate_prober中使用去影响发包速度, InitiateProbing()在ProbeController中会在多地调用, 凡是发生了什么网络变化的,涉及到要重新探测码率的就会调用该函数去生成probe config
/**
* @description: 根据bitrate生成ProbeClusterConfig以供bitrate_prober使用
* @param {bitrates_to_probe} 初始化要探测码率的数组
* @return {std::vector<probeclusterconfig>} 要进行探测的组
*/
std::vector<probeclusterconfig> ProbeController::InitiateProbing(
int64_t now_ms,
std::vector<int64_t> bitrates_to_probe,
bool probe_further) {
// 获取当前最大探测码率(max_probe_bitrate_bps),受制于两者:
// max_birate_bps_
// max_total_allocated_bitrate_
int64_t max_probe_bitrate_bps =
max_bitrate_bps_ > 0 ? max_bitrate_bps_ : kDefaultMaxProbingBitrateBps;
if (limit_probes_with_allocateable_rate_ &&
max_total_allocated_bitrate_ > 0) {
// 如果设置了probe受限于分配码率(allocateable_rate_),并且设置了最大的受限分配码率(max_total_allocated_bitrate)
// max_probe_bitrate_bps 不能大于2倍的max_total_allocated_bitrate_
// If a max allocated bitrate has been configured, allow probing up to 2x
// that rate. This allows some overhead to account for bursty streams,
// which otherwise would have to ramp up when the overshoot is already in
// progress.
// It also avoids minor quality reduction caused by probes often being
// received at slightly less than the target probe bitrate.
max_probe_bitrate_bps =
std::min(max_probe_bitrate_bps, max_total_allocated_bitrate_ * 2);
}
std::vector<probeclusterconfig> pending_probes;
// 每个要探测的bitrate都会以probeClusterConfig发送到pace_controoler,
// 在此为初始要特测的码率生成probe config
for (int64_t bitrate : bitrates_to_probe) {
RTC_DCHECK_GT(bitrate, 0);
// 要探测的码率大于最大探测码率,不再进行探测,并置进一步探测标识(probe_further)为false
if (bitrate > max_probe_bitrate_bps) {
bitrate = max_probe_bitrate_bps;
probe_further = false;// !
}
// 为当前bitrate 生成config
ProbeClusterConfig config;
config.at_time = Timestamp::Millis(now_ms);
config.target_data_rate =
DataRate::BitsPerSec(rtc::dchecked_cast<int>(bitrate)); //探测目标码率
config.target_duration = TimeDelta::Millis(kMinProbeDurationMs); // 目标探测时间(15ms)
config.target_probe_count = kMinProbePacketsSent; //目标探测包个数(5)
config.id = next_probe_cluster_id_; // 当前探测簇id( cluster_id )
next_probe_cluster_id_++;
MaybeLogProbeClusterCreated(event_log_, config);
pending_probes.push_back(config);
}
time_last_probing_initiated_ms_ = now_ms;
if (probe_further) {
// 如果启用进一步探测
// 设置当前探测状态为waiting
state_ = State::kWaitingForProbingResult;
// 计算一个最小进一步探测需要满足的码率,该值会在probe_controller更新预估码率即执行SetEstimatedBitrate()时
// 用来检测预估的码率是否大于min_bitrate_to_probe_further_bps_,如果是会触发探测,
min_bitrate_to_probe_further_bps_ =
(*(bitrates_to_probe.end() - 1)) * config_.further_probe_threshold;
} else {
// probe_further == false -> 整个probe_controller_完成探测
state_ = State::kProbingComplete;
min_bitrate_to_probe_further_bps_ = kExponentialProbingDisabled;
}
return pending_probes;
}
ProbeController::InitiateProbing()中主要做了如下:
-
获取max_probe_bitrate_bps(探测码率上限), 对每个要探测的码率判断是否大于该上限, 不符合的剔除
-
为每个符合要探测的码率生成ProbeClusterConfig, 其结构如下所示:
struct ProbeClusterConfig { Timestamp at_time = Timestamp::PlusInfinity(); DataRate target_data_rate = DataRate::Zero(); // 要探测的目标码率 TimeDelta target_duration = TimeDelta::Zero(); // 探测时间 int32_t target_probe_count = 0; // 探测包个数 int32_t id = 0; // 当前探测簇id, 用来标明一组探测 };
-
如果入参probe_further为true的话, 说明进行此轮探测后,还会进一步的进行探测, 会将state设置为
State::kWaitingForProbingResult
, 并且根据探测码率的最后一个生成一个进一步探测的最低码率
2.4.1.3 周期性检测是否探测-Process()
完成了SetBitrates()后,在GoogCcNetworkController::OnProcessInterval()
中, 会根据config调用probe_controller_->EnablePeriodicAlrProbing()
开启probe controller的周期性Alr状态下的码率探测
void ProbeController::EnablePeriodicAlrProbing(bool enable) {
enable_periodic_alr_probing_ = enable;
}
这个值会在ProbeController::Process()
起作用, ProbeController::Process()
随着GoogCcNetworkController::OnProcessInterval()
周期性的调用而调用.
/**
* @description: 周期性的进行检测,如果当前处于alr状态,检测是否该做probe了
* @param {at_time_ms} 当前检测时刻
* @return {*}
*/
std::vector<probeclusterconfig> ProbeController::Process(int64_t at_time_ms) {
if (at_time_ms - time_last_probing_initiated_ms_ >
kMaxWaitingTimeForProbingResultMs) {
mid_call_probing_waiting_for_result_ = false;
if (state_ == State::kWaitingForProbingResult) {
// 周期性检查到了,但仍处于State::kWaitingForProbingResult状态,说明cc-feedback还没回来
// 没触发SetEstimatedBitrate,被认为超时, 提前结束进一步探测.
// 维持原码率即可
RTC_LOG(LS_INFO) << "kWaitingForProbingResult: timeout";
state_ = State::kProbingComplete;
min_bitrate_to_probe_further_bps_ = kExponentialProbingDisabled;
}
}
if (enable_periodic_alr_probing_ && state_ == State::kProbingComplete) {
// 如果kProbingComplete了,但是启用了alr下进行探测,生成alr探测的config
// Probe bandwidth periodically when in ALR state.
if (alr_start_time_ms_ && estimated_bitrate_bps_ > 0) {
// 当前处于alr状态,使用probe进行快速探测
// 使用alr状态开始时间或上一次probe的时间两者中的最大者 + interval作为下次probe时间
int64_t next_probe_time_ms =
std::max(*alr_start_time_ms_, time_last_probing_initiated_ms_) +
config_.alr_probing_interval->ms();
if (at_time_ms >= next_probe_time_ms) {
// 当前时间已经超过了下次探测时间,直接在预估码率的基础上进行探测
return InitiateProbing(at_time_ms,
{static_cast<int64_t>(estimated_bitrate_bps_ *
config_.alr_probe_scale)},
true);
}
}
}
return std::vector<probeclusterconfig>();
}
ProbeController::Process()
主要做了:
- 检查是否处于
kWaitingForProbingResult
状态, 如果是,则认为当前处于进一步的探测中,但是feedback却还没回来, 被认为超时, 提前结束进一步探测. - 如果启用了Alr(enable_periodic_alr_probing), 当前处于Alr状态下, 会首先通过alr状态开启时间(alr_start_time_ms)或上一次probe的时间(time_last_probing_initiated_ms) 加上一个interval 作为下次探测时间,当当前时间满足之后, 基于预估码率乘上一个系数变换作为探测码率去探测.
2.4.1.4 更新预估码率-SetEstimatedBitrate()
除了上述函数之外, cc-controler在接收到feedback之后进行码率预估结束后会调用GoogCcNetworkController::MaybeTriggerOnNetworkChanged()
告知probe_controller 预估码率发生改变, probe_controller会调用ProbeController::SetEstimatedBitrate()
去修改预估码率:
std::vector<probeclusterconfig> ProbeController::SetEstimatedBitrate(
int64_t bitrate_bps,
int64_t at_time_ms) {
if (mid_call_probing_waiting_for_result_ &&
bitrate_bps >= mid_call_probing_succcess_threshold_) {
RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.MidCallProbing.Success",
mid_call_probing_bitrate_bps_ / 1000);
RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.MidCallProbing.ProbedKbps",
bitrate_bps / 1000);
mid_call_probing_waiting_for_result_ = false;
}
std::vector<probeclusterconfig> pending_probes;
if (state_ == State::kWaitingForProbingResult) {
// Continue probing if probing results indicate channel has greater
// capacity.
RTC_LOG(LS_INFO) << "Measured bitrate: " << bitrate_bps
<< " Minimum to probe further: "
<< min_bitrate_to_probe_further_bps_;
if (min_bitrate_to_probe_further_bps_ != kExponentialProbingDisabled &&
bitrate_bps > min_bitrate_to_probe_further_bps_) {
// 大于最小的进一步探测码率,可以继续探测
pending_probes = InitiateProbing(
at_time_ms,
{static_cast<int64_t>(config_.further_exponential_probe_scale *
bitrate_bps)},
true);
}
}
if (bitrate_bps < kBitrateDropThreshold * estimated_bitrate_bps_) {
// 当前设置的bitrate_bps 比estimated_bitrate_bps_ 小很多
// 发生了large drop
time_of_last_large_drop_ms_ = at_time_ms;
bitrate_before_last_large_drop_bps_ = estimated_bitrate_bps_;
}
// 将estimated_bitrate_bps_更新
estimated_bitrate_bps_ = bitrate_bps;
return pending_probes;
}
在ProbeController::SetEstimatedBitrate()中可以看到 :
- 检测是否处于
kWaitingForProbingResult
,如果是则说明需要进一步探测(further detection), 将新设进来的预估码率判断是否大于最低进一步探测码率(min_bitrate_to_probe_further_bps), 如果是则将它乘以一个变换系数后去生成probe config - 判断新设的码率是否比旧码率小很多,如果是, 则标记发生了大的跌落(large_drop), 如果probe controller启用了快恢复的特性, 即使state已经是
kProbingComplete
, 也会在RequestProbe()中生成probe config用于检测当前真的发生稳定的large_drop.
2.4.1.5 检测是否探测-RequestProbe()
RequestProbe()在GoogCcNetworkController::OnTransportPacketsFeedback()中会被调用, 调用的场景主要是需要快速恢复, 或是Alr下探测码率需要借probe修正的情况.
NetworkControlUpdate GoogCcNetworkController::OnTransportPacketsFeedback(
TransportPacketsFeedback report) {
...
if (recovered_from_overuse) {
// 从overuse中恢复了,重设alr start 时间
probe_controller_->SetAlrStartTimeMs(alr_start_time);
// 获取接下来要做带宽探测的参数,放到update中
auto probes = probe_controller_->RequestProbe(report.feedback_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
} else if (backoff_in_alr) {
// 在alr状态下发生了码率下降, delay_base使用预估码率而不是ack码率下降,需要发送probe
// If we just backed off during ALR, request a new probe.
auto probes = probe_controller_->RequestProbe(report.feedback_time.ms());
update.probe_cluster_configs.insert(update.probe_cluster_configs.end(),
probes.begin(), probes.end());
}
...
}
其主要原理很简单, 去探测之前在SetEstimatedBitrate()
记录下来暴跌前的码率, 看看是否真的暴跌, 从而帮助快恢复
std::vector<probeclusterconfig> ProbeController::RequestProbe(
int64_t at_time_ms) {
// Called once we have returned to normal state after a large drop in
// estimated bandwidth. The current response is to initiate a single probe
// session (if not already probing) at the previous bitrate.
//
// If the probe session fails, the assumption is that this drop was a
// real one from a competing flow or a network change.
bool in_alr = alr_start_time_ms_.has_value();
bool alr_ended_recently =
(alr_end_time_ms_.has_value() &&
at_time_ms - alr_end_time_ms_.value() < kAlrEndedTimeoutMs);
// 处于alr或者刚刚还处于alr或者启用了快恢复
if (in_alr || alr_ended_recently || in_rapid_recovery_experiment_) {
if (state_ == State::kProbingComplete) {
// 获取码率大幅下降前的码率,去探测是否真的是large drop
uint32_t suggested_probe_bps =
kProbeFractionAfterDrop * bitrate_before_last_large_drop_bps_;
uint32_t min_expected_probe_result_bps =
(1 - kProbeUncertainty) * suggested_probe_bps;
int64_t time_since_drop_ms = at_time_ms - time_of_last_large_drop_ms_;
int64_t time_since_probe_ms = at_time_ms - last_bwe_drop_probing_time_ms_;
if (min_expected_probe_result_bps > estimated_bitrate_bps_ &&
time_since_drop_ms < kBitrateDropTimeoutMs &&
time_since_probe_ms > kMinTimeBetweenAlrProbesMs) {
RTC_LOG(LS_INFO) << "Detected big bandwidth drop, start probing.";
// Track how often we probe in response to bandwidth drop in ALR.
RTC_HISTOGRAM_COUNTS_10000(
"WebRTC.BWE.BweDropProbingIntervalInS",
(at_time_ms - last_bwe_drop_probing_time_ms_) / 1000);
last_bwe_drop_probing_time_ms_ = at_time_ms;
return InitiateProbing(at_time_ms, {suggested_probe_bps}, false);
}
}
}
return std::vector<probeclusterconfig>();
}
2.4.2 探测器-BitrateProber
2.4.2.1 创建探测簇-CreateProbeCluster()
BitrateProber是用来消费ProbeController创建的ProbeClusterConfig, 新生成的ProbeClusterConfig会由RtpTransportControllerSende::PostUpdates()进行转发到PacingController
void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) {
if (update.congestion_window) {
pacer()->SetCongestionWindow(*update.congestion_window);
}
if (update.pacer_config) {
pacer()->SetPacingRates(update.pacer_config->data_rate(),
update.pacer_config->pad_rate());
}
for (const auto& probe : update.probe_cluster_configs) {
// 转发probe cluster config
pacer()->CreateProbeCluster(probe.target_data_rate, probe.id);
}
if (update.target_rate) {
control_handler_->SetTargetRate(*update.target_rate);
UpdateControlState();
}
}
void PacingController::CreateProbeCluster(DataRate bitrate, int cluster_id) {
prober_.CreateProbeCluster(bitrate, CurrentTime(), cluster_id);
}
最后转发到BitrateProber:
void BitrateProber::CreateProbeCluster(DataRate bitrate,
Timestamp now,
int cluster_id) {
RTC_DCHECK(probing_state_ != ProbingState::kDisabled);
RTC_DCHECK_GT(bitrate, DataRate::Zero());
total_probe_count_++;
// 清除过期的cluster
while (!clusters_.empty() &&
now - clusters_.front().created_at > kProbeClusterTimeout) {
clusters_.pop();
total_failed_probe_count_++;
}
// 创建probe cluster, 放入clusters中
ProbeCluster cluster;
cluster.created_at = now;
cluster.pace_info.probe_cluster_min_probes = config_.min_probe_packets_sent;
cluster.pace_info.probe_cluster_min_bytes =
(bitrate * config_.min_probe_duration.Get()).bytes();
RTC_DCHECK_GE(cluster.pace_info.probe_cluster_min_bytes, 0);
cluster.pace_info.send_bitrate_bps = bitrate.bps();
cluster.pace_info.probe_cluster_id = cluster_id;
clusters_.push(cluster);
RTC_LOG(LS_INFO) << "Probe cluster (bitrate:min bytes:min packets): ("
<< cluster.pace_info.send_bitrate_bps << ":"
<< cluster.pace_info.probe_cluster_min_bytes << ":"
<< cluster.pace_info.probe_cluster_min_probes << ")";
// If we are already probing, continue to do so. Otherwise set it to
// kInactive and wait for OnIncomingPacket to start the probing.
// 已处于probing则保持,否则等待有包来的时候,才设置为active
if (probing_state_ != ProbingState::kActive)
probing_state_ = ProbingState::kInactive;
}
在BitrateProber::CreateProbeCluster()中:
- 以probe cluster的方式管理所有的probe
- 每当要创建的probe cluster的时候,首先会遍历clusters队列, 将过期的cluster给剔除
- 根据传参进来的bitrate和cluster_id生成probe_cluster,然后将其放入cluster数组
2.4.2.2 计算当前探测发送码率-RecommendedMinProbeSize()
在PacingController::ProcessPackets()从排队队列获取包进行发包的时候, 会调用BitrateProber::RecommendedMinProbeSize()去获取当前要探测的码率, 然后再下面的循环发包的过程, 检测到发包码率小于探测码率, 则会调用PacketRouter::GeneratePadding()生成padding包去弥补探测码率.生成padding包的过程有机会再细说(TODO), 暂时可参考此文
void PacingController::ProcessPackets() {
....
bool first_packet_in_probe = false;
PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing();
if (is_probing) {
// Probe timing is sensitive, and handled explicitly by BitrateProber, so
// use actual send time rather than target.
// 获取当前的prober cluster
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;
// 从prober中获取要探测的码率
recommended_probe_size = prober_.RecommendedMinProbeSize();
RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero());
} else {
// No valid probe cluster returned, probe might have timed out.
is_probing = false;
}
}
....
while (!paused_) {
if (rtp_packet == nullptr) {
// No packet available to send, check if we should send padding.
// 取不到包了,看里probe的大小还差多少
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
// 生成padding包去弥补探测码率
std::vector<std::unique_ptr<rtppackettosend>> padding_packets =
packet_sender_->GeneratePadding(padding_to_add);
if (padding_packets.empty()) {
// No padding packets were generated, quite send loop.
break;
}
for (auto& packet : padding_packets) {
// pading包入队列
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
}
....
}
....
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
//prober更新已发送大小
prober_.ProbeSent(CurrentTime(), data_sent);
}
}
}
在执行BitrateProber::RecommendedMinProbeSize()前实际上执行了BitrateProber::CurrentCluster(), 此函数会更新BitrateProber的cluster数组, 将过期的cluster丢弃
absl::optional<pacedpacketinfo> BitrateProber::CurrentCluster(Timestamp now) {
if (clusters_.empty() || probing_state_ != ProbingState::kActive) {
return absl::nullopt;
}
// 队头的cluster已经过期了,丢弃
if (config_.abort_delayed_probes && next_probe_time_.IsFinite() &&
now - next_probe_time_ > config_.max_probe_delay.Get()) {
RTC_DLOG(LS_WARNING) << "Probe delay too high"
" (next_ms:"
<< next_probe_time_.ms() << ", now_ms: " << now.ms()
<< "), discarding probe cluster.";
clusters_.pop();
if (clusters_.empty()) {
probing_state_ = ProbingState::kSuspended;
return absl::nullopt;
}
}
PacedPacketInfo info = clusters_.front().pace_info;
info.probe_cluster_bytes_sent = clusters_.front().sent_bytes;
return info;
}
然后才执行BitrateProber::RecommendedMinProbeSize(), 可以看到获取的这个探测码率为2倍的probe_delta时间下的码率, 这是因为PacingController的发包涉及到进程和任务的调度, 不能保证队列有包就能马上触发发送,在CPU高的时候,会有延迟,所以将这个探测区间扩大去平衡这种因素, 同时PacingController每次发包的时候会从BitrateProber获取下次要探测的时间以确保探测的码率
// Probe size is recommended based on the probe bitrate required. We choose
// a minimum of twice |kMinProbeDeltaMs| interval to allow scheduling to be
// feasible.
DataSize BitrateProber::RecommendedMinProbeSize() const {
if (clusters_.empty()) {
return DataSize::Zero();
}
// 获取clusters队头的探测码率
DataRate send_rate =
DataRate::BitsPerSec(clusters_.front().pace_info.send_bitrate_bps);
return 2 * send_rate * config_.min_probe_delta;
}
2.4.2.3 更新探测已发送码率-ProbeSent()
下一个探测时间(next_probe_time)将会在探测包发送时, 其会调用CalculateNextProbeTime()去更新探测时间
void BitrateProber::ProbeSent(Timestamp now, DataSize size) {
RTC_DCHECK(probing_state_ == ProbingState::kActive);
RTC_DCHECK(!size.IsZero());
if (!clusters_.empty()) {
ProbeCluster* cluster = &clusters_.front();
if (cluster->sent_probes == 0) {
RTC_DCHECK(cluster->started_at.IsInfinite());
cluster->started_at = now;
}
cluster->sent_bytes += size.bytes<int>();
cluster->sent_probes += 1;
// 更新下一个探测时间
next_probe_time_ = CalculateNextProbeTime(*cluster);
if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes &&
cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) {
RTC_HISTOGRAM_COUNTS_100000("WebRTC.BWE.Probing.ProbeClusterSizeInBytes",
cluster->sent_bytes);
RTC_HISTOGRAM_COUNTS_100("WebRTC.BWE.Probing.ProbesPerCluster",
cluster->sent_probes);
RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.Probing.TimePerProbeCluster",
(now - cluster->started_at).ms());
clusters_.pop();
}
if (clusters_.empty()) {
probing_state_ = ProbingState::kSuspended;
}
}
}
CalculateNextProbeTime()的原理很简单: 下一个探测时刻 = cluster 开始探测时刻 + 已经发送数据要流逝的时间
Timestamp BitrateProber::CalculateNextProbeTime(
const ProbeCluster& cluster) const {
RTC_CHECK_GT(cluster.pace_info.send_bitrate_bps, 0);
RTC_CHECK(cluster.started_at.IsFinite());
// Compute the time delta from the cluster start to ensure probe bitrate stays
// close to the target bitrate. Result is in milliseconds.
DataSize sent_bytes = DataSize::Bytes(cluster.sent_bytes);
DataRate send_bitrate =
DataRate::BitsPerSec(cluster.pace_info.send_bitrate_bps);
TimeDelta delta = sent_bytes / send_bitrate;
// 下一个探测时刻 = cluster 开始探测时刻 + 已发送的数据所用的时刻
return cluster.started_at + delta;
}
2.4.3 探测预估器-ProbeEstimator
当探测包发送到接收端后, 接收端构造cc-feedback发送回来, 由于ProbeEstimator去计算实际的探测码率, 在GoogCcNetworkController::OnTransportPacketsFeedback()可以见到
NetworkControlUpdate GoogCcNetworkController::OnTransportPacketsFeedback(
TransportPacketsFeedback report) {
...
for (const auto& feedback : report.SortedByReceiveTime()) {
if (feedback.sent_packet.pacing_info.probe_cluster_id !=
PacedPacketInfo::kNotAProbe) {
// probe_estimator 根据返回的feedback更新带宽探测的计算
probe_bitrate_estimator_->HandleProbeAndEstimateBitrate(feedback);
}
}
...
// 获取上面循环更新probe_estimator的最终的结果
absl::optional<datarate> probe_bitrate =
probe_bitrate_estimator_->FetchAndResetLastEstimatedBitrate();
...
}
在for循环中使用report.SortedByReceiveTime()获取到的包都是有接收到的包, 没有收到的包的feedback时间为正无穷,被自动过滤了
std::vector<packetresult> TransportPacketsFeedback::SortedByReceiveTime()
const {
std::vector<packetresult> res;
for (const PacketResult& fb : packet_feedbacks) {
// 只返回接收到的包
if (fb.receive_time.IsFinite()) {
res.push_back(fb);
}
}
std::sort(res.begin(), res.end(), PacketResult::ReceiveTimeOrder());
return res;
}
然后feedback packet会作为 ProbeBitrateEstimator::HandleProbeAndEstimateBitrate()的入参使用
2.4.3.1 计算探测结果-HandleProbeAndEstimateBitrate()
ProbeBitrateEstimator::HandleProbeAndEstimateBitrate() 通过feedback packet 去计算探测码率
absl::optional<datarate> ProbeBitrateEstimator::HandleProbeAndEstimateBitrate(
const PacketResult& packet_feedback) {
// 从feedback packet中获得probe cluster id
int cluster_id = packet_feedback.sent_packet.pacing_info.probe_cluster_id;
RTC_DCHECK_NE(cluster_id, PacedPacketInfo::kNotAProbe);
// 清除之前到期的cluster
EraseOldClusters(packet_feedback.receive_time);
// 获取cluster进行跟新(或通过feedback创建cluster)
AggregatedCluster* cluster = &clusters_[cluster_id];
// 更新send_time
if (packet_feedback.sent_packet.send_time < cluster->first_send) {
cluster->first_send = packet_feedback.sent_packet.send_time;
}
if (packet_feedback.sent_packet.send_time > cluster->last_send) {
cluster->last_send = packet_feedback.sent_packet.send_time;
cluster->size_last_send = packet_feedback.sent_packet.size;
}
// 更新receive_time
if (packet_feedback.receive_time < cluster->first_receive) {
cluster->first_receive = packet_feedback.receive_time;
cluster->size_first_receive = packet_feedback.sent_packet.size;
}
// 对于没有接收到的包,已经在外部(SortedByReceiveTime())做了特殊处理
if (packet_feedback.receive_time > cluster->last_receive) {
cluster->last_receive = packet_feedback.receive_time;
}
// 统计send packet size
cluster->size_total += packet_feedback.sent_packet.size;
cluster->num_probes += 1;
RTC_DCHECK_GT(
packet_feedback.sent_packet.pacing_info.probe_cluster_min_probes, 0);
RTC_DCHECK_GT(packet_feedback.sent_packet.pacing_info.probe_cluster_min_bytes,
0);
// 带宽探测对返回feedback包的数量和大小都有要求
// 最小返回需要的探测包为 probe_cluster_min_probes * 0.8
// 最小返回大小为probe_cluster_min_bytes * 0.8
int min_probes =
packet_feedback.sent_packet.pacing_info.probe_cluster_min_probes *
kMinReceivedProbesRatio;
DataSize min_size =
DataSize::Bytes(
packet_feedback.sent_packet.pacing_info.probe_cluster_min_bytes) *
kMinReceivedBytesRatio;
if (cluster->num_probes < min_probes || cluster->size_total < min_size)
return absl::nullopt;
// 计算发送/接收间隔
// 并对其进行合理性校验,合理才计算带宽探测
TimeDelta send_interval = cluster->last_send - cluster->first_send;
TimeDelta receive_interval = cluster->last_receive - cluster->first_receive;
if (send_interval <= TimeDelta::Zero() || send_interval > kMaxProbeInterval ||
receive_interval <= TimeDelta::Zero() ||receive_interval > kMaxProbeInterval) {
RTC_LOG(LS_INFO) << "Probing unsuccessful, invalid send/receive interval";
}
// send_interval 不包含最后一个包的发送时间,计算码率的时候要去掉最后一个包
RTC_DCHECK_GT(cluster->size_total, cluster->size_last_send);
// 计算发送码率
DataSize send_size = cluster->size_total - cluster->size_last_send;
DataRate send_rate = send_size / send_interval;
// 同上
RTC_DCHECK_GT(cluster->size_total, cluster->size_first_receive);
DataSize receive_size = cluster->size_total - cluster->size_first_receive;
// 计算接受码率
DataRate receive_rate = receive_size / receive_interval;
// receive_rate 远大于 send_rate,一半包的feedback还没加进来,不做预测
double ratio = receive_rate / send_rate;
if (ratio > kMaxValidRatio) {
RTC_LOG(LS_INFO) << "Probing unsuccessful, receive/send ratio too high";
}
RTC_LOG(LS_INFO) << "Probing successful";
// 取发送码率和接受码率中小的那个作为探测的结果
DataRate res = std::min(send_rate, receive_rate);
// 如果接收码率远小于发送码率,这说明达到了链路的真实容量,此时,目标码率会被设置为
// receive_rate降低一点即可
if (receive_rate < kMinRatioForUnsaturatedLink * send_rate) {
RTC_DCHECK_GT(send_rate, receive_rate);
res = kTargetUtilizationFraction * receive_rate;
}
if (event_log_) {
event_log_->Log(
std::make_unique<rtceventproberesultsuccess>(cluster_id, res.bps()));
}
estimated_data_rate_ = res;
return estimated_data_rate_;
}
ProbeBitrateEstimator::HandleProbeAndEstimateBitrate() 中:
- 利用当前接踵而来的feedback packet, 维护一个发送区间 [first_send_time , last_send_time] 和 接收区间 [firset_receive_time , last_receive_time], 同时累计已发送数据大小(send_size)和已接收数据大小(receive_size), 然后一除就能得到发送码率和接收码率
- 计算发送码率和接收码率的过程中,对于interval和包的数量都有要达到的要求,一定程度上保证结果的准确性
- 其中没有接收到的包对应的feedback在前面已经说过了会被过滤掉, 所以这里计算receive_size不用特殊处理,但是计算send_size按道理是需要把那部分包给加上的, 不知道为何没有, 没能理解
- 最终会均衡发送码率和接受码率中最小的会作为探测码率
2.4.3.2 获取探测结果-FetchAndResetLastEstimatedBitrate()
最终码率可以通过robeBitrateEstimator::FetchAndResetLastEstimatedBitrate()得到, 这里有个细节,预估码率被取走后就reset了, 下次进来就拿不到了, 这个函数只会在cc-controller收到cc-feedback时调用, 为的就是计算当前实时的探测码率去辅助后面的码率估计,没有就不要.
absl::optional<datarate>
ProbeBitrateEstimator::FetchAndResetLastEstimatedBitrate() {
absl::optional<datarate> estimated_data_rate = estimated_data_rate_;
estimated_data_rate_.reset();
return estimated_data_rate;
}
2.5 吞吐量-AcknowledgedBitrateEstimator
和Probe相比, AcknowledgedBitrateEstimator 是用来计算当前吞吐量的, 而Probe只会在一些特殊的时候才会进行探测(链路刚开始时, 码率不正常暴跌时), AcknowledgedBitrateEstimator在中会被调用GoogCcNetworkController::OnTransportPacketsFeedback()
NetworkControlUpdate GoogCcNetworkController::OnTransportPacketsFeedback(
TransportPacketsFeedback report) {
...
// 根据feedback计算吞吐量
acknowledged_bitrate_estimator_->IncomingPacketFeedbackVector(
report.SortedByReceiveTime());
auto acknowledged_bitrate = acknowledged_bitrate_estimator_->bitrate();
...
}
然后调用AcknowledgedBitrateEstimator::IncomingPacketFeedbackVector()
2.5.1 统计包大小-IncomingPacketFeedbackVector()
IncomingPacketFeedbackVector()主要是会统计出当前这个feedback packet的size, 然后交由BitrateEstimator去更新吞吐量
void AcknowledgedBitrateEstimator::IncomingPacketFeedbackVector(
const std::vector<packetresult>& packet_feedback_vector) {
RTC_DCHECK(std::is_sorted(packet_feedback_vector.begin(),
packet_feedback_vector.end(),
PacketResult::ReceiveTimeOrder()));
for (const auto& packet : packet_feedback_vector) {
if (alr_ended_time_ && packet.sent_packet.send_time > *alr_ended_time_) {
bitrate_estimator_->ExpectFastRateChange();
alr_ended_time_.reset();
}
// 这个acknowledged_estimate的值是当前已发出的packet的size和其前面
// 没有开启feedback track的size的总和
// 每个packet发送的时候会被检查是否enable TransportSequenceNumber
// enable了就会开启cc-feedback,会有对应的sent_packet和size的记录
// 没有enable的但是网络上发送出去的packet的size会做累加,直到下一个enable
// 的packet出现,把累加的size放到prior_unacked_data中
DataSize acknowledged_estimate = packet.sent_packet.size;
acknowledged_estimate += packet.sent_packet.prior_unacked_data;
bitrate_estimator_->Update(packet.receive_time, acknowledged_estimate,
in_alr_);
}
}
这里有一个细节的地方, feedback packet中有一个值叫做prior_unacked_data, 记录的是在这个packet前的包但是没有TransportSequenceNumber, 这种包不会feedback创建, 但是它们的size会累积到下一个最近包的prior_unacked_data上, 详细点见2.5.2
2.5.2 included_in_feedback
packet->included_in_feedback的设置在RtpSenderEgress::SendPacket()中设置的,如果启用了TransportSequenceNumber这个extension则会设置这个包
void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
//........
options.is_retransmit = !is_media;
if (auto packet_id = packet->GetExtension<transportsequencenumber>()) {
options.packet_id = *packet_id;
options.included_in_feedback = true; //设置是否in_feed_back
options.included_in_allocation = true;
AddPacketToTransportFeedback(*packet_id, *packet, pacing_info);
}
options.additional_data = packet->additional_data();
if (packet->packet_type() != RtpPacketMediaType::kPadding &&
packet->packet_type() != RtpPacketMediaType::kRetransmission) {
UpdateDelayStatistics(packet->capture_time_ms(), now_ms, packet_ssrc);
UpdateOnSendPacket(options.packet_id, packet->capture_time_ms(),
packet_ssrc);
}
// 转发packet
const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
......
}
这个flag伴随着options会随着的以下的堆栈从SendPacketToNetwork()直到从AsyncUDPSocket::SendTo()发送完成后会构造构造SentPacket,沿着类层次反向将发包情况发回通知上层
rtc::AsyncUDPSocket::SendTo()
cricket::UDPPort::SendTo()
cricket::ProxyConnection::Send()
cricket::P2PTransportChannel::SendPacket()
cricket::DtlsTransport::SendPacket()
webrtc::RtpTransport::SendPacket()
webrtc::SrtpTransport::SendRtpPacket()
cricket::BaseChannel::SendPacket()
bool RtpSenderEgress::SendPacketToNetwork() // 发送
int AsyncUDPSocket::SendTo(const void* pv,
size_t cb,
const SocketAddress& addr,
const rtc::PacketOptions& options) {
// 构造SentPacket
rtc::SentPacket sent_packet(options.packet_id, rtc::TimeMillis(),
options.info_signaled_after_sent);
CopySocketInformationToPacketInfo(cb, *this, true, &sent_packet.info);
int ret = socket_->SendTo(pv, cb, addr);
SignalSentPacket(this, sent_packet);
return ret;
}
将构造的包返回上层,传递至TransportFeedbackAdapter中
absl::optional<sentpacket> TransportFeedbackAdapter::ProcessSentPacket()
void RtpTransportControllerSend::OnSentPacket()
void BaseChannel::SignalSentPacket_n()
webrtc::RtpTransport::OnSentPacket()
cricket::DtlsTransport::OnSentPacket()
cricket::P2PTransportChannel::OnSentPacket()
cricket::UDPPort::OnSentPacket()
rtc::AsyncUDPSocket::SendTo()
调用的RtpTransportControllerSend::OnSentPacket()
如下:
void RtpTransportControllerSend::OnSentPacket(
const rtc::SentPacket& sent_packet) {
task_queue_.PostTask([this, sent_packet]() {
RTC_DCHECK_RUN_ON(&task_queue_);
absl::optional<sentpacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
if (packet_msg && controller_)
PostUpdates(controller_->OnSentPacket(*packet_msg));
});
}
其中transport_feedback_adapter_.ProcessSentPacket()
中会用上included_in_feedback,将untracked_size 累加起来,放到最近一个sent_packet上
absl::optional<sentpacket> TransportFeedbackAdapter::ProcessSentPacket(
const rtc::SentPacket& sent_packet) {
auto send_time = Timestamp::Millis(sent_packet.send_time_ms);
// TODO(srte): Only use one way to indicate that packet feedback is used.
if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) {
int64_t unwrapped_seq_num =
seq_num_unwrapper_.Unwrap(sent_packet.packet_id);
auto it = history_.find(unwrapped_seq_num);
if (it != history_.end()) {
bool packet_retransmit = it->second.sent.send_time.IsFinite();
it->second.sent.send_time = send_time;
last_send_time_ = std::max(last_send_time_, send_time);
// TODO(srte): Don't do this on retransmit.
if (!pending_untracked_size_.IsZero()) {
// 将untracked_size 赋值给最近一个sent_packet
if (send_time < last_untracked_send_time_)
RTC_LOG(LS_WARNING)
<< "appending acknowledged data for out of order packet. (Diff: "
<< ToString(last_untracked_send_time_ - send_time) << " ms.)";
it->second.sent.prior_unacked_data += pending_untracked_size_;
pending_untracked_size_ = DataSize::Zero();
}
if (!packet_retransmit) {
if (it->second.sent.sequence_number > last_ack_seq_num_)
in_flight_.AddInFlightPacketBytes(it->second);
it->second.sent.data_in_flight = GetOutstandingData();
return it->second.sent;
}
}
} else if (sent_packet.info.included_in_allocation) {
if (send_time < last_send_time_) {
RTC_LOG(LS_WARNING) << "ignoring untracked data for out of order packet.";
}
// 将untracked_size累加起来
pending_untracked_size_ +=
DataSize::Bytes(sent_packet.info.packet_size_bytes);
last_untracked_send_time_ = std::max(last_untracked_send_time_, send_time);
}
return absl::nullopt;
}
2.5.3 更新吞吐量-Update()
Update() 通过feedback packet的size和到达时间来更新吞吐量
void BitrateEstimator::Update(Timestamp at_time, DataSize amount, bool in_alr) {
int rate_window_ms = noninitial_window_ms_.Get();
// We use a larger window at the beginning to get a more stable sample that
// we can use to initialize the estimate.
if (bitrate_estimate_kbps_ < 0.f)
rate_window_ms = initial_window_ms_.Get();
bool is_small_sample = false;
// 计算当前时刻码率
float bitrate_sample_kbps = UpdateWindow(at_time.ms(), amount.bytes(),
rate_window_ms, &is_small_sample);
if (bitrate_sample_kbps < 0.0f)
return;
if (bitrate_estimate_kbps_ < 0.0f) {
// This is the very first sample we get. Use it to initialize the estimate.
bitrate_estimate_kbps_ = bitrate_sample_kbps;
return;
}
// Optionally use higher uncertainty for very small samples to avoid dropping
// estimate and for samples obtained in ALR.
float scale = uncertainty_scale_;
if (is_small_sample && bitrate_sample_kbps < bitrate_estimate_kbps_) {
scale = small_sample_uncertainty_scale_;
} else if (in_alr && bitrate_sample_kbps < bitrate_estimate_kbps_) {
// alr状态下,码率未全部使用,所以实际samples的值的方差应该更大
// Optionally use higher uncertainty for samples obtained during ALR.
scale = uncertainty_scale_in_alr_;
}
// Define the sample uncertainty as a function of how far away it is from the
// current estimate. With low values of uncertainty_symmetry_cap_ we add more
// uncertainty to increases than to decreases. For higher values we approach
// symmetry.
// 此处定义了一个sample_uncertainty,含义上是预估码率和观测码率的偏差
// 偏差越大说明采样点的方差越大,可信度越低
float sample_uncertainty =
scale * std::abs(bitrate_estimate_kbps_ - bitrate_sample_kbps) /
(bitrate_estimate_kbps_ +
std::min(bitrate_sample_kbps,
uncertainty_symmetry_cap_.Get().kbps<float>()));
float sample_var = sample_uncertainty * sample_uncertainty;
// Update a bayesian estimate of the rate, weighting it lower if the sample
// uncertainty is large.
// The bitrate estimate uncertainty is increased with each update to model
// that the bitrate changes over time.
float pred_bitrate_estimate_var = bitrate_estimate_var_ + 5.f;
// 这其实对应的是一个卡尔曼率滤波的后验期望的更新过程
// 后验期望:exp[k]+ = exp[k]ˉ + k*(y[k] - h* exp[k]ˉ)
// 其中 k = var[k]ˉ / (var[k]ˉ + sample_var) (var 和 sample_var 分别为预测误差方差和观测误差方差)
bitrate_estimate_kbps_ = (sample_var * bitrate_estimate_kbps_ +
pred_bitrate_estimate_var * bitrate_sample_kbps) /
(sample_var + pred_bitrate_estimate_var);
bitrate_estimate_kbps_ =
std::max(bitrate_estimate_kbps_, estimate_floor_.Get().kbps<float>());
// 这其实对应的是一个卡尔曼率滤波的后验方差的更新过程,
// 后验方差: var[k] = (1 - k) * var[k]ˉ
// 其中 k = var[k]ˉ / (var[k]ˉ + sample_var) (var 和 sample_var 分别为预测误差方差和观测误差方差)
bitrate_estimate_var_ = sample_var * pred_bitrate_estimate_var /
(sample_var + pred_bitrate_estimate_var);
BWE_TEST_LOGGING_PLOT(1, "acknowledged_bitrate", at_time.ms(),
bitrate_estimate_kbps_ * 1000);
}
BitrateEstimator::Update()中:
-
通过新来feedback packet的大小调用UpdateWindow()去计算当前的码率(bitrate_sample_kbps)
-
将当前计算出来当前码率(bitrate_sample_kbps)作为观测值, 把上一个预测码率(bitrate_estimate_kbps_)当作预测值, 使用贝叶斯滤波去修正当前观测码率(贝叶斯滤波可参考此文), 其中引入了一个基于观测值和预测值的差的变量sample_uncertainty去作为样本标准差.
2.5.4 计算吞吐量-UpdateWindow()
UpdateWindow()计算当前码率的过程如下所示, 原理很简单, 设置了一个时间窗口大小(rate_windows_ms), 将受到feedback的数据放入到最新的rate_windows_ms中, 一除就得到吞吐量了.
float BitrateEstimator::UpdateWindow(int64_t now_ms,
int bytes,
int rate_window_ms,
bool* is_small_sample) {
RTC_DCHECK(is_small_sample != nullptr);
// rate_window_ms(预设评估窗口大小)
// |**********************|------------------------------|
// |-----------------------------------------------------|
// prev_time_ms_ current_window_ms_(当前窗口大小) now_ms
// Reset if time moves backwards
if (now_ms < prev_time_ms_) {
prev_time_ms_ = -1;
sum_ = 0;
current_window_ms_ = 0;
}
if (prev_time_ms_ >= 0) {
// 计算当前窗口大小
current_window_ms_ += now_ms - prev_time_ms_;
// Reset if nothing has been received for more than a full window.
// rate_windows_ms(预设窗口大小)
// |***************************************|
// .......|-----------------------------------------------------|
// prev_time_ms_ now_ms
// |......----------------current_window_ms_--------------------|
// |*********************| 规定窗口大小rate_window_ms
// |---------------------| 被缩减后的窗口
// 超过一个窗口没有接收到东西了,把过去窗口累加的size重置为0, 并把窗口大小
// 进行缩减,减去一个周期前的长度,因为sum_置0后,会+=bytes,所以窗口
// 不是直接置0而是保存在在一个窗口的部分
if (now_ms - prev_time_ms_ > rate_window_ms) {
sum_ = 0; // 重置
current_window_ms_ %= rate_window_ms;
}
}
prev_time_ms_ = now_ms;
float bitrate_sample = -1.0f;
if (current_window_ms_ >= rate_window_ms) {
// 满足一个窗口了,计算当前窗口内的码率
*is_small_sample = sum_ < small_sample_threshold_->bytes(); // 窗口数据小
bitrate_sample = 8.0f * sum_ / static_cast<float>(rate_window_ms);
current_window_ms_ -= rate_window_ms;// 减去窗口
sum_ = 0;
}
sum_ += bytes;
return bitrate_sample;
}
2.6 基于延迟码率预估-DelayBasedBwe
DelayBasedBwe是基于延迟的码率估计, 在进行源码分析前, 对它的背后原理进行简述
2.6.1 cc 拥塞控制原理
GCC拥塞控制的原理有相应的论文:Analysis and Design of the Google Congestion Contro for Web Real-time Communication (WebRTC),虽然该论文在包组间的延迟梯度的计算是卡尔曼滤波(还不是trendline),但没关系,只是一个小点而已,全文对webrtc的拥塞控制的设计,构成,原理都介绍的很全,虽然网络上很多资料也会进行总结介绍,但其实说的并不深入的,只是简单的介绍了纸面公式的含义,如包组间延迟梯度计算,阈值判断,阈值更新,发送码率更新,但对背后的思想没有进行探讨,对网络方向铺垫不够的同学,很容易误导其以为理解了流程就完全理解了整个系统的原理。
2.6.1.1 基于延迟梯度的拥塞控制原理
gcc的基于延迟的拥塞控制是一个由【发送码率】-【排队延迟梯度】-【自适应阈值】三者构成的一个系统,如下所示:
发送码率(send bitrate)影响了网络设备排队队列的长度,从而影响到了排队延迟梯度(delay gradient)的大小,排队延迟梯度的大小和阈值(threshold detector)比较又决定下次的发送码率是上升还是下降,看起来是鸡生蛋蛋生鸡,接下来详细解释一下其中的部分细节。
排队延迟梯度是指两组包之间在网络传播时,由于网络设备的排队队列长度的改变而导致的单位时间下排队延迟变化量, 借旧版的webrtc中的基于卡尔曼滤波延迟拥塞控制算法,如下图所示 (T_{i-1}) 和 (T_{i})分别代表的是连续两帧图像的包组,其在网络传输的延迟差为:
这连续两帧延迟差产生的原因是有(Delta L_{i})(包组间的大小差), (C_{i})(传输速率),(m_{i})(包组排队延迟差),(z_{i})(随机误差)
重点需要介绍的是(m_{i})(排队延迟差),程序发送的每个数据包经过中间网络设备的转发的时候会先被放在排队队列再经过处理发送, 根据网络设备的能力,当设备的处理发送速度不小于接收速度,排队队列就始终很小甚至为空,排队延迟基本为0,当设备的处理发送速度比接收速度小的时候,排队队列就会越来越大,排队延迟就会很高
在超过设备的最大能力时,发送码率增加/减少剧烈,队列长度也会跟着剧烈变化,进而导致排队延迟的剧烈变化,我们使用排队延迟梯度的大小去表示排队延迟的变化程度,所以排队延迟梯度的大小和发送码率的变化是有直接关系的。
要解决网络拥塞的问题,本质是在链路最大负载和发送率之间做均衡,但链路的最大负载能力是不确定的也无法直接探测,而排队队列的虽然情况直接反应了当前链路的负载情况,但由于无法直接测得排队队列长度,所以侧面把排队延迟梯度作为目标指标,始终让它逐渐增长,遇到某个阈值后降下来,然后再增长,就可以保证:
1.充分达到网络的饱和负载,队列大小才开始有正向变化,排队延迟梯度才不再始终为0,这个时候肯定是超过链路最大负载能力。
2. 防止过载,当排队梯度正向扩张,并且大于某个阈值时,已经充分使用了链路的负载,队列也一定已经增长到某个程度了,已经是过负载的情况,这个时候就要降低发送码率,发送码率小于链路最大负载后,排队队列会被逐渐排空,队列没包稳定了,此时延迟梯度也会慢慢的收敛至零,又可以进行下一轮的增长。
在理解该算法时很容易把最终的码率看作目标,排队延迟梯度当作手段,其实不然,该算法是通过不断的码率调节,调节排队延迟梯度达到正向到大于某个阈值,从而达到网络设备的最大负载。
接下来从数学角度上介绍上面的那一坨,简洁直观的看码率如何影响排队延迟梯度(这里给出了梯度的数学定义,这个数学定义非常重要,在后续的思考跑偏的时候可以回来看看排队梯度到底是啥),排队延迟梯度被定义为排队延迟的导数(T_q{'}(t)),如下:
其中(q(t))为队列长度,(q^{’}(t)) 为排队队列增长导数,(C)为链路最大容量,也就是最大处理负载,(r(t))为队列接收速率也就是发送码率。
式(2.2.4)~式(2.2.6)可以不看,只是为了展示(q^{’}(t))含义,揭示它和发送码率r(t)的关系,所以啰嗦的把过程写出来了;
对于式(2.2.3), 直观上理解就是: **排队延迟梯度 = 队列长度的变化 / 最大处理负载 **。
最后得到式(2.2.7),很直观能看到排队延迟梯度链路容量(T_q{'}(t))的大小就是接收码率r(t)和链路最大容量C的大小关系,当接收码率r(t)越来越高,其和链路容量的差值就越大,链路越负载不动,延迟梯度(T_q{'}(t))也就越高;但同时也揭示了一个局限点,假设r(t) > C 后但是不再增长,此时延迟梯度便不会再做改变,但此时排队队列会随着时间不断的增长而导致传输延迟升高最终导致网络拥堵的情况,这明显不是我们想看到的事情,所以接收码率r(t)必须不断的攀升,使得延迟梯度(T_q{'}(t))跟着攀升到一个阈值,这时就可以认为队列已经经过了一段时间增长变得太长了,需要缓一缓,进而降低接收码率,让队列减少,减少到一定后再继续攀爬,周而复始,如下图所示:
2.6.1.2 webrtc中基于延迟的码率预估
- 排队延迟梯度的计算
m55之后,webrtc使用trendline算法去估算排队延迟梯度;其和此前基于卡尔曼滤波的老算法相比,最大的不同是:
1.不再将视频帧作为包组,而是将连续的包间间隔不超过的5ms的包作为一组,包组长不超过100ms.
2.不再用卡尔曼滤波修正对排队延迟梯度估计的误差,而是使用线性回归的方式直接对排队延迟梯队进行预估平衡误差和随机因素.
记每个包组的第一个包的发送时间为(T_{i})和最后一个包的到达时间(t_i),通过公式2.2.1
,可以得到组间传输延迟差
在收集到20个这样的包组时候,以到达时间 - 第一个包的达到时间为x轴,累计延迟为y轴,这些延迟差在一定时刻下对应一定的排队梯度,但因为观测误差,噪声等其它随机因素导致出现偏差,延迟差还需要通过线性回归计算斜率,去平衡其它干扰因素带来的影响,计算出来的斜率便是排队延迟的梯度。
实际的计算具体的过程如下:
每个包组对应的x轴坐标(x_i)定义如下成到达时间-第一个包到达时间,(t_i)为到达时间, (first_arrival)为系统运行后第一个包的到达时间
每个包组对应的y轴坐标(y_i)定义成延迟变化的累计(sum_{k = 0}^{i}{d_k}), 为了平滑使用式(2.2.3.3)指数退避(Exponential backoff filter)做一次平滑
确定好了(x_i, y_i)后使用线性回归的公式计算, 见(2.2.3.4 ~ 2.2.3.6),就可以得到当前的排队延迟梯度(T_{q_i})
- 根据梯度探测当前网络状况
得到排队延迟梯度T_{q_i}之后,接下来就要对该梯度进行检测,梯度检测的过程和2.2.1所描述的差不多,也是看梯度是否在一个预设的经验阈值 $[-gamma, gamma] (内,但梯度的计算是由当前n(20)个点进行的,由于点的间隔相对稳定(100ms)内,**所以webrtc中将梯度*点数,得到一个基于点数为基础的梯度值,然后和一个基于点数的经验阈值进行比较(后续分析源码时详细解释)**,然后将此值和阈值做对比,大于)gamma(表示带宽过度使用(overuse),处于)[-gamma, gamma](表示正常(normal),小于)-gamma$表示带宽未充分使用(underuse)
阈值(gamma)在比较完成后会结合当前的时延梯度进行指数平滑,指数平滑是时间序列分析中常用的一种数学手段,平滑的指数中还增加了(Delta ti),使得阈值的调整更加及时, (Delta t_{i} = t_{i} - t_{i-1}) ,代表着当前阈值更新时间和上一次阈值更新时间的间隔
其中(k_{i})并不是一个固定值,当梯度绝对值大于阈值的时候,(k_i=0.0087)比较小,超过阈值是的梯度(|NT_{q_{i}}|)的瞬时增跌是非常剧烈的(因为从是0队列长开始变化的),所以此时更倾向于历史值;而当梯度绝对值小于阈值的时候, (k_{i})比较大,让阈值向当前梯度迅速收敛,当前梯度更有参考性。
在这里需要提及一个点,为什么要用延时梯度去更新阈值?指数平滑一般是使用同一个变量在不同时刻的值做加权,而这里延时梯度和阈值并不是同一个变量,这样去更新阈值是否会破坏阈值的有效性?这就需要回到我们的根本目标上来,我们需要的是什么?需要的是刚好充分的使用到链路的最大负载能力,这个时候排队队列是刚好开始积累包或者排队队列的包即将排空,如果能够排除掉网络公用性以及一些其它随机性的干扰因素,梯度变为正向时就说明已经到达链路最大负载能力了,而不再需要设置一个阈值去缓冲,可这是不可能的,所以才需要设置这么一个阈值而不是直接和0比较,但观测到的梯度其实往往对应的是排队队列开始积累包/即将排空包时的梯度, 观测梯度更具有参考性,表示当前队列的长度在0的边缘,所以我们需要对预设的阈值向观测梯度收敛: 在(|NT_{q_{i}}| < gamma_{i-1})时,(|NT_{q_{i}}|)缓慢变化,代表着排队队列刚开始积累/排空包的梯度,此时应当让阈值快速朝着梯度收敛;在(|NT_{q_{i}}| > gamma_{i-1}),此时的梯度代表刚好超过了链路能承载的极限一点的梯度,阈值应该朝着向这个方向增长。以下图的5个点作为例子:
在点1处,随着发送码率的慢慢攀未超过链路容量,队列处于积累包的边缘,阈值在这一段朝着的梯度收敛。
在点2处,发送码率超过链路容量,队列已经明确开始积累包不再为0,梯度有了明显变化瞬时垂直上升,阈值朝着这个队列长度刚起来的时刻下的梯度逼近。
在点3处,梯度超过了阈值后,发送码率被调整跳崖式下降,梯度跳崖式下降,队列处于渐渐排空包的状态。
在点4处,梯度被检测到小于阈值,发送码率重新升高,梯度慢慢回升。
在点5处,重新重复点2处的逻辑,发送码率超过链路容量,梯度开始垂直上升。
- 根据当前网络状况调整发送码率
根据梯度和阈值比较得到当前的网络状况(overuse, normal , underuse),就可以调整码率了,调整的过程中webrtc使用了一个状态机,如下图所示:
在Decrease,Hold,Increase对应的发送码率调整如下, 其中(A_{r}(t_i))表示(t_i)时刻预估码率,(R_r(t_i))表示(t_i)时刻实际码率, 采用了AIMD(慢升速降)的控制策略
初始的码率上升下降路径是:Normal(Increase) ->Overuse(Increase)->Overuse(Decrease)->UnderUse(Hold)->Normal(Hold)->Normal(Increase)
其中特别关注标红的UnderUse(Hold),为什么在处于UnderUse情况下还继续采用Hold策略?此时继续排包就可以使得排队队列被排空,梯度值慢慢回归到0;
而标黄的UnderUse(Hold)和Overuse(Decrease)至今没想明白的什么情况会产生??不太清楚是否为一些上个状态评估的异常而导致的。
至此,基于延迟的带宽探测原理介绍完毕。
2.6.1.3 webrtc中基于丢包的码率预估
2.6.2 IncomingPacketFeedbackVector()
在计算得到此前介绍探测码率(probe_bitrate)和吞吐量(acknowledge_bitrate)后, DelayBasedBwe就可以根据feedback packet去做码率预估了, DelayBasedBwe 在GoogCcNetworkController::OnTransportPacketsFeedback()中被调用IncomingPacketFeedbackVector(), 由此函数为起点,会根据feedback预估出当前网络的状态(normal, underuse, overuse), 然后根据网络状态对码率进行aimd
NetworkControlUpdate GoogCcNetworkController::OnTransportPacketsFeedback(
TransportPacketsFeedback report) {
....
DelayBasedBwe::Result result;
result = delay_based_bwe_->IncomingPacketFeedbackVector(
report, acknowledged_bitrate, probe_bitrate, estimate_,
alr_start_time.has_value());
....
}
IncomingPacketFeedbackVector()中会对每一个feedback packet做IncomingPacketFeedback()
DelayBasedBwe::Result DelayBasedBwe::IncomingPacketFeedbackVector(
const TransportPacketsFeedback& msg,
absl::optional<datarate> acked_bitrate,
absl::optional<datarate> probe_bitrate,
absl::optional<networkstateestimate> network_estimate,
bool in_alr) {
RTC_DCHECK_RUNS_SERIALIZED(&network_race_);
auto packet_feedback_vector = msg.SortedByReceiveTime();
// TODO(holmer): An empty feedback vector here likely means that
// all acks were too late and that the send time history had
// timed out. We should reduce the rate when this occurs.
if (packet_feedback_vector.empty()) {
RTC_LOG(LS_WARNING) << "Very late feedback received.";
return DelayBasedBwe::Result();
}
if (!uma_recorded_) {
RTC_HISTOGRAM_ENUMERATION(kBweTypeHistogram,
BweNames::kSendSideTransportSeqNum,
BweNames::kBweNamesMax);
uma_recorded_ = true;
}
bool delayed_feedback = true;
bool recovered_from_overuse = false;
BandwidthUsage prev_detector_state = active_delay_detector_->State();
for (const auto& packet_feedback : packet_feedback_vector) {
delayed_feedback = false;
// 每个包做trendline
IncomingPacketFeedback(packet_feedback, msg.feedback_time);
if (prev_detector_state == BandwidthUsage::kBwUnderusing &&
active_delay_detector_->State() == BandwidthUsage::kBwNormal) {
recovered_from_overuse = true;
}
prev_detector_state = active_delay_detector_->State();
}
if (delayed_feedback) {
// TODO(bugs.webrtc.org/10125): Design a better mechanism to safe-guard
// against building very large network queues.
return Result();
}
rate_control_.SetInApplicationLimitedRegion(in_alr);
rate_control_.SetNetworkStateEstimate(network_estimate);
// update
return MaybeUpdateEstimate(acked_bitrate, probe_bitrate,
std::move(network_estimate),
recovered_from_overuse, in_alr, msg.feedback_time);
}
IncomingPacketFeedback()的整个过程如下, 虽然很长, 但是很好懂
void DelayBasedBwe::IncomingPacketFeedback(const PacketResult& packet_feedback,
Timestamp at_time) {
// Reset if the stream has timed out.
if (last_seen_packet_.IsInfinite() ||
at_time - last_seen_packet_ > kStreamTimeOut) {
// 检测当前包是否和之前的包相隔太远timeout,是的话对delay_detector做reset
// reset arrival delta 计算器
// 注意此处有两种arrival delta计算器:InterArrivalDelta和 InterArrival
// 如果启用WebRTC-Bwe-NewInterArrivalDelta特性,使用InterArrivalDelta
// 否则使用 InterArrival
if (use_new_inter_arrival_delta_) {
video_inter_arrival_delta_ =
std::make_unique<interarrivaldelta>(kSendTimeGroupLength);
audio_inter_arrival_delta_ =
std::make_unique<interarrivaldelta>(kSendTimeGroupLength);
} else {
video_inter_arrival_ = std::make_unique<interarrival>(
kTimestampGroupTicks, kTimestampToMs, true);
audio_inter_arrival_ = std::make_unique<interarrival>(
kTimestampGroupTicks, kTimestampToMs, true);
}
// reset delay detector
video_delay_detector_.reset(
new TrendlineEstimator(key_value_config_, network_state_predictor_));
audio_delay_detector_.reset(
new TrendlineEstimator(key_value_config_, network_state_predictor_));
active_delay_detector_ = video_delay_detector_.get();
}
last_seen_packet_ = at_time;
// As an alternative to ignoring small packets, we can separate audio and
// video packets for overuse detection.
DelayIncreaseDetectorInterface* delay_detector_for_packet =
video_delay_detector_.get();
if (separate_audio_.enabled) {
// 如果将音频和视频的分开码率预估,根据包是audio/video选择对应的delay_dector
if (packet_feedback.sent_packet.audio) {
delay_detector_for_packet = audio_delay_detector_.get();
audio_packets_since_last_video_++;
if (audio_packets_since_last_video_ > separate_audio_.packet_threshold &&
packet_feedback.receive_time - last_video_packet_recv_time_ >
separate_audio_.time_threshold) {
active_delay_detector_ = audio_delay_detector_.get();
}
} else {
audio_packets_since_last_video_ = 0;
last_video_packet_recv_time_ =
std::max(last_video_packet_recv_time_, packet_feedback.receive_time);
active_delay_detector_ = video_delay_detector_.get();
}
}
DataSize packet_size = packet_feedback.sent_packet.size;
if (use_new_inter_arrival_delta_) {
TimeDelta send_delta = TimeDelta::Zero();
TimeDelta recv_delta = TimeDelta::Zero();
int size_delta = 0;
// 获取audio/video对应的InterArrivalDelta
InterArrivalDelta* inter_arrival_for_packet =
(separate_audio_.enabled && packet_feedback.sent_packet.audio)
? video_inter_arrival_delta_.get()
: audio_inter_arrival_delta_.get();
// 计算前后两组包的delta,
bool calculated_deltas = inter_arrival_for_packet->ComputeDeltas(
packet_feedback.sent_packet.send_time, packet_feedback.receive_time,
at_time, packet_size.bytes(), &send_delta, &recv_delta, &size_delta);
// trendline update and estimate
delay_detector_for_packet->Update(
recv_delta.ms(), send_delta.ms(),
packet_feedback.sent_packet.send_time.ms(),
packet_feedback.receive_time.ms(), packet_size.bytes(),
calculated_deltas);
} else {
// 获取audio/video对应的InterArrivalDelta
InterArrival* inter_arrival_for_packet =
(separate_audio_.enabled && packet_feedback.sent_packet.audio)
? video_inter_arrival_.get()
: audio_inter_arrival_.get();
uint32_t send_time_24bits =
static_cast<uint32_t>(
((static_cast<uint64_t>(packet_feedback.sent_packet.send_time.ms())
<< kAbsSendTimeFraction) +
500) /
1000) &
0x00FFFFFF;
// Shift up send time to use the full 32 bits that inter_arrival works with,
// so wrapping works properly.
uint32_t timestamp = send_time_24bits << kAbsSendTimeInterArrivalUpshift;
uint32_t timestamp_delta = 0;
int64_t recv_delta_ms = 0;
int size_delta = 0;
// 计算前后两组包 的delta
bool calculated_deltas = inter_arrival_for_packet->ComputeDeltas(
timestamp, packet_feedback.receive_time.ms(), at_time.ms(),
packet_size.bytes(), ×tamp_delta, &recv_delta_ms, &size_delta);
double send_delta_ms =
(1000.0 * timestamp_delta) / (1 << kInterArrivalShift);
delay_detector_for_packet->Update(
recv_delta_ms, send_delta_ms,
packet_feedback.sent_packet.send_time.ms(),
packet_feedback.receive_time.ms(), packet_size.bytes(),
calculated_deltas);
}
}
DelayBasedBwe::IncomingPacketFeedback()中:
-
使用一种名为inter_arrival的对象计算两组包之间delta, 然后把这种delta放入一个名为delay_detector做trendline
-
函数开头检查当前包和上一个包在feedback的时间上是否相隔太远, 如果是则重置delay_detector
-
如果音频和视频是分开做码率预估,选择对应的delay_detector,.
-
使用inter_arrival的ComputeDeltas()计算前后两个组包的发送时刻差(send_delta), 接收时刻差(recv_delta)和大小差(size_delta)
-
使用delay_detector的Update()用delta计算当前网络的拥堵状态
2.6.2.1 计算包组延迟差-ComputeDeltas()
ComputeDeltas()的过程很简单, 就是会为当前的包创建group, 然后记录好该组包的第一个发送时间(first_send), 大小(size), 最后一个包的接收时间(complete_time); 当发现当前packet和上一个组包在时间上不在同一个组时,就会创建新的组, 同时让上一个组和上上个组计算各种Delta
bool InterArrivalDelta::ComputeDeltas(Timestamp send_time,
Timestamp arrival_time,
Timestamp system_time,
size_t packet_size,
TimeDelta* send_time_delta,
TimeDelta* arrival_time_delta,
int* packet_size_delta) {
bool calculated_deltas = false;
if (current_timestamp_group_.IsFirstPacket()) {
// We don't have enough data to update the filter, so we store it until we
// have two frames of data to process.
current_timestamp_group_.send_time = send_time;
current_timestamp_group_.first_send_time = send_time;
current_timestamp_group_.first_arrival = arrival_time;
} else if (current_timestamp_group_.first_send_time > send_time) {
// Reordered packet.
return false;
} else if (NewTimestampGroup(arrival_time, send_time)) {
// 通过time判断是否要新建group,如果要则使用current_group - prev_group得到delta
// First packet of a later send burst, the previous packets sample is ready.
if (prev_timestamp_group_.complete_time.IsFinite()) {
*send_time_delta =
current_timestamp_group_.send_time - prev_timestamp_group_.send_time;
*arrival_time_delta = current_timestamp_group_.complete_time -
prev_timestamp_group_.complete_time;
TimeDelta system_time_delta = current_timestamp_group_.last_system_time -
prev_timestamp_group_.last_system_time;
if (*arrival_time_delta - system_time_delta >=
kArrivalTimeOffsetThreshold) {
RTC_LOG(LS_WARNING)
<< "The arrival time clock offset has changed (diff = "
<< arrival_time_delta->ms() - system_time_delta.ms()
<< " ms), resetting.";
Reset();
return false;
}
if (*arrival_time_delta < TimeDelta::Zero()) {
// The group of packets has been reordered since receiving its local
// arrival timestamp.
++num_consecutive_reordered_packets_;
if (num_consecutive_reordered_packets_ >= kReorderedResetThreshold) {
RTC_LOG(LS_WARNING)
<< "Packets between send burst arrived out of order, resetting."
<< " arrival_time_delta" << arrival_time_delta->ms()
<< " send time delta " << send_time_delta->ms();
Reset();
}
return false;
} else {
num_consecutive_reordered_packets_ = 0;
}
*packet_size_delta = static_cast<int>(current_timestamp_group_.size) -
static_cast<int>(prev_timestamp_group_.size);
calculated_deltas = true;
}
// new and swap group
prev_timestamp_group_ = current_timestamp_group_;
// The new timestamp is now the current frame.
current_timestamp_group_.first_send_time = send_time;
current_timestamp_group_.send_time = send_time;
current_timestamp_group_.first_arrival = arrival_time;
current_timestamp_group_.size = 0;
} else {
current_timestamp_group_.send_time =
std::max(current_timestamp_group_.send_time, send_time);
}
// Accumulate the frame size.
current_timestamp_group_.size += packet_size;
current_timestamp_group_.complete_time = arrival_time;
current_timestamp_group_.last_system_time = system_time;
return calculated_deltas;
}
NewTimestampGroup()判断需要创建新组的过程如下所示:
// Assumes that |timestamp| is not reordered compared to
// |current_timestamp_group_|.
bool InterArrivalDelta::NewTimestampGroup(Timestamp arrival_time,
Timestamp send_time) const {
if (current_timestamp_group_.IsFirstPacket()) {
return false;
} else if (BelongsToBurst(arrival_time, send_time)) {
return false;
} else {
// 当前包和组首包的发送时刻差不在一个范围内
return send_time - current_timestamp_group_.first_send_time >
send_time_group_length_;
}
}
/**
* @description: 检查包的是否同属于一组
* @param {*}
* @return {*}
*/
bool InterArrivalDelta::BelongsToBurst(Timestamp arrival_time,
Timestamp send_time) const {
RTC_DCHECK(current_timestamp_group_.complete_time.IsFinite());
// 计算和当前组最后包的到达服务端时刻receive time的差
TimeDelta arrival_time_delta =
arrival_time - current_timestamp_group_.complete_time;
// 计算和当前组最后包的发送时刻(send time)的差
TimeDelta send_time_delta = send_time - current_timestamp_group_.send_time;
if (send_time_delta.IsZero())
return true;
// 计算传播时间差
TimeDelta propagation_delta = arrival_time_delta - send_time_delta;
if (propagation_delta < TimeDelta::Zero() &&
arrival_time_delta <= kBurstDeltaThreshold &&
arrival_time - current_timestamp_group_.first_arrival < kMaxBurstDuration)
// 如果传播时间差为负(这个点不太理解)
// 并且包和组最后的包到达时间差小于kBurstDeltaThreshold(5ms),
// 并与组首包到达时刻差了100ms内,认为是同一组的包
return true;
return false;
}
可以看到包同属于一个组就是要求包和包之间的到达时刻在5ms以内,并且以100ms为一个Group.
2.6.2.2 线性回归计算梯度-trendline::update()
TrendlineEstimator::Update()中使用UpdateTrendline()开始做网络预估,但只有在得到组包间的delta之后(也就是有新的group生成了,算出上一个group和上上个group的delta)才会开始做trendline, 这里的arrival_time_ms是改组group下最后一个包的receive_time(包到达接受端时刻)
void TrendlineEstimator::Update(double recv_delta_ms,
double send_delta_ms,
int64_t send_time_ms,
int64_t arrival_time_ms,
size_t packet_size,
bool calculated_deltas) {
if (calculated_deltas) {
// 使用trendline
UpdateTrendline(recv_delta_ms, send_delta_ms, send_time_ms, arrival_time_ms,
packet_size);
}
if (network_state_predictor_) {
// 未启用
hypothesis_predicted_ = network_state_predictor_->Update(
send_time_ms, arrival_time_ms, hypothesis_);
}
}
UpdateTrendline()的整个细节如下:
void TrendlineEstimator::UpdateTrendline(double recv_delta_ms,
double send_delta_ms,
int64_t send_time_ms,
int64_t arrival_time_ms,
size_t packet_size) {
// 计算排队延迟
const double delta_ms = recv_delta_ms - send_delta_ms;
++num_of_deltas_;
num_of_deltas_ = std::min(num_of_deltas_, kDeltaCounterMax);
if (first_arrival_time_ms_ == -1)
first_arrival_time_ms_ = arrival_time_ms;
// Exponential backoff filter.
accumulated_delay_ += delta_ms;
BWE_TEST_LOGGING_PLOT(1, "accumulated_delay_ms", arrival_time_ms,
accumulated_delay_);
// 指数滤波器平滑累积网络delay
smoothed_delay_ = smoothing_coef_ * smoothed_delay_ +
(1 - smoothing_coef_) * accumulated_delay_;
BWE_TEST_LOGGING_PLOT(1, "smoothed_delay_ms", arrival_time_ms,
smoothed_delay_);
// Maintain packet window
// 以(到达时间,累积延迟)作为点, 将点放入
delay_hist_.emplace_back(
static_cast<double>(arrival_time_ms - first_arrival_time_ms_),
smoothed_delay_, accumulated_delay_);
if (settings_.enable_sort) {
for (size_t i = delay_hist_.size() - 1;
i > 0 &&
delay_hist_[i].arrival_time_ms < delay_hist_[i - 1].arrival_time_ms;
--i) {
std::swap(delay_hist_[i], delay_hist_[i - 1]);
}
}
// 点的个数维持在20以内
if (delay_hist_.size() > settings_.window_size)
delay_hist_.pop_front();
// Simple linear regression.
double trend = prev_trend_;
if (delay_hist_.size() == settings_.window_size) {
// 点的个数需达到20
// Update trend_ if it is possible to fit a line to the data. The delay
// trend can be seen as an estimate of (send_rate - capacity)/capacity.
// 0 < trend < 1 -> the delay increases, queues are filling up
// trend == 0 -> the delay does not change
// trend < 0 -> the delay decreases, queues are being emptied
// 开始线性回归计算斜率
trend = LinearFitSlope(delay_hist_).value_or(trend);
if (settings_.enable_cap) {
// 这个特性不清楚是用来干什么的,其相当于把所有点分成了前后两个部分
// 从前后两部分中各选一个最低点,用来计算一个封顶斜率(cap),但却不知道这样做的含义
// 由于梯度阈值是向梯度是自适应的,防止梯度过高也可以防止梯度阈值过高,难道是为了这个?
absl::optional<double> cap = ComputeSlopeCap(delay_hist_, settings_);
// We only use the cap to filter out overuse detections, not
// to detect additional underuses.
if (trend >= 0 && cap.has_value() && trend > cap.value()) {
trend = cap.value();
}
}
}
BWE_TEST_LOGGING_PLOT(1, "trendline_slope", arrival_time_ms, trend);
// 将排队延迟梯度和阈值相比较,判断当前网络情况
Detect(trend, send_delta_ms, arrival_time_ms);
}
TrendlineEstimator::UpdateTrendline()中:
-
主要是通过包组间排队延迟做trendline得到排队延迟梯度
-
函数首先通过
式2.2.1
计算出排队延迟差(delta_ms), 计算完成后,通过指数退避平滑得到当前点的延迟smoothed_delay -
以当前包组的最后一个包到达接收端时刻为x轴,以该包组的排队延迟梯度为y轴生成点, 然后放入delay_hist, delay_hist会将点的个数维持在20个
-
当点个数达到20个的时候,调用 LinearFitSlope()进行线性回归算出这些点的斜率,也就是排队延迟梯度
absl::optional<double> LinearFitSlope( const std::deque<trendlineestimator::packettiming>& packets) { RTC_DCHECK(packets.size() >= 2); // Compute the "center of mass". double sum_x = 0; double sum_y = 0; for (const auto& packet : packets) { sum_x += packet.arrival_time_ms; sum_y += packet.smoothed_delay_ms; } double x_avg = sum_x / packets.size(); double y_avg = sum_y / packets.size(); // Compute the slope k = sum (x_i-x_avg)(y_i-y_avg) / sum (x_i-x_avg)^2 double numerator = 0; double denominator = 0; for (const auto& packet : packets) { double x = packet.arrival_time_ms; double y = packet.smoothed_delay_ms; numerator += (x - x_avg) * (y - y_avg); denominator += (x - x_avg) * (x - x_avg); } if (denominator == 0) return absl::nullopt; return numerator / denominator; }
-
此处还有一个enable_cap特性,这个特性还没有启用,不太清楚是做什么的,其将点分成前后两个部分(07,819), 通过ComputeSlopeCap()从前后两个部分取出最低的点计算了一个斜率,以此作为排队延迟梯度上限,盲猜是为了限制排队延迟梯度阈值过高,提升overuse判断及时性,因为梯度阈值会朝着排队延迟梯度做自适应。
absl::optional<double> ComputeSlopeCap( const std::deque<trendlineestimator::packettiming>& packets, const TrendlineEstimatorSettings& settings) { RTC_DCHECK(1 <= settings.beginning_packets && settings.beginning_packets < packets.size()); RTC_DCHECK(1 <= settings.end_packets && settings.end_packets < packets.size()); RTC_DCHECK(settings.beginning_packets + settings.end_packets <= packets.size()); // 7之前的最低点 TrendlineEstimator::PacketTiming early = packets[0]; for (size_t i = 1; i < settings.beginning_packets; ++i) { if (packets[i].raw_delay_ms < early.raw_delay_ms) early = packets[i]; } // 7以后的最低点 size_t late_start = packets.size() - settings.end_packets; TrendlineEstimator::PacketTiming late = packets[late_start]; for (size_t i = late_start + 1; i < packets.size(); ++i) { if (packets[i].raw_delay_ms < late.raw_delay_ms) late = packets[i]; } if (late.arrival_time_ms - early.arrival_time_ms < 1) { return absl::nullopt; } // 计算最低点和最高点的斜率 return (late.raw_delay_ms - early.raw_delay_ms) / (late.arrival_time_ms - early.arrival_time_ms) + settings.cap_uncertainty; }
-
最终使用Detect()函数,对当前的排队延迟梯度和阈值相比较,判断当前的网络拥塞状况(overuse, normal, underuse)
2.6.2.3 梯度比较-trendline::Detect()
void TrendlineEstimator::Detect(double trend, double ts_delta, int64_t now_ms) {
if (num_of_deltas_ < 2) {
hypothesis_ = BandwidthUsage::kBwNormal;
return;
}
// 此处没有直接使用trend,而是计算一个kMinNumDeltas(60)个点为单位的延迟梯度缩小误差
// 下面会和一个阈值(threshold_)做比较,该阈值应该也是基于60个点的情况下算出来的阈值
const double modified_trend =
std::min(num_of_deltas_, kMinNumDeltas) * trend * threshold_gain_;
prev_modified_trend_ = modified_trend;
BWE_TEST_LOGGING_PLOT(1, "T", now_ms, modified_trend);
BWE_TEST_LOGGING_PLOT(1, "threshold", now_ms, threshold_);
if (modified_trend > threshold_) {
if (time_over_using_ == -1) {
// Initialize the timer. Assume that we've been
// over-using half of the time since the previous
// sample.
time_over_using_ = ts_delta / 2;
} else {
// Increment timer
time_over_using_ += ts_delta;
}
overuse_counter_++;
if (time_over_using_ > overusing_time_threshold_ && overuse_counter_ > 1) {
// 带宽过量使用时间超过阈值,reset, 并将hypothesis_设置为overUsing
if (trend >= prev_trend_) {
time_over_using_ = 0;
overuse_counter_ = 0;
hypothesis_ = BandwidthUsage::kBwOverusing;
}
}
} else if (modified_trend < -threshold_) {
// 小于阈值下限 underusing
time_over_using_ = -1;
overuse_counter_ = 0;
hypothesis_ = BandwidthUsage::kBwUnderusing;
} else {
// 处于阈值间, normal
time_over_using_ = -1;
overuse_counter_ = 0;
hypothesis_ = BandwidthUsage::kBwNormal;
}
prev_trend_ = trend;
// 更新排队梯度阈值
UpdateThreshold(modified_trend, now_ms);
}
TrendlineEstimator::Detect()中:
- 通过排队延迟梯度和梯度阈值做比较判断当前的网络情况
- 对排队延迟梯度的单位做了更改,改成了以60个点为单位排队延迟梯度, 并由于字面值太小所以乘上了一个增益(threshold_gain_), 最终得到modified_trend, 下面比较的threshold应该是基于此做实验测得的值
- 调用UpdateThreshold()更新梯度阈值
void TrendlineEstimator::UpdateThreshold(double modified_trend,
int64_t now_ms) {
if (last_update_ms_ == -1)
last_update_ms_ = now_ms;
if (fabs(modified_trend) > threshold_ + kMaxAdaptOffsetMs) {
// Avoid adapting the threshold to big latency spikes, caused e.g.,
// by a sudden capacity drop.
last_update_ms_ = now_ms;
return;
}
// 排队梯度阈值向当前排队梯度趋近
// modified_trend < threshold_? k =0.039 : k = 0.0087
const double k = fabs(modified_trend) < threshold_ ? k_down_ : k_up_;
// 添加时间考虑及时性
const int64_t kMaxTimeDeltaMs = 100;
int64_t time_delta_ms = std::min(now_ms - last_update_ms_, kMaxTimeDeltaMs);
// 指数平滑自适应
threshold_ += k * (fabs(modified_trend) - threshold_) * time_delta_ms;
threshold_ = rtc::SafeClamp(threshold_, 6.f, 600.f);
last_update_ms_ = now_ms;
}
至此就完成了从包组计算排队延迟差-> 排队延迟差进行trendline得到排队延迟梯度-> 排队延迟梯度和排队延迟阈值比较得到当前网络状况(normal、underuse、ovreuse) -> 梯度阈值自适应, 接下来就是进行aimd的码率调整, 让我们回到DelayBasedBwe::Result DelayBasedBwe::IncomingPacketFeedbackVector()
中的最后,其调用了MaybeUpdateEstimate()
开始做码率的aimd
2.6.2.4 码率调整-MaybeUpdateEstimate()
码率调整的过程如下
DelayBasedBwe::Result DelayBasedBwe::MaybeUpdateEstimate(
absl::optional<datarate> acked_bitrate,
absl::optional<datarate> probe_bitrate,
absl::optional<networkstateestimate> state_estimate,
bool recovered_from_overuse,
bool in_alr,
Timestamp at_time) {
Result result;
// Currently overusing the bandwidth.
if (active_delay_detector_->State() == BandwidthUsage::kBwOverusing) {
//bw_state: overusing
//
if (has_once_detected_overuse_ && in_alr && alr_limited_backoff_enabled_) {
// ALR下overuse将码率过程比较特殊,这里是为了解决一个issue:10144
// 大意也就是说因ALR无法发送足够的码率使得探测带宽上升,完全是依赖周期性的alr probe
// 当出现overuse假峰值时,不能像正常一样使用ack_bitrate做AIMD,否则会使得码率远低于estimate_bitrate
// ack rate此时不太可信,所以使用estimate_bitrate去预估然后迅速发一个probe去判断
// 预估的码率是否准确
if (rate_control_.TimeToReduceFurther(at_time, prev_bitrate_)) {
// 到了调整码率的时候了
result.updated =
UpdateEstimate(at_time, prev_bitrate_, &result.target_bitrate);
result.backoff_in_alr = true;
}
} else if (acked_bitrate &&
rate_control_.TimeToReduceFurther(at_time, *acked_bitrate)) {
// 正常情况下,有ack_bitrate,直接使用它做decrease
result.updated =
UpdateEstimate(at_time, acked_bitrate, &result.target_bitrate);
} else if (!acked_bitrate && rate_control_.ValidEstimate() &&
rate_control_.InitialTimeToReduceFurther(at_time)) {
//ack_bitrate没有测出来,单纯的用时间去判断能否再次降低码率
// Overusing before we have a measured acknowledged bitrate. Reduce send
// rate by 50% every 200 ms.
// TODO(tschumim): Improve this and/or the acknowledged bitrate estimator
// so that we (almost) always have a bitrate estimate.
// 直接将预估码率腰斩至50%
rate_control_.SetEstimate(rate_control_.LatestEstimate() / 2, at_time);
result.updated = true;
result.probe = false;
result.target_bitrate = rate_control_.LatestEstimate();
}
has_once_detected_overuse_ = true;
} else {
//bw_state: normal 和 under using
if (probe_bitrate) {
// 如果有探测码率,无须慢增长,直接使用探测码率,并且把探测码率的数据更新到rate_control中
result.probe = true;
result.updated = true;
result.target_bitrate = *probe_bitrate;
rate_control_.SetEstimate(*probe_bitrate, at_time);
} else {
// 没有的话,就只能进行rate调整了
result.updated =
UpdateEstimate(at_time, acked_bitrate, &result.target_bitrate);
result.recovered_from_overuse = recovered_from_overuse;
}
}
BandwidthUsage detector_state = active_delay_detector_->State();
if ((result.updated && prev_bitrate_ != result.target_bitrate) ||
detector_state != prev_state_) {
DataRate bitrate = result.updated ? result.target_bitrate : prev_bitrate_;
BWE_TEST_LOGGING_PLOT(1, "target_bitrate_bps", at_time.ms(), bitrate.bps());
if (event_log_) {
event_log_->Log(std::make_unique<rtceventbweupdatedelaybased>(
bitrate.bps(), detector_state));
}
// 记录之前的预估码率和state
prev_bitrate_ = bitrate;
prev_state_ = detector_state;
}
return result;
}
DelayBasedBwe::MaybeUpdateEstimate()中:
- 获取当前的网络状态,如果是overuse并且当前处于alr下,则认为ALR无法发送足够的码率使得探测带宽上升,当出现overuse假峰值时,不能像正常一样使用ack_bitrate做AIMD,否则会使得码率远低于estimate_bitrate,ack rate此时不太可信,所以使用estimate_bitrate去预估然后迅速发一个probe去判断预估的码率是否准确;
- 如果是正常情况下的overuse, 如果有吞吐量(acknowledge),直接基于当前的吞吐量(acknowledge)做码率减少;如果没有吞吐量,说明处于最开始的探测阶段,rate_control中只有一个初始预设的码率,这个码率直接缩小50%作为调整的码率
- 如果是normal或者underuse的情况下,有probe_bitrate就用probe_bitrate做码率恢复,应该是认为瞬时性的它更准确,否则的话就用当前吞吐量
- 将预测完的码率bitrate中进行更新,将结果返回
码率更新的结果如下所示:
bool DelayBasedBwe::UpdateEstimate(Timestamp at_time,
absl::optional<datarate> acked_bitrate,
DataRate* target_rate) {
const RateControlInput input(active_delay_detector_->State(), acked_bitrate);
*target_rate = rate_control_.Update(&input, at_time);
return rate_control_.ValidEstimate();
}
Update()中会检查aimd 是否设置了初始码率,然后调用ChangeBitrate()进行码率调整
DataRate AimdRateControl::Update(const RateControlInput* input,
Timestamp at_time) {
RTC_CHECK(input);
// Set the initial bit rate value to what we're receiving the first half
// second.
// TODO(bugs.webrtc.org/9379): The comment above doesn't match to the code.
// aimd的初始码率未设置
if (!bitrate_is_initialized_) {
const TimeDelta kInitializationTime = TimeDelta::Seconds(5);
RTC_DCHECK_LE(kBitrateWindowMs, kInitializationTime.ms());
if (time_first_throughput_estimate_.IsInfinite()) {
// 记录第一个发送码率到达的时间
if (input->estimated_throughput)
time_first_throughput_estimate_ = at_time;
} else if (at_time - time_first_throughput_estimate_ >
kInitializationTime &&
input->estimated_throughput) {
// 第N个发送码率时间和第一个相差大于kInitializationTime(5s),
// 超过了初始化码率的时间,把当前的发送码率当作当前码率
current_bitrate_ = *input->estimated_throughput;
bitrate_is_initialized_ = true;
}
}
// 调整码率
ChangeBitrate(*input, at_time);
return current_bitrate_;
}
ChangeBitrate()中会基于网络拥堵状态进行2.6.1.2中所述的状态机轮转判断,基于当前吞吐量增加减少码率
/**
* @description: 根据当前吞吐量和bw_state进行aimd,调整码率
* @param {*}
* @return {*}
*/
void AimdRateControl::ChangeBitrate(const RateControlInput& input,
Timestamp at_time) {
absl::optional<datarate> new_bitrate;
DataRate estimated_throughput =
input.estimated_throughput.value_or(latest_estimated_throughput_);
if (input.estimated_throughput)
latest_estimated_throughput_ = *input.estimated_throughput;
// bitrate_is_initialized_表示的是current_bitrate_是否被初始化
// current_bitrate_被初始化的场景有两个:
// 1.初始化时,外部调用SetStartBitrate()初始化current_bitrate_
// 2.第一探测到overusing,到达两路最大容量,用这个值去初始化aimud的码率
// 这个判断表示的是current_bitrate_还未被设置,所以不做normal和increase,直接返回
// 或者检测到kBwOverusing发现链路最大容量 ,要对对current_bitrate_进行初始化,并降低码率
if (!bitrate_is_initialized_ &&
input.bw_state != BandwidthUsage::kBwOverusing)
return;
// 根据状态机轮转,判断当前的码率控制状态rate_control_state是(increase。 hold,decrease)
ChangeState(input, at_time);
// We limit the new bitrate based on the troughput to avoid unlimited bitrate
// increases. We allow a bit more lag at very low rates to not too easily get
// stuck if the encoder produces uneven outputs.
// 将新的码率基于吞吐量(throughput)*1.5 去避免无限增加
// 在低码率的时候,允许一定的滞后性,为了避免编码器输出的码率波动导致的频繁阻塞
const DataRate troughput_based_limit =
1.5 * estimated_throughput + DataRate::KilobitsPerSec(10);
switch (rate_control_state_) {
case RateControlState::kRcHold:
break;
case RateControlState::kRcIncrease:
// 如果当前吞吐量大大超过了链路容积(link_capacity_),预估的链路容积已经不准确,需要重新估计,
// 链路容积差不多过去min(吞吐量, 估计码率)的指数平均
if (estimated_throughput > link_capacity_.UpperBound())
link_capacity_.Reset();
// 如果是alr状态,不增长码率,因为无法有足够的码率去探测增长后的码率是否正确
// 如果早先因为使用probe bitrate作为吞吐量使得预测码率增长超过当前的输入吞吐量阈值
// 并且当前仍处于increase状态,则说明早先增长的码率是正确的,不必根据当前吞吐量做increase
if (current_bitrate_ < troughput_based_limit &&
!(send_side_ && in_alr_ && no_bitrate_increase_in_alr_)) {
DataRate increased_bitrate = DataRate::MinusInfinity();
if (link_capacity_.has_estimate()) {
// link_capacity estimate没有在上面被重置说明测量的吞吐量和预估链路相差的并不太大
// 这个时候使用加性增长即可
// 计算出要加性增长的码率
DataRate additive_increase =
AdditiveRateIncrease(at_time, time_last_bitrate_change_);
// 把码率和当前码率加起来
increased_bitrate = current_bitrate_ + additive_increase;
} else {
// 否则的话, link capacity被重置了,预估的链路不准确,
// 对当前的码率 current_bitrate_做乘性增长
// 计算乘性增长下要增长的码率
DataRate multiplicative_increase = MultiplicativeRateIncrease(
at_time, time_last_bitrate_change_, current_bitrate_);
// 计算增长后的码率
increased_bitrate = current_bitrate_ + multiplicative_increase;
}
// 将被调节后的码率和吞吐量阈值比较,不能超过吞吐量阈值
new_bitrate = std::min(increased_bitrate, troughput_based_limit);
}
time_last_bitrate_change_ = at_time;
break;
case RateControlState::kRcDecrease: {
DataRate decreased_bitrate = DataRate::PlusInfinity();
// 码率降低,直接使用当前的吞吐量 * beta_(0.85)
decreased_bitrate = estimated_throughput * beta_;
if (decreased_bitrate > current_bitrate_ && !link_capacity_fix_) {
// TODO(terelius): The link_capacity estimate may be based on old
// throughput measurements. Relying on them may lead to unnecessary
// BWE drops.
// 当前吞吐量 * 0.85仍然大于目标码率,使用过去的吞吐量link_capacity_做减少
// 但历史吞吐量可能不及时太小,可能导致不必要的drop
if (link_capacity_.has_estimate()) {
decreased_bitrate = beta_ * link_capacity_.estimate();
}
}
if (estimate_bounded_backoff_ && network_estimate_) {
// 开启了 使用预估下边界码率减退(estimate_bounded_backoff)
// 在(estimated_throughput * beta_, estimate_lower_bound *beta)中选最大
// 前者是测出来的acknowlege 吞吐量,后者是其它estimator探测出来的下边界(这块随着network_estimator未启用而未启用)
decreased_bitrate = std::max(
decreased_bitrate, network_estimate_->link_capacity_lower * beta_);
}
// Avoid increasing the rate when over-using.
if (decreased_bitrate < current_bitrate_) {
new_bitrate = decreased_bitrate;
}
// 记录本次减少增量:last_decrease_
if (bitrate_is_initialized_ && estimated_throughput < current_bitrate_) {
if (!new_bitrate.has_value()) {
last_decrease_ = DataRate::Zero();
} else {
last_decrease_ = current_bitrate_ - *new_bitrate;
}
}
if (estimated_throughput < link_capacity_.LowerBound()) {
// The current throughput is far from the estimated link capacity. Clear
// the estimate to allow an immediate update in OnOveruseDetected.
// 吞吐量比链路估计下限小太多,波动很大,link_capacity要reset
// 因为link_capacity内部是一个使用estimated_throughput的指数平滑,
// 差太多的时候继续做指数平滑被认为没有意义
link_capacity_.Reset();
}
// 当码率第一次下下降后认为已经探测到容量上限了,直接设置initialized_的状态
bitrate_is_initialized_ = true;
// 使用estimated_throughput做指数平滑
link_capacity_.OnOveruseDetected(estimated_throughput);
// Stay on hold until the pipes are cleared.
rate_control_state_ = RateControlState::kRcHold;
time_last_bitrate_change_ = at_time;
time_last_bitrate_decrease_ = at_time;
break;
}
default:
assert(false);
}
// new_bitrate 夹逼在[min_configured_bitrate_, upper_bound]中
current_bitrate_ = ClampBitrate(new_bitrate.value_or(current_bitrate_));
}
AimdRateControl::ChangeBitrate()中:
-
首先会获取输入的当前吞吐量, 这个入参吞吐量在前面已经知道,并不就一定是当前实际的吞吐量,可能由于各种情况是上一个预测码率,或者是探测码率
-
检查rate controller设置初始化码率,如果没有,除非遇到overusing的状态,认为检测到链路的码率了,使用它作为初始化码率,否则在normal和underuse的情况下不处理
-
调用ChangeState()根据输入的网络状态进行状态机轮转,判断当前要对码率做(hold, decrease, increase)
void AimdRateControl::ChangeState(const RateControlInput& input,
Timestamp at_time) {
switch (input.bw_state) {
case BandwidthUsage::kBwNormal:
if (rate_control_state_ == RateControlState::kRcHold) {
time_last_bitrate_change_ = at_time;
rate_control_state_ = RateControlState::kRcIncrease;
}
break;
case BandwidthUsage::kBwOverusing:
if (rate_control_state_ != RateControlState::kRcDecrease) {
rate_control_state_ = RateControlState::kRcDecrease;
}
break;
case BandwidthUsage::kBwUnderusing:
rate_control_state_ = RateControlState::kRcHold;
break;
default:
assert(false);
}
}
-
基于当前的吞吐量,计算一个吞吐量阈值(troughput_based_limit),防止吞吐量无限增加
-
如果是Increase, 首先判断链路容量(历史吞吐量的指数平滑)和吞吐量阈值是否相差太大,如果是则对链路容量进行重设,并判断是否处于alr状态,或者早先因为别的原因已经提升过码率超过当前吞吐量阈值,如果是则不对码率做increase, 否则的话,则检查链路容量(link_capacity)是否有值,如果有,说明吞吐量和链路容量相差不大,则使用AdditiveRateIncrease()对当前码率做加性增加,否则使用MultiplicativeRateIncrease()做乘性增加
- 如果是decrease, 直接将当前吞吐量 * 0.85作为新码率,但该码率可能仍大于上一个调整后的码率,如果有这种情况出现,则使用链路容量(link_capacity) * 0.85作为新码率, 如果开启了使用预估下边界码率减退(estimate_bounded_backoff), 则下降的码率为min(decreased_bitrate, network_estimate_->link_capacity_lower * beta_), 计算完成后将bitrate_is_initialized设置为true,标识着已经完成初始化
其中,关于加性增长AdditiveRateIncrease()是在草案中规定,详见此处,概括为就是要在当前的码率下多发一个包要增加多少码率,详见下:
DataRate AimdRateControl::AdditiveRateIncrease(Timestamp at_time,
Timestamp last_time) const {
// 计算上次码率到现在的时间间隔
double time_period_seconds = (at_time - last_time).seconds<double>();
// 计算一个response_time中增加一个packet每秒需要增加多少码率
// 乘以interval 算出最终要增加的码率
double data_rate_increase_bps =
GetNearMaxIncreaseRateBpsPerSecond() * time_period_seconds;
return DataRate::BitsPerSec(data_rate_increase_bps);
}
/**
* @description: 计算一个response time下增加一个包需要增加多少码率
* 详情可见: https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#page-10
* response time被定义为网络中的一个请求到服务端处理到回来的整个的时间
* @param {*}
* @return {*}
*/
double AimdRateControl::GetNearMaxIncreaseRateBpsPerSecond() const {
// 计算帧率对应当前码率(current_bitrate_)下,一个packet的大小是多少
RTC_DCHECK(!current_bitrate_.IsZero());
const TimeDelta kFrameInterval = TimeDelta::Seconds(1) / 30; //fps 为30
DataSize frame_size = current_bitrate_ * kFrameInterval; //按照当前码率和帧间隔,计算一帧的大小
const DataSize kPacketSize = DataSize::Bytes(1200); //设一个packet大小为1200
double packets_per_frame = std::ceil(frame_size / kPacketSize);//计算一帧中有多少个packet
DataSize avg_packet_size = frame_size / packets_per_frame; // 计算一个packet有多大
// Approximate the over-use estimator delay to 100 ms.
// rtt_默认为200ms,但会通过接收到的RTCP报文使用接口setRtt()做修改
TimeDelta response_time = rtt_ + TimeDelta::Millis(100);
if (in_experiment_)
response_time = response_time * 2;
// 每秒要增加的码率 packet_size / response_time
double increase_rate_bps_per_second =
(avg_packet_size / response_time).bps<double>();
double kMinIncreaseRateBpsPerSecond = 4000;
// 最少要增长4kb/s码率
return std::max(kMinIncreaseRateBpsPerSecond, increase_rate_bps_per_second);
}
而乘性增加MultiplicativeRateIncrease()就很好理解,就是上一个预估码率乘以一个系数alpha(1.08)
DataRate AimdRateControl::MultiplicativeRateIncrease(
Timestamp at_time,
Timestamp last_time,
DataRate current_bitrate) const {
double alpha = 1.08;
if (last_time.IsFinite()) {
auto time_since_last_update = at_time - last_time;
alpha = pow(alpha, std::min(time_since_last_update.seconds<double>(), 1.0));
}
DataRate multiplicative_increase =
std::max(current_bitrate * (alpha - 1.0), DataRate::BitsPerSec(1000));
return multiplicative_increase;
}
至此,基于延迟的码率预估介绍完毕
2.7 基于丢包码率预估-SendSideBandwidthEstimation
SendSideBandwidthEstimation主要是基于丢包率对当前码率进行预估的,其丢包控制如2.7.1所述,也差不多是接近最终码率调整的最后一环
2.7.1 webrtc中基于丢包的码率预估
基于丢包的码率预估非常简单, 如下式,当丢包率小于0.02的时候,在上一个码率上乘以1.08,丢包率小于0.1时保持当前码率,当丢包率大于0.1时乘以(1 - 0.5 * loss)
2.7.2 统计更新丢包率-UpdatePacketsLost()
每当收到cc-feedback或者收到RR-report的时候就能统计出丢包率,在cc-controller中就会调用SendSideBandwidthEstimation::UpdatePacketsLost()
去更新丢包率,同时进行码率预估
void SendSideBandwidthEstimation::UpdatePacketsLost(int64_t packets_lost,
int64_t number_of_packets,
Timestamp at_time) {
last_loss_feedback_ = at_time;
if (first_report_time_.IsInfinite())
first_report_time_ = at_time;
// Check sequence number diff and weight loss report
if (number_of_packets > 0) {
int64_t expected =
expected_packets_since_last_loss_update_ + number_of_packets;
// Don't generate a loss rate until it can be based on enough packets.
// 丢包统计的总报数不能小于20,否则不更新丢包率
if (expected < kLimitNumPackets) {
// Accumulate reports.
expected_packets_since_last_loss_update_ = expected; // 总包数
lost_packets_since_last_loss_update_ += packets_lost; // 丢包数
return;
}
has_decreased_since_last_fraction_loss_ = false;
// 怕太小了,所以乘了个256
int64_t lost_q8 = (lost_packets_since_last_loss_update_ + packets_lost)
<< 8;
// 计算当前fraction的丢包率
last_fraction_loss_ = std::min<int>(lost_q8 / expected, 255);
// Reset accumulators.
lost_packets_since_last_loss_update_ = 0;
expected_packets_since_last_loss_update_ = 0;
last_loss_packet_report_ = at_time;
// 更新目标码率
UpdateEstimate(at_time);
}
UpdateUmaStatsPacketsLost(at_time, packets_lost);
}
SendSideBandwidthEstimation::UpdatePacketsLost()中:
- 根据总包数和丢包数计更新了当前的丢包率(last_fraction_loss), 该丢包率经过256倍放大
- 调用UpdateEstimate()根据丢包率更新目标码率
2.7.3 基于丢包更新码率-UpdateEstimate()
void SendSideBandwidthEstimation::UpdateEstimate(Timestamp at_time) {
// 此处函数调用有两个地方:
// 1.是incoming feedback
// 2. rtptransportcontroller有一个processInterval()会定时将pacer中的size传过来做update
if (rtt_backoff_.CorrectedRtt(at_time) > rtt_backoff_.rtt_limit_) {
// 当前时刻下预估rtt的最大时间超过了设置的rtt最大时间,认为网络拥堵
// 直接进行更新,下调码率
if (at_time - time_last_decrease_ >= rtt_backoff_.drop_interval_ &&
current_target_ > rtt_backoff_.bandwidth_floor_) {
// 如果当前时刻超过了drop_interval且目标码率不低于最低,可以下降,则下降
time_last_decrease_ = at_time;
// 下降目标码率
// new_bitrate = max(current_target * 0.8, 5kb)
DataRate new_bitrate =
std::max(current_target_ * rtt_backoff_.drop_fraction_,
rtt_backoff_.bandwidth_floor_.Get());
// link_capacity_更新
link_capacity_.OnRttBackoff(new_bitrate, at_time);
// 更新目标码率
UpdateTargetBitrate(new_bitrate, at_time);
return;
}
// TODO(srte): This is likely redundant in most cases.
// 什么情况下target会被更新?
ApplyTargetLimits(at_time);
return;
}
// We trust the REMB and/or delay-based estimate during the first 2 seconds if
// we haven't had any packet loss reported, to allow startup bitrate probing.
// 最初的2s没有packet loss report,相信remb或者延迟码率的估算结果
if (last_fraction_loss_ == 0 && IsInStartPhase(at_time)) {
DataRate new_bitrate = current_target_;
// TODO(srte): We should not allow the new_bitrate to be larger than the
// receiver limit here.
// 下面两者正如注释所言应该还是要改的,让他们不能比receiver_limit_/delay_based_limit_ 大
if (receiver_limit_.IsFinite())
new_bitrate = std::max(receiver_limit_, new_bitrate); //remb
if (delay_based_limit_.IsFinite())
new_bitrate = std::max(delay_based_limit_, new_bitrate);//trendline
if (loss_based_bandwidth_estimation_.Enabled()) {
// 启用丢包预估,用new_bitrate做初始化
loss_based_bandwidth_estimation_.Initialize(new_bitrate);
}
// new_bitrate被delay_based_limit_(延迟预估)更新了,清空min_bitrate_history后将其记录
if (new_bitrate != current_target_) {
// 清空min_bitrate_history
min_bitrate_history_.clear();
if (loss_based_bandwidth_estimation_.Enabled()) {
// 启用了丢包预估,上面loss_based_bandwidth_estimation_使用了new_bitrate做了初始化
// 即使用new_bitrate, bitrate_history记录它
min_bitrate_history_.push_back(std::make_pair(at_time, new_bitrate));
} else {
// 没启用丢包探测, bitrate_history记录current_target_,这就不太理解了
min_bitrate_history_.push_back(
std::make_pair(at_time, current_target_));
}
// 更新目标码率
UpdateTargetBitrate(new_bitrate, at_time);
return;
}
}
// 对min_history进行更新,让front元素为bwe最开始incrase时的码率
UpdateMinHistory(at_time);
if (last_loss_packet_report_.IsInfinite()) {
// No feedback received.
// TODO(srte): This is likely redundant in most cases.
ApplyTargetLimits(at_time);
return;
}
if (loss_based_bandwidth_estimation_.InUse()) {
// 根据丢包和延迟预估码率,估算一个新码率
DataRate new_bitrate = loss_based_bandwidth_estimation_.Update(
at_time, min_bitrate_history_.front().second, delay_based_limit_,
last_round_trip_time_);
UpdateTargetBitrate(new_bitrate, at_time);
return;
}
// 计算上个feedback到当前feedback的时间差
TimeDelta time_since_loss_packet_report = at_time - last_loss_packet_report_;
// 处于当前loss_packet_report的有效范围内,使用该report对应的丢包率last_fraction_loss_进行更新
if (time_since_loss_packet_report < 1.2 * kMaxRtcpFeedbackInterval) {
// We only care about loss above a given bitrate threshold.
// last_fraction_loss_之前扩大了256,现在只是缩减回去
float loss = last_fraction_loss_ / 256.0f;
// We only make decisions based on loss when the bitrate is above a
// threshold. This is a crude way of handling loss which is uncorrelated
// to congestion.
if (current_target_ < bitrate_threshold_ || loss <= low_loss_threshold_) {
// 当前目标码率小于阈值, 并且loss < 2%时,对当前目标码率(current_target)进行8%的增长
// kBweIncreaseInterval.
// Note that by remembering the bitrate over the last second one can
// rampup up one second faster than if only allowed to start ramping
// at 8% per second rate now. E.g.:
// If sending a constant 100kbps it can rampup immediately to 108kbps
// whenever a receiver report is received with lower packet loss.
// If instead one would do: current_target_ *= 1.08^(delta time),
// it would take over one second since the lower packet loss to achieve
// 108kbps.
// 这里解释一下,码率的8%的增长居然考虑了(delta time), 当考虑到时间就有下面两种增长方式了
// 1. delta_ms * 1.08, 2. 1.08^(delta_ms)
// jonas认为第二种增长慢,对于100kb/s将会使用超过一秒的时间才能增长到108
// 但我觉得这里犯了一个数学错误: 比如1秒内收到两个report结果是: 1.08^0.5 * 1.08 ^ 0.5 = 1.08
// 似乎没区别, jonas可能把它当成了 1.08^0.5^0.5? 不至于呀,不理解
DataRate new_bitrate = DataRate::BitsPerSec(
min_bitrate_history_.front().second.bps() * 1.08 + 0.5);
// Add 1 kbps extra, just to make sure that we do not get stuck
// (gives a little extra increase at low rates, negligible at higher
// rates).
// 增加额外的1kbs确保不会停滞
new_bitrate += DataRate::BitsPerSec(1000);
// 更新当前目标码率
UpdateTargetBitrate(new_bitrate, at_time);
return;
} else if (current_target_ > bitrate_threshold_) {
// 当前设定的目标码率大于设定阈值了
if (loss <= high_loss_threshold_) {
// Loss between 2% - 10%: Do nothing.
} else {
// Loss > 10%: Limit the rate decreases to once a kBweDecreaseInterval
// + rtt.
if (!has_decreased_since_last_fraction_loss_ &&
(at_time - time_last_decrease_) >=
(kBweDecreaseInterval + last_round_trip_time_)) {
time_last_decrease_ = at_time;
// 降低码率:
// current_target_ = current_target_ * (1 -0.5 * lossrate)
// 下面的512是因为last_fraction_loss进行了256扩大
DataRate new_bitrate = DataRate::BitsPerSec(
(current_target_.bps() *
static_cast<double>(512 - last_fraction_loss_)) /
512.0);
has_decreased_since_last_fraction_loss_ = true;
// 更新当前目标码率
UpdateTargetBitrate(new_bitrate, at_time);
return;
}
}
}
}
// TODO(srte): This is likely redundant in most cases.
ApplyTargetLimits(at_time);
}
SendSideBandwidthEstimation::UpdateEstimate()中:
-
主要是通过丢包率(last_fraction_loss)来和2.7.1调整码率
-
因为该函数也会在无feedback的时候被定时调用去更新码率,所以在开头使用rtt_backoff.CorrectedRtt()预估了一个当前的rtt时间,判断当前是否存在rtt超时在没有feedback的情况下调用该函数, 如果是则直接下调码率, 下调的方式为max(current_target * 0.8, 5kb),下调完成后使用UpdateTargetBitrate()更新目标码率,预估rtt的方式如下:
/** * @description: 假设at_time为当前的rtt_feedback的时间,预估一个当前的可能的rtt时间 * 原理上,根据上一个rtt时间 + 最后一个发送包组可能的首包发送时间预估出一个当前的可能rtt * @param {at_time} 当前时刻 * @return {*} */ TimeDelta RttBasedBackoff::CorrectedRtt(Timestamp at_time) const { // 计算从最近更新上一个rtt到现在的时间 TimeDelta time_since_rtt = at_time - last_propagation_rtt_update_; // TimeDelta timeout_correction = time_since_rtt; // Avoid timeout when no packets are being sent. // 计算当前时间到最后一个包的发送时间 TimeDelta time_since_packet_sent = at_time - last_packet_sent_; // 计算一个首包到最后一个包的发送时间,实际上是(last_packet_sent_ - last_propagation_rtt_update_) timeout_correction = std::max(time_since_rtt - time_since_packet_sent, TimeDelta::Zero()); return timeout_correction + last_propagation_rtt_; }
-
接下来判断是否是最初阶段,如果是并且存在remb预估得到的码率(receiver_limit)或者trendline预估得到的码率(delay_based_limit),选择其中的最大者;
-
维护一个存储着<时间,码率>的队列(min_bitrate_history),队列头是1s前时刻下的最低码率
void SendSideBandwidthEstimation::UpdateMinHistory(Timestamp at_time) { // 队列 o: old。 n:new //queue: n、n、n、n、o、o、o、o、o // 压缩到1s: n、n、n、n、o // 将back端大于target的给移除后插入: curent_target、n、o // Remove old data points from history. // Since history precision is in ms, add one so it is able to increase // bitrate if it is off by as little as 0.5ms. // 将队列窗口的长度设置在一个1000ms区间内,这是一个可能的bweincrease范围 while (!min_bitrate_history_.empty() && at_time - min_bitrate_history_.front().first + TimeDelta::Millis(1) > kBweIncreaseInterval) { min_bitrate_history_.pop_front(); } // Typical minimum sliding-window algorithm: Pop values higher than current // bitrate before pushing it. // 将队列back端大于current_target的都给remove掉,删除式插入排序? while (!min_bitrate_history_.empty() && current_target_ <= min_bitrate_history_.back().second) { min_bitrate_history_.pop_back(); } min_bitrate_history_.push_back(std::make_pair(at_time, current_target_)); }
-
然后开始判断,当丢包率<2%的时候,开始以8%的速度增加码率,码率的8%的增长考虑了两个feedback的始检查, 使用last_bitrate * delta_ms * 1.08的方式,然后调用UpdateTargetBitrate()更新码率
-
当丢包率2% ~10%的时候继续保持当前码率,当丢包率大于10%的时候开始降低码率,降低的方式为bitrate * (1 -0.5 * lossrate), 然后调用UpdateTargetBitrate()更新码率
2.7.4 更新目标码率-UpdateTargetBitrate()
DataRate SendSideBandwidthEstimation::GetUpperLimit() const {
// upper_limit为min[delay_based_limit_, receiver_limit_]
DataRate upper_limit = delay_based_limit_;
if (!receiver_limit_caps_only_)
upper_limit = std::min(upper_limit, receiver_limit_);
upper_limit = std::min(upper_limit, max_bitrate_configured_);
return upper_limit;
}
void SendSideBandwidthEstimation::UpdateTargetBitrate(DataRate new_bitrate,
Timestamp at_time) {
// 新码率不能大于延迟预估码率(delay_based_limit_)
new_bitrate = std::min(new_bitrate, GetUpperLimit());
if (new_bitrate < min_bitrate_configured_) {
// 设置了最小目标码率,使用最小目标码率
MaybeLogLowBitrateWarning(new_bitrate, at_time);
new_bitrate = min_bitrate_configured_;
}
// 更新当前目标码率
current_target_ = new_bitrate;
MaybeLogLossBasedEvent(at_time);
// 更新链路预估容量capacity_estimate_bps_
link_capacity_.OnRateUpdate(acknowledged_rate_, current_target_, at_time);
}
此处目标码率的更新有细节,在UpdateTargetBitrate()的开头会对new_bitrata进行一个限制,要求其不能大于延迟预估码率,其次不能小于配置最小的码率(默认为5kb) ,这个最小值会被设置调整,但是看到这里的都明白webrtc只能支持10%以下的丢包,一旦丢包在10%以上持续,码率会越来越小,直到到达这个最小值,对于一些弱网情况,如wifi穿墙,信号差等,丢包率10%以上是很正常的,但是此时却远远没有达到链路的最大负载,不太清楚webrtc目前为何做这样的设计,也是一个优化点
2.8 码率窗口控制器-CongestionWindowPushbackController
这个类使用了一个拥塞窗口,对目标码率进一步调整升降,其核心非常简单: 就是统计一段时间内发送窗口的使用率来决定是应该提升码率还是降低码率,如下所示:
uint32_t CongestionWindowPushbackController::UpdateTargetBitrate(
uint32_t bitrate_bps) {
if (!current_data_window_ || current_data_window_->IsZero())
return bitrate_bps;
int64_t total_bytes = outstanding_bytes_;
if (add_pacing_)
total_bytes += pacing_bytes_;
// 计算窗口使用比例
double fill_ratio =
total_bytes / static_cast<double>(current_data_window_->bytes());
if (fill_ratio > 1.5) {
encoding_rate_ratio_ *= 0.9;
} else if (fill_ratio > 1) {
encoding_rate_ratio_ *= 0.95;
} else if (fill_ratio < 0.1) {
encoding_rate_ratio_ = 1.0;
} else {
encoding_rate_ratio_ *= 1.05;
encoding_rate_ratio_ = std::min(encoding_rate_ratio_, 1.0);
}
// 重新调整码率
uint32_t adjusted_target_bitrate_bps =
static_cast<uint32_t>(bitrate_bps * encoding_rate_ratio_);
// Do not adjust below the minimum pushback bitrate but do obey if the
// original estimate is below it.
// 不要比最小值小
bitrate_bps = adjusted_target_bitrate_bps < min_pushback_target_bitrate_bps_
? std::min(bitrate_bps, min_pushback_target_bitrate_bps_)
: adjusted_target_bitrate_bps;
return bitrate_bps;
}
CongestionWindowPushbackController::UpdateTargetBitrate()中:
- 窗口使用率大于1.5倍时,编码码率*0.9; 大于1倍时,乘以0.95;小于0.1时候,编码码率重置为1.0;当处于[0.1 ,1]时,编码码率乘以1.05逐步缓升,不超过1.0
- 窗口使用比例的计算,使用的total_bytes,这个值是当前pacer queue的大小 + 已经发往网络(还未到达)数据的大小,而数据窗口的大小是在cc-controller中做的, 其使用rtt作为时间窗口,使rtt * last_loss_based_target_rate(目标码率)作为数据窗口,如下所示。值得一提的是,total_bytes起初是不包括pacer queue的大小的,只是单纯的用处在网络中的数据的大小,它们相当于下一个rtt下到达的数据,这样就能够理解,为什么它除以一个rtt能得到一个有效的窗口比例值,
void GoogCcNetworkController::UpdateCongestionWindowSize() {
// 获得每个cc-report中的最大rtt数组中最小的那个
TimeDelta min_feedback_max_rtt = TimeDelta::Millis(
*std::min_element(feedback_max_rtts_.begin(), feedback_max_rtts_.end()));
const DataSize kMinCwnd = DataSize::Bytes(2 * 1500);
// 使用rtt + 额外值 作为time_window
TimeDelta time_window =
min_feedback_max_rtt +
TimeDelta::Millis(
rate_control_settings_.GetCongestionWindowAdditionalTimeMs());
// data_window = target_rate * time_window
DataSize data_window = last_loss_based_target_rate_ * time_window;
if (current_data_window_) {
data_window =
std::max(kMinCwnd, (data_window + current_data_window_.value()) / 2);
} else {
data_window = std::max(kMinCwnd, data_window);
}
current_data_window_ = data_window;
}
2.9 总结
再回头看2.1的着张图应该就清晰很多了:
- 首先进行探测码率计算和吞吐量计算
- 基于探测码率和吞吐量,开始基于延迟计算码率: 使用线性回归预估出网络的状态后,通过aimd调整码率
- 有了延迟预估码率后,基于丢包率调整码率,得到丢包预估码率,丢包预估码率以延迟预估码率为上限
- 将新的目标码率设置到探测控制器上,检测后续是否进行探测,以及探测码率的大小
- 通过拥塞窗口进一步对目标码率进行调整
本文的webrtc拥塞控制的上文,主要分析码率预估和调整的过程;下文将会介绍预估出的码率在webrtc中编码器,fec,pacer中的分配和使用。
3.Ref
R1. webrtc拥塞控制论文: Analysis and Design of the Google Congestion Contro for Web Real-time Communication (WebRTC)
R2. transport-cc-feedback草案: https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02#page-10
R3. WebRTC GCC拥塞控制算法详解(一朵喇叭花压海棠) : https://blog.csdn.net/sonysuqin/article/details/106186374)
R4. congestion_controller、 remote bitrate estimator、pacing模块浅析(吃好,睡好,身体好): (https://blog.csdn.net/weixin_29405665/article/details/110420315)
R5. WebRTC研究:Transport-cc之RTP及RTCP(剑痴乎 ):https://blog.jianchihu.net/webrtc-research-transport-cc-rtp-rtcp.html