zoukankan      html  css  js  c++  java
  • 协程库中 WaitGroup / CountDownLatch 实现

    协程库中 WaitGroup / CountDownLatch 实现

    这几天读了一些协程的文章。看了看开源协程库,在腾讯的 libco 和魅族的 libgo 中选择了 libgo,结果发现这边提供了协程池,但却没有提供CountDownLatch、SyncClosure之类的设置,要是每次起一组任务都要用一个协程池的话,有点大材小用了。这里我们手动实现一个 CountDownLatch。

    先说结论,选用最后一种方案。


    由于开始的时候,没有去读过线程池的实现,所以走了一些弯路。

    计数器 + co_yield

    第一种思路,使用计数器,Done时自减,Wait检查计数器并主动让出CPU。
    这种做法使用到了 co_yield 让出CPU,但网上有人提出协程较少的时候,会发生CPU占用100%的问题。
    举个例子:进程使用了 4 个线程,现在一个协程在等待8个协程,其中7个已经完成,只有一个还有较长时间才能结束,并且没有其他协程加入到任务队列了。这个时候 Wait 让出CPU之后很快又会被线程调度上CPU,检查到引用计数不满足条件后再次让出CPU,然后又立刻被调度上CPU检查,如此循环直到满足条件结束任务。
    问题原因在于,我们这里只是让出了CPU,Wait仍然还是放在thread的ready队列中,而不是放在blocked队列。我们需要一种机制,wait需要的信号,并在信号到来的时候重新进入ready队列。这种机制类似于线程调度中的 【条件变量】 —— 条件不满足则 idle,条件发生时唤醒线程。

    class CountDownLatch {
    public:
        explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {
        }
        void Add(size_t n) {
            mFlyingCount += n;
        }
        void Done() {
            mFlyingCount--;
        }
        void Wait() {
            while(mFlyingCount > 0) {
                co_yield;
            }
        }
    private:
        std::atomic<size_t> mFlyingCount;
    
        CountDownLatch(CountDownLatch const &) = delete;
        CountDownLatch(CountDownLatch &&) = delete;
        CountDownLatch& operator=(CountDownLatch const &) = delete;
        CountDownLatch& operator=(CountDownLatch &&) = delete;
    };
    

    channel一对一阻塞

    第二种思路,最简单但是开销比较大:使用 n 个不带缓冲的 channel 来阻塞 Wait 操作。

    class CountDownLatch {
    public:
        explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {
            chans.resize(n, co_chan<int>());
        }
        void Done() {
            chans[--mFlyingCount].Close();
        }
        void Wait() {
            int x;
            for (auto& ch : chans) {
                ch >> x;
            }
        }
    private:
        std::atomic<size_t> mFlyingCount;
        std::vector<co_chan<int>> chans;
    
        CountDownLatch(CountDownLatch const &) = delete;
        CountDownLatch(CountDownLatch &&) = delete;
        CountDownLatch& operator=(CountDownLatch const &) = delete;
        CountDownLatch& operator=(CountDownLatch &&) = delete;
    };
    

    一个channel唤醒Wait

    看上面的源码我们发现,由于我们是从后往前关闭的channel,等待channel时却是从前往后,因此实际阻塞Wait的只有第一个channel,于是有了进一步简化的方案。
    第三种思路,使用不带缓冲的channel作为通信机制,最后一个结束的Done唤醒Wait协程。这种实现起来很简单,依靠的是 channel 来作为条件变量。

    class CountDownLatch {
    public:
        explicit CountDownLatch(size_t n) : mFlyingCount(n) {}
    
        void Add(size_t i) {
            mFlyingCount += i;
        }
        void Done() {
            if (--mFlyingCount == 0) {
                ch_0.Close();
            }
        }
        void Wait() {
            int x;
            ch_0 >> x;
        }
    private:
        std::atomic<size_t> mFlyingCount {1};
        co_chan<int> ch_0;
    
        CountDownLatch(CountDownLatch const &) = delete;
        CountDownLatch(CountDownLatch &&) = delete;
        CountDownLatch& operator=(CountDownLatch const &) = delete;
        CountDownLatch& operator=(CountDownLatch &&) = delete;
    };
    

    mutex + condition_variable

    既然第三种方案提到了条件变量,那么可不可以直接使用条件变量呢?可以的!
    第四种实现,剥离无用的上层包装,直接使用协程的条件变量来处理等待条件。这种方案其实脱胎于线程池的 CountDownLatch,只要协程的 mutex 和 condition_variable 实现的没有问题,这个实现就是可以正常工作的。

    class CountDownLatch {
    public:
        explicit CountDownLatch(size_t n = 1) : mFlyingCount(n) {}
    
        void Done() {
            std::unique_lock<::co::CoMutex> lck(mu);
            if (--mFlyingCount <= 0) {
                cv.notify_all();
            }
        }
        void Wait() {
            std::unique_lock<::co::CoMutex> lck(mu);
            while (mFlyingCount > 0) {
                cv.wait(lck);
            }
        }
    private:
        size_t mFlyingCount;
        ::co::CoMutex mu;
        ::co::ConditionVariableAny cv;
    
        CountDownLatch(CountDownLatch const &) = delete;
        CountDownLatch(CountDownLatch &&) = delete;
        CountDownLatch& operator=(CountDownLatch const &) = delete;
        CountDownLatch& operator=(CountDownLatch &&) = delete;
    };
    

    这里有几个需要注意的点:

    • 第一个是锁,操作 condition_variable 是需要和锁搭配使用的,不能单独使用 condition_variable;
    • 第二是 unique_lock,C++11 thread库 提供了两种锁专用的智能指针,lock_guard 和 unique_lock,区别是前者会一直持有锁,后者可以在wait的时候解锁再加锁;
    • 第三,不仅cv需要锁保护,计数器操作也需要加锁,计数器和cv放到一起才是一个完整的临界资源,而不是单独来看的。
  • 相关阅读:
    函数重载和函数指针在一起
    Uva
    Uva
    Uva
    Uva
    Uva
    CCPC-Wannafly-day5
    CCPC-Wannafly-day3
    CCPC-Wannafly-day2
    CCPC-Wannafly-Winter 2020.01.12总结
  • 原文地址:https://www.cnblogs.com/zhcpku/p/15223847.html
Copyright © 2011-2022 走看看