协程库中 WaitGroup / CountDownLatch 实现
这几天读了一些协程的文章。看了看开源协程库,在腾讯的 libco 和魅族的 libgo 中选择了 libgo,结果发现这边提供了协程池,但却没有提供CountDownLatch、SyncClosure之类的设置,要是每次起一组任务都要用一个协程池的话,有点大材小用了。这里我们手动实现一个 CountDownLatch。
先说结论,选用最后一种方案。
由于开始的时候,没有去读过线程池的实现,所以走了一些弯路。
计数器 + co_yield
第一种思路,使用计数器,Done时自减,Wait检查计数器并主动让出CPU。
这种做法使用到了 co_yield 让出CPU,但网上有人提出协程较少的时候,会发生CPU占用100%的问题。
举个例子:进程使用了 4 个线程,现在一个协程在等待8个协程,其中7个已经完成,只有一个还有较长时间才能结束,并且没有其他协程加入到任务队列了。这个时候 Wait 让出CPU之后很快又会被线程调度上CPU,检查到引用计数不满足条件后再次让出CPU,然后又立刻被调度上CPU检查,如此循环直到满足条件结束任务。
问题原因在于,我们这里只是让出了CPU,Wait仍然还是放在thread的ready队列中,而不是放在blocked队列。我们需要一种机制,wait需要的信号,并在信号到来的时候重新进入ready队列。这种机制类似于线程调度中的 【条件变量】 —— 条件不满足则 idle,条件发生时唤醒线程。
class CountDownLatch {
public:
explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {
}
void Add(size_t n) {
mFlyingCount += n;
}
void Done() {
mFlyingCount--;
}
void Wait() {
while(mFlyingCount > 0) {
co_yield;
}
}
private:
std::atomic<size_t> mFlyingCount;
CountDownLatch(CountDownLatch const &) = delete;
CountDownLatch(CountDownLatch &&) = delete;
CountDownLatch& operator=(CountDownLatch const &) = delete;
CountDownLatch& operator=(CountDownLatch &&) = delete;
};
channel一对一阻塞
第二种思路,最简单但是开销比较大:使用 n 个不带缓冲的 channel 来阻塞 Wait 操作。
class CountDownLatch {
public:
explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {
chans.resize(n, co_chan<int>());
}
void Done() {
chans[--mFlyingCount].Close();
}
void Wait() {
int x;
for (auto& ch : chans) {
ch >> x;
}
}
private:
std::atomic<size_t> mFlyingCount;
std::vector<co_chan<int>> chans;
CountDownLatch(CountDownLatch const &) = delete;
CountDownLatch(CountDownLatch &&) = delete;
CountDownLatch& operator=(CountDownLatch const &) = delete;
CountDownLatch& operator=(CountDownLatch &&) = delete;
};
一个channel唤醒Wait
看上面的源码我们发现,由于我们是从后往前关闭的channel,等待channel时却是从前往后,因此实际阻塞Wait的只有第一个channel,于是有了进一步简化的方案。
第三种思路,使用不带缓冲的channel作为通信机制,最后一个结束的Done唤醒Wait协程。这种实现起来很简单,依靠的是 channel 来作为条件变量。
class CountDownLatch {
public:
explicit CountDownLatch(size_t n) : mFlyingCount(n) {}
void Add(size_t i) {
mFlyingCount += i;
}
void Done() {
if (--mFlyingCount == 0) {
ch_0.Close();
}
}
void Wait() {
int x;
ch_0 >> x;
}
private:
std::atomic<size_t> mFlyingCount {1};
co_chan<int> ch_0;
CountDownLatch(CountDownLatch const &) = delete;
CountDownLatch(CountDownLatch &&) = delete;
CountDownLatch& operator=(CountDownLatch const &) = delete;
CountDownLatch& operator=(CountDownLatch &&) = delete;
};
mutex + condition_variable
既然第三种方案提到了条件变量,那么可不可以直接使用条件变量呢?可以的!
第四种实现,剥离无用的上层包装,直接使用协程的条件变量来处理等待条件。这种方案其实脱胎于线程池的 CountDownLatch,只要协程的 mutex 和 condition_variable 实现的没有问题,这个实现就是可以正常工作的。
class CountDownLatch {
public:
explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {}
void Done() {
std::unique_lock<::co::CoMutex> lck(mu);
if (--mFlyingCount <= 0) {
cv.notify_all();
}
}
void Wait() {
std::unique_lock<::co::CoMutex> lck(mu);
while (mFlyingCount > 0) {
cv.wait(lck);
}
}
private:
size_t mFlyingCount;
::co::CoMutex mu;
::co::ConditionVariableAny cv;
CountDownLatch(CountDownLatch const &) = delete;
CountDownLatch(CountDownLatch &&) = delete;
CountDownLatch& operator=(CountDownLatch const &) = delete;
CountDownLatch& operator=(CountDownLatch &&) = delete;
};
这里有几个需要注意的点:
- 第一个是锁,操作 condition_variable 是需要和锁搭配使用的,不能单独使用 condition_variable;
- 第二是 unique_lock,C++11 thread库 提供了两种锁专用的智能指针,lock_guard 和 unique_lock,区别是前者会一直持有锁,后者可以在wait的时候解锁再加锁;
- 第三,不仅cv需要锁保护,计数器操作也需要加锁,计数器和cv放到一起才是一个完整的临界资源,而不是单独来看的。