RateStatistics
这个类的作用为记录一个时间窗口内的速率值,并返回当前时间区域内的码率值。
首先引入一个桶的定义:
struct Bucket {
explicit Bucket(int64_t timestamp);
int64_t sum; // Sum of all samples in this bucket.
int num_samples; // Number of samples in this bucket.
const int64_t timestamp; // Timestamp this bucket corresponds to.
};
每毫秒维护一个桶。RateStatistics
类内部就有一个按时间顺序排列的桶队列:
// All buckets within the time window, ordered by time.
std::deque<Bucket> buckets_;
RateStatistics::Update
是最重要的接口,用于更新当前速率。
// 移除不在时间窗口内的桶
void RateStatistics::EraseOld(int64_t now_ms) {
// New oldest time that is included in data set.
const int64_t new_oldest_time = now_ms - current_window_size_ms_ + 1;
// Loop over buckets and remove too old data points.
while (!buckets_.empty() && buckets_.front().timestamp < new_oldest_time) {
const Bucket& oldest_bucket = buckets_.front();
RTC_DCHECK_GE(accumulated_count_, oldest_bucket.sum);
RTC_DCHECK_GE(num_samples_, oldest_bucket.num_samples);
accumulated_count_ -= oldest_bucket.sum;
num_samples_ -= oldest_bucket.num_samples;
buckets_.pop_front();
// This does not clear overflow_ even when counter is empty.
// TODO(https://bugs.webrtc.org/11247): Consider if overflow_ can be reset.
}
}
void RateStatistics::Update(int64_t count, int64_t now_ms) {
RTC_DCHECK_GE(count, 0);
//移除超过时间窗口的桶
EraseOld(now_ms);
if (first_timestamp_ == -1) {
first_timestamp_ = now_ms;
}
if (buckets_.empty() || now_ms != buckets_.back().timestamp) {
//当前更新时间比上次更新时间还早,属于异常情况,更换now_ms
if (!buckets_.empty() && now_ms < buckets_.back().timestamp) {
RTC_LOG(LS_WARNING) << "Timestamp " << now_ms
<< " is before the last added "
"timestamp in the rate window: "
<< buckets_.back().timestamp << ", aligning to that.";
now_ms = buckets_.back().timestamp;
}
//没有现有的桶,或者当前毫秒没有记录时,创建对应当前时间的桶
buckets_.emplace_back(now_ms);
}
Bucket& last_bucket = buckets_.back();
//更新对应时间的桶中的统计数据
last_bucket.sum += count;
++last_bucket.num_samples;
//防止accumulated_count_溢出
if (std::numeric_limits<int64_t>::max() - accumulated_count_ > count) {
accumulated_count_ += count;
} else {
overflow_ = true;
}
++num_samples_;
}
RateStatistics::Update
其实逻辑很简单,就是不断的维护时间窗口内的桶,以及维护所有桶中的sample数量和count大小。
每1ms维护一个桶
RateStatistics::Rate
用于向外提供查询当前速率的接口,而这个速率只在被调用时才计算:
absl::optional<int64_t> RateStatistics::Rate(int64_t now_ms) const {
// Yeah, this const_cast ain't pretty, but the alternative is to declare most
// of the members as mutable...
// 移除旧的桶
const_cast<RateStatistics*>(this)->EraseOld(now_ms);
int active_window_size = 0;
if (first_timestamp_ != -1) {
//距离第一次的记录的时间间隔超过了当前的时间窗口,那么计算的周期就选当前的窗口
if (first_timestamp_ <= now_ms - current_window_size_ms_) {
// Count window as full even if no data points currently in view, if the
// data stream started before the window.
active_window_size = current_window_size_ms_;
} else {
// 当前的记录不足当前时间窗口,那么按照实际的时间计算时间周期
// Size of a single bucket is 1ms, so even if now_ms == first_timestmap_
// the window size should be 1.
active_window_size = now_ms - first_timestamp_ + 1;
}
}
// If window is a single bucket or there is only one sample in a data set that
// has not grown to the full window size, or if the accumulator has
// overflowed, treat this as rate unavailable.
// 当还没有sample,或者有效时间周期小于等于1,或者只有一条sample并且尚未经过一个完整的时间窗口,或者已经统计溢出,都任务非法,返回null
if (num_samples_ == 0 || active_window_size <= 1 ||
(num_samples_ <= 1 &&
rtc::SafeLt(active_window_size, current_window_size_ms_)) ||
overflow_) {
return absl::nullopt;
}
// 使用 ((当前窗口内的所有计数/有效时间窗口) * scale_ + 0.5) 作为速率返回
float scale = static_cast<float>(scale_) / active_window_size;
float result = accumulated_count_ * scale + 0.5f;
// Better return unavailable rate than garbage value (undefined behavior).
if (result > static_cast<float>(std::numeric_limits<int64_t>::max())) {
return absl::nullopt;
}
return rtc::dchecked_cast<int64_t>(result);
}
总结
- 速率统计是计算按照当前设置的时间窗口范围内的平均速率;
- 计算公式: ((当前窗口内的所有count/有效时间窗口) * scale_ + 0.5), 其中
scale_
:
- 采用
scale = kBpsScale(8000.0f)
且count单位为Byte,Rate单位为bps;- 采用
scale = 1
,且count单位为Byte,Rate单位为KBps。