zoukankan      html  css  js  c++  java
  • 第30课 线程同步(std::condition_variable)

    一. 条件变量

    (一)条件变量概述

      多线程访问一个共享资源(或称临界区),不仅需要用互斥锁实现独享访问避免并发错误,在获得互斥锁进入临界区后,还需检查特定条件是否成立。当某个线程修改测试条件后,将通知其它正在等待条件的线程继续往下执行。

      1. wait线程:如果不满足该条件,拥有条件变量的wait线程会先释放该互斥锁,并把自身放入条件变量内部的等待队列,阻塞等待条件成立

      2. notify线程:在wait线程阻塞期间,notify线程获取互斥锁并进入临界区内访问共享资源,然后改变测试条件,当条件满足时通知在条件变量上等待的wait线程。wait线程重新申请对该互斥锁加锁,确认条件成立后则进行后续的任务处理,否则继续等待。

    (二)std::condition_variable

    class condition_variable { // class for waiting for conditions
    public:
        using native_handle_type = _Cnd_t;
    
        condition_variable() { // 默认构造函数
            _Cnd_init_in_situ(_Mycnd());
        }
    
        ~condition_variable() noexcept { // 析构函数
            _Cnd_destroy_in_situ(_Mycnd());
        }
    
        //不可复制和移动(如果重写移动,因此也就不可移动)
        condition_variable(const condition_variable&) = delete;
        condition_variable& operator=(const condition_variable&) = delete;
    
        void notify_one() noexcept { // 唤醒一个等待线程
            _Check_C_return(_Cnd_signal(_Mycnd()));
        }
    
        void notify_all() noexcept { // 唤醒所有的等待线程
            _Check_C_return(_Cnd_broadcast(_Mycnd()));
        }
    
        void wait(unique_lock<mutex>& _Lck) { // 等待,直到被唤醒
            // Nothing to do to comply with LWG 2135 because std::mutex lock/unlock are nothrow
            _Check_C_return(_Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx()));
        }
    
        template <class _Predicate>
        void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // 等待信号并测试条件
            while (!_Pred()) { //判断测试条件,只有当Pred不成立时才阻塞
                wait(_Lck);
            }
        }
    
        //等待,直到被唤醒或者超时
        template <class _Rep, class _Period>
        cv_status wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time) {
            if (_Rel_time <= chrono::duration<_Rep, _Period>::zero()) {
                return cv_status::timeout;
            }
    
            // The standard says that we should use a steady clock, but unfortunately our ABI
            // speaks struct xtime, which is relative to the system clock.
            _CSTD xtime _Tgt;
            const bool _Clamped     = _To_xtime_10_day_clamped(_Tgt, _Rel_time);
            const cv_status _Result = wait_until(_Lck, &_Tgt);
            if (_Clamped) {
                return cv_status::no_timeout;
            }
    
            return _Result;
        }
    
        //带超时等待并检测条件
        template <class _Rep, class _Period, class _Predicate>
        bool wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred) {
            // wait for signal with timeout and check predicate
            return _Wait_until1(_Lck, chrono::steady_clock::now() + _Rel_time, _Pred);
        }
    
        //等待,直到被唤醒或者到达某个时间点
        template <class _Clock, class _Duration>
        cv_status wait_until(unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time) {
            // 等待到指定的时间点
            for (;;) {
                const auto _Now = _Clock::now();
                if (_Abs_time <= _Now) {
                    return cv_status::timeout;
                }
    
                _CSTD xtime _Tgt;
                (void) _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now);
                const cv_status _Result = wait_until(_Lck, &_Tgt);
                if (_Result == cv_status::no_timeout) {
                    return cv_status::no_timeout;
                }
            }
        }
    
        //阻塞等待,直到信号或指定时长到达,同时检测条件
        template <class _Clock, class _Duration, class _Predicate>
        bool wait_until(
            unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate _Pred) {
            //等待带有超时的信号并检测条件
            return _Wait_until1(_Lck, _Abs_time, _Pred);
        }
    
        cv_status wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time) {
            // wait for signal with timeout
            if (!_Mtx_current_owns(_Lck.mutex()->_Mymtx())) {
                _Throw_Cpp_error(_OPERATION_NOT_PERMITTED);
            }
    
            // Nothing to do to comply with LWG 2135 because std::mutex lock/unlock are nothrow
            const int _Res = _Cnd_timedwait(_Mycnd(), _Lck.mutex()->_Mymtx(), _Abs_time);
            switch (_Res) {
            case _Thrd_success:
                return cv_status::no_timeout;
            case _Thrd_timedout:
                return cv_status::timeout;
            default:
                _Throw_C_error(_Res);
            }
        }
    
        template <class _Predicate>
        bool wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate _Pred) {
            // wait for signal with timeout and check predicate
            return _Wait_until1(_Lck, _Abs_time, _Pred);
        }
    
        //返回原生句柄
        _NODISCARD native_handle_type native_handle() { // return condition variable handle
            return _Mycnd();
        }
    
        void _Register(unique_lock<mutex>& _Lck, int* _Ready) { // register this object for release at thread exit
            _Cnd_register_at_thread_exit(_Mycnd(), _Lck.release()->_Mymtx(), _Ready);
        }
    
        void _Unregister(mutex& _Mtx) { // unregister this object for release at thread exit
            _Cnd_unregister_at_thread_exit(_Mtx._Mymtx());
        }
    
    private:
        aligned_storage_t<_Cnd_internal_imp_size, _Cnd_internal_imp_alignment> _Cnd_storage;
    
        _Cnd_t _Mycnd() noexcept { // get pointer to _Cnd_internal_imp_t inside _Cnd_storage
            return reinterpret_cast<_Cnd_t>(&_Cnd_storage);
        }
    
        template <class _Predicate>
        bool _Wait_until1(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate& _Pred) {
            // wait for signal with timeout and check predicate
            while (!_Pred()) {
                if (wait_until(_Lck, _Abs_time) == cv_status::timeout) {
                    return _Pred();
                }
            }
    
            return true;
        }
    
        template <class _Clock, class _Duration, class _Predicate>
        bool _Wait_until1(
            unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate& _Pred) {
            while (!_Pred()) {
                const auto _Now = _Clock::now();
                if (_Abs_time <= _Now) {
                    return false;
                }
    
                _CSTD xtime _Tgt;
                const bool _Clamped = _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now);
                if (wait_until(_Lck, &_Tgt) == cv_status::timeout && !_Clamped) {
                    return _Pred();
                }
            }
    
            return true;
        }
    };
    【std::condition_variable源码摘要】

      1. condition_variable和condition_variable_any类相似,前者只能使用unique_lock<mutex>来锁定线程,因为在condition_variable的内部将通过调用unique_lock的lock()函数来获取mutex,而诸如lock_guard对象并不提供lock()操作。后者可以使用任意类型的可锁定对象。

      2. wait(lck)用于等待通知。而wait(lck,pred)会先判断pred条件,再决定是否阻塞等待,当pred为false则继续等待,否则直接返回,相当于 while(!pred())wait(lck)

      3. notify_one()用于通知一个等待线程,而notify_all()用于通知所有等待线程。

      4. 对象析构前,所有阻塞在该条件变量上的线程均应被notified,否则将产生不可预知行为。

      5. 条件变量不支持拷贝和移动操作。

    【编程实验】利用条件变量实现线程安全队列

    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <memory>
    #include <queue>
    
    using namespace std;
    
    std::mutex io_mutex;
    
    //线程安全队列
    template<typename T>
    class threadsafe_queue
    {
        mutable std::mutex m_mutex; //互斥量必须是mutable的!for empty()和拷贝构造函数
        std::queue<T> m_queue;
        std::condition_variable m_cond;
    public:
        threadsafe_queue() {}
        threadsafe_queue(const threadsafe_queue& other)
        {
            std::lock_guard<std::mutex> lock(other.m_mutex); //other是const对象
                                                             //因lock里面的m_mutex。因此要求
                                                             //m_mutex必须是mutable。
            m_queue = other.m_queue;
        }
    
        void push(T value)
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(value);
    
            m_cond.notify_one();
        }
    
        //尝试从队列中取出元素(非阻塞)
        bool try_pop(T& value)
        {
            std::lock_guard<std::mutex> lock(m_mutex);
    
            if (m_queue.empty())
                return false;
    
            value = m_queue.front();
            m_queue.pop();
    
            return true;
        }
    
        std::shared_ptr<T> try_pop()
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            
            if (m_queue.empty())
                return std::shared_ptr<T>();
    
            std::shared_ptr<T> res(std::make_shared<T>(m_queue.front()));
            m_queue.pop();
    
            return res;
        }
    
        //等待,直到队列有元素就取出并返回
        void wait_and_pop(T& value)
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_cond.wait(lock, [this] {return !m_queue.empty(); });
    
            value = m_queue.front();
            m_queue.pop();
        }
    
        std::shared_ptr<T> wait_and_pop()
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_cond.wait(lock, [this] {return !m_queue.empty(); });
    
            std::shared_ptr<T> res(std::make_shared<T>(m_queue.front()));
            m_queue.pop();
    
            return res;
        }
    
        //由于empty()是个const成员函数。由于lock操作,这就要求mutex是可变的
        //因此,m_mutex声明为mutable
        bool empty() const
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            return m_queue.empty();
        }
    };
    
    void producer(threadsafe_queue<int>& queue, int tag)
    {
        for (int i = 0; i < 100; ++i) {
            queue.push((i + 1) * tag);
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }
    
    void comsumer(threadsafe_queue<int>& queue)
    {
        int elem;
    
        while (!queue.empty()) {
            
            queue.wait_and_pop(elem);
            {
                std::lock_guard<std::mutex> lock(io_mutex);
                cout << "thread(id=" << std::this_thread::get_id() << ") get element: " << elem << endl;
            }
    
            std::this_thread::sleep_for(std::chrono::milliseconds(40));
        }
    }
    
    int main()
    {
        threadsafe_queue<int> queue;
        
        //3个生产者和2个消费者
        std::thread t1(producer, std::ref(queue), 1);
        std::thread t2(producer, std::ref(queue), 2);
        std::thread t3(producer, std::ref(queue), 3);
    
        std::thread t4(comsumer, std::ref(queue));
        std::thread t5(comsumer, std::ref(queue));
    
        t1.join();
        t2.join();
        t3.join();
        t4.join();
        t5.join();
    
        return 0;
    }

    二. 底层wait函数的行为剖析(以pthread_cond_wait为例)

     

    (一)pthread_cond_wait的执行流程

      1. 调用线程将自己放入等待队列,mutex解锁。(调用线程己加入等待队列并解锁,此时,允许其他线程改变“测试条件”)

      2. 挂起,等待pthread_cond_signal或pthread_cond_broadcast去唤醒。(其他线程改变测试条件,当条件满足时会发出通知)

      3. 被唤醒,mutex加锁

    (二)关于条件变量的几个问题

      1. 为什么在pthread_cond_wait之前需要加锁?

      mutex是用来保护“测试条件”的,调用者将mutex传递给pthread_cond_wait,该函数内部会自动将调用线程放到等待队列中,然后再解锁mutex,并等待“测试条件”成立。这种做法关闭了从我们检测“测试条件”的时刻到将线程放入到等待队列之间的这段“时间窗口”,使得“测试条件”在线程加入等待队列之前不会被其他线程修改,从而确保调用线程不会错过“测试条件”的改变。最后,当pthread_cond_wait返回前,mutex又被上锁了。

      2. 为什么使用while语句来循环判断“测试条件”而不使用if语句?

      线程API存在一个事实(很多语言中都如此,不仅仅是C++),就是即使在没有通知条件变量的情况下线程也可能被唤醒,这样的唤醒称为虚假唤醒(spurious wakeups),但此时“测试条件”往往并没有被满足。因此正确的做法是,通过while循环确认等待的“测试条件”是否确己发生并将其作为唤醒后的首个动作来处理,一旦确认是“虚假唤醒”则继续wait等待。而如果使用if语句,则唤醒后无法进行这种确认从而可能导致错误。

      3. pthread_cond_signal/pthread_cond_broadcast放在pthread_mutex_unlock之前还是之后?

      (1) pthread_cond_signal放于pthread_mutex_unlock之前:在某些线程的实现中,可能造成wait线程被(cond_signal)唤醒后,会试图对解锁mutex(pthread_cond_wait内部就是这样实现的)。但此时,由notify线程还释放mutex锁。因此会造在wait线程醒来后又马上挂起等待mutex的释放,造成线程切换的性能浪费。【建议的做法(见第3点的解释)】

      (2) pthread_cond_signal放于pthread_mutex_unlock之后:可以解决前面说的性能损耗。但由于mutex己被释放,如果有个低优先级线程正在等待mutex的话,也可能会出现低优先级的线程抢占高优先级线程执行权的现象。而这种现象又恰好是第1种情况下不会出现的。

      (3)从编程的规则上看,两种情况都可以。但 Linux下线程中,有cond_wait队列和mutex_lock两个队列,cond_signal只是让线程从cond_wait队列移到mutex_lock队列,而不用返回到用户空间,不会有性能的损耗。因此建议采用第1种方案。

    (三)std::condition_variable的使用方法

    //1. wait线程
    std::unique_lock<std::mutex> lock(mutex);  //这里只能使用unique_lock,因为wait内部需要
                                               //unique_lock对象的lock()函数
    cv.wait(lock, [](){return condition == true;}); 
    
    //2. notify线程
    std::lock_guard<std::mutex> lock(mutex); //这里使用lock_guard或unique_lock不受限制,可
                                             //根据需要决定。
    
    //...      //其它操作
    condition = true;
    cv.notify_one();

    【编程实验】利用条件变量实现线程池

    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <queue>
    #include <future>
    #include <stdexcept>
    #include <atomic>
    
    using namespace std;
    //线程池,支持的任务可以是变参函数或lambda。可获取返回值
    class ThreadPool
    {
        using Task = std::function<void()>;
    
        std::atomic<bool> m_stop; //是否关闭
        std::atomic<unsigned int>  m_idleThreadNum; //空闲线程的数量
        std::vector<std::thread> m_pool; //线程池
        std::queue <Task> m_tasks; //任务
    
        //同步和互斥
        std::mutex m_mutex;
        std::condition_variable m_cond;
    
        std::thread createThread();
    public:
        ThreadPool(std::size_t threadNums);
        ~ThreadPool();
    
        void stop() { m_stop = true; }
        int idleCount() { return m_idleThreadNum; } //空闲线程数量
    public:
    
        //提交任务,std::future<>用于保存函数的执行结果
        template<class Func, class...Args>
        auto commit(Func&& f, Args... args)->std::future<typename std::result_of<Func(Args...)>::type>;
    };
    
    std::thread ThreadPool::createThread()
    {
        std::thread thread([this] {
            while (!this->m_stop) {   //每个线程的工作就是从队首中不断地取出任务,并执行
                Task task;
                {
                    std::unique_lock<std::mutex> lock(this->m_mutex); //unique_lock比lock_guard更灵活:可随时lock()和unlock() 
                    //等待,直到有任务到来(利用条件变量:线程池退出或任务不为空时被唤醒)
                    m_cond.wait(lock, [this]() {return this->m_stop || !this->m_tasks.empty(); });
    
                    if (this->m_stop) return;
    
                    task = std::move(this->m_tasks.front());
                    this->m_tasks.pop();
                }
                
                --m_idleThreadNum;
    
                task();
    
                ++m_idleThreadNum;
            }
        });
    
        return thread; //RVO优化。不需要std::move(thread),否则不满足优化条件。
    }
    ThreadPool::ThreadPool(std::size_t size):m_stop(false)
    {
        m_idleThreadNum = size < 1 ? 1 : size;
    
        for (std::size_t idx = 0; idx < m_idleThreadNum; ++idx) {
            std::thread thread = createThread();
            m_pool.push_back(std::move(thread));
        }
    }
    
    ThreadPool::~ThreadPool()
    {
        stop();
    
        m_cond.notify_all();
    
        for (auto& th : m_pool) 
            if(th.joinable()) th.join(); // 等待任务结束, 前提:线程一定会执行完
    }
    
    //提交一个任务:std::future.get():用于获取任务的运行结果
    template<class Func, class... Args>
    auto ThreadPool::commit(Func&& f, Args... args)->std::future<typename std::result_of<Func(Args...)>::type>
    {
        if (m_stop) throw std::runtime_error("commit on ThreadPool is stopped.");
    
        using RetType = typename std::result_of<Func(Args...)>::type; //decltype(f(args...));函数返回值类型
    
        //将f函数包装成RetType()类型
        auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::bind(std::forward<Func>(f), std::forward<Args>(args)...) );
        
        std::future<RetType> res = task->get_future();
    
        //添加到任务队列。注意,由于包装类为RetType()类型,需转化为Task类型(void())
        {
            std::unique_lock<std::mutex> lock(this->m_mutex);
            m_tasks.push([task]()->void {(*task)(); }); // 注意task应使用值捕获方式。才能被保存在队列中
        }
    
        m_cond.notify_one(); //唤醒一个线程执行
        
        return res;
    }
    
    class Test
    {
    public:
        int mul(int x, int y) { return x * y; }
    };
    
    int main()
    {
        ThreadPool pool(5);
    
        std::vector<std::future<int>> results;
    
        //任务:lambda表达式
        for (int i = 0; i < 10; ++i) {
            std::future<int> fut = pool.commit([i] {return i * i;});
            results.push_back(std::move(fut));    
        }
    
        //任务:类成员函数
        Test t;
        std::future<int> fut = pool.commit(&Test::mul,&t, 2, 5); //类成员函数
        results.push_back(std::move(fut));
    
        for (auto&& result : results) //auto&&万能引用,会被推导为左值
            std::cout << result.get() << std::endl;
    
        return 0;
    }
  • 相关阅读:
    两条斜线
    Cantor表
    城市网络
    一起来数二叉树吧
    牛客网音乐研究(枚举)
    删括号
    合并回文子串
    寻找道路
    EXTJS 4.0.2 XML数据
    extjs4.0.2a gridpanel看不到横向滚动条的一种原因
  • 原文地址:https://www.cnblogs.com/5iedu/p/11894925.html
Copyright © 2011-2022 走看看