zoukankan      html  css  js  c++  java
  • 协程库中 WaitGroup / CountDownLatch 实现

    协程库中 WaitGroup / CountDownLatch 实现

    这几天读了一些协程的文章。看了看开源协程库,在腾讯的 libco 和魅族的 libgo 中选择了 libgo,结果发现这边提供了协程池,但却没有提供CountDownLatch、SyncClosure之类的设置,要是每次起一组任务都要用一个协程池的话,有点大材小用了。这里我们手动实现一个 CountDownLatch。

    先说结论,选用最后一种方案。


    由于开始的时候,没有去读过线程池的实现,所以走了一些弯路。

    计数器 + co_yield

    第一种思路,使用计数器,Done时自减,Wait检查计数器并主动让出CPU。
    这种做法使用到了 co_yield 让出CPU,但网上有人提出协程较少的时候,会发生CPU占用100%的问题。
    举个例子:进程使用了 4 个线程,现在一个协程在等待8个协程,其中7个已经完成,只有一个还有较长时间才能结束,并且没有其他协程加入到任务队列了。这个时候 Wait 让出CPU之后很快又会被线程调度上CPU,检查到引用计数不满足条件后再次让出CPU,然后又立刻被调度上CPU检查,如此循环直到满足条件结束任务。
    问题原因在于,我们这里只是让出了CPU,Wait仍然还是放在thread的ready队列中,而不是放在blocked队列。我们需要一种机制,wait需要的信号,并在信号到来的时候重新进入ready队列。这种机制类似于线程调度中的 【条件变量】 —— 条件不满足则 idle,条件发生时唤醒线程。

    class CountDownLatch {
    public:
        explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {
        }
        void Add(size_t n) {
            mFlyingCount += n;
        }
        void Done() {
            mFlyingCount--;
        }
        void Wait() {
            while(mFlyingCount > 0) {
                co_yield;
            }
        }
    private:
        std::atomic<size_t> mFlyingCount;
    
        CountDownLatch(CountDownLatch const &) = delete;
        CountDownLatch(CountDownLatch &&) = delete;
        CountDownLatch& operator=(CountDownLatch const &) = delete;
        CountDownLatch& operator=(CountDownLatch &&) = delete;
    };
    

    channel一对一阻塞

    第二种思路,最简单但是开销比较大:使用 n 个不带缓冲的 channel 来阻塞 Wait 操作。

    class CountDownLatch {
    public:
        explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {
            chans.resize(n, co_chan<int>());
        }
        void Done() {
            chans[--mFlyingCount].Close();
        }
        void Wait() {
            int x;
            for (auto& ch : chans) {
                ch >> x;
            }
        }
    private:
        std::atomic<size_t> mFlyingCount;
        std::vector<co_chan<int>> chans;
    
        CountDownLatch(CountDownLatch const &) = delete;
        CountDownLatch(CountDownLatch &&) = delete;
        CountDownLatch& operator=(CountDownLatch const &) = delete;
        CountDownLatch& operator=(CountDownLatch &&) = delete;
    };
    

    一个channel唤醒Wait

    看上面的源码我们发现,由于我们是从后往前关闭的channel,等待channel时却是从前往后,因此实际阻塞Wait的只有第一个channel,于是有了进一步简化的方案。
    第三种思路,使用不带缓冲的channel作为通信机制,最后一个结束的Done唤醒Wait协程。这种实现起来很简单,依靠的是 channel 来作为条件变量。

    class CountDownLatch {
    public:
        explicit CountDownLatch(size_t n) : mFlyingCount(n) {}
    
        void Add(size_t i) {
            mFlyingCount += i;
        }
        void Done() {
            if (--mFlyingCount == 0) {
                ch_0.Close();
            }
        }
        void Wait() {
            int x;
            ch_0 >> x;
        }
    private:
        std::atomic<size_t> mFlyingCount {1};
        co_chan<int> ch_0;
    
        CountDownLatch(CountDownLatch const &) = delete;
        CountDownLatch(CountDownLatch &&) = delete;
        CountDownLatch& operator=(CountDownLatch const &) = delete;
        CountDownLatch& operator=(CountDownLatch &&) = delete;
    };
    

    mutex + condition_variable

    既然第三种方案提到了条件变量,那么可不可以直接使用条件变量呢?可以的!
    第四种实现,剥离无用的上层包装,直接使用协程的条件变量来处理等待条件。这种方案其实脱胎于线程池的 CountDownLatch,只要协程的 mutex 和 condition_variable 实现的没有问题,这个实现就是可以正常工作的。

    class CountDownLatch {
    public:
        explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {}
    
        void Done() {
            std::unique_lock<::co::CoMutex> lck(mu);
            if (--mFlyingCount <= 0) {
                cv.notify_all();
            }
        }
        void Wait() {
            std::unique_lock<::co::CoMutex> lck(mu);
            while (mFlyingCount > 0) {
                cv.wait(lck);
            }
        }
    private:
        size_t mFlyingCount;
        ::co::CoMutex mu;
        ::co::ConditionVariableAny cv;
    
        CountDownLatch(CountDownLatch const &) = delete;
        CountDownLatch(CountDownLatch &&) = delete;
        CountDownLatch& operator=(CountDownLatch const &) = delete;
        CountDownLatch& operator=(CountDownLatch &&) = delete;
    };
    

    这里有几个需要注意的点:

    • 第一个是锁,操作 condition_variable 是需要和锁搭配使用的,不能单独使用 condition_variable;
    • 第二是 unique_lock,C++11 thread库 提供了两种锁专用的智能指针,lock_guard 和 unique_lock,区别是前者会一直持有锁,后者可以在wait的时候解锁再加锁;
    • 第三,不仅cv需要锁保护,计数器操作也需要加锁,计数器和cv放到一起才是一个完整的临界资源,而不是单独来看的。
  • 相关阅读:
    sqlplus时报Linux-x86_64 Error: 13: Permission denied
    thrift之TTransport层的缓存传输类TBufferedTransport和缓冲基类TBufferBase
    Java实现 蓝桥杯 算法提高 新建Microsoft world文档
    Java实现 蓝桥杯 算法提高 新建Microsoft world文档
    Java实现 蓝桥杯 算法提高 快乐司机
    Java实现 蓝桥杯 算法提高 快乐司机
    Java实现 蓝桥杯 算法提高 队列操作
    Java实现 蓝桥杯 算法提高 队列操作
    Java实现 蓝桥杯 算法提高 文本加密
    Java实现 蓝桥杯 算法提高 合并石子
  • 原文地址:https://www.cnblogs.com/zhcpku/p/15223847.html
Copyright © 2011-2022 走看看