一. 条件变量
(一)条件变量概述
多线程访问一个共享资源(或称临界区),不仅需要用互斥锁实现独享访问避免并发错误,在获得互斥锁进入临界区后,还需检查特定条件是否成立。当某个线程修改测试条件后,将通知其它正在等待条件的线程继续往下执行。
1. wait线程:如果不满足该条件,拥有条件变量的wait线程会先释放该互斥锁,并把自身放入条件变量内部的等待队列,阻塞等待条件成立。
2. notify线程:在wait线程阻塞期间,notify线程获取互斥锁并进入临界区内访问共享资源,然后改变测试条件,当条件满足时通知在条件变量上等待的wait线程。wait线程重新申请对该互斥锁加锁,确认条件成立后则进行后续的任务处理,否则继续等待。
(二)std::condition_variable
class condition_variable { // class for waiting for conditions public: using native_handle_type = _Cnd_t; condition_variable() { // 默认构造函数 _Cnd_init_in_situ(_Mycnd()); } ~condition_variable() noexcept { // 析构函数 _Cnd_destroy_in_situ(_Mycnd()); } //不可复制和移动(如果重写移动,因此也就不可移动) condition_variable(const condition_variable&) = delete; condition_variable& operator=(const condition_variable&) = delete; void notify_one() noexcept { // 唤醒一个等待线程 _Check_C_return(_Cnd_signal(_Mycnd())); } void notify_all() noexcept { // 唤醒所有的等待线程 _Check_C_return(_Cnd_broadcast(_Mycnd())); } void wait(unique_lock<mutex>& _Lck) { // 等待,直到被唤醒 // Nothing to do to comply with LWG 2135 because std::mutex lock/unlock are nothrow _Check_C_return(_Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx())); } template <class _Predicate> void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // 等待信号并测试条件 while (!_Pred()) { //判断测试条件,只有当Pred不成立时才阻塞 wait(_Lck); } } //等待,直到被唤醒或者超时 template <class _Rep, class _Period> cv_status wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time) { if (_Rel_time <= chrono::duration<_Rep, _Period>::zero()) { return cv_status::timeout; } // The standard says that we should use a steady clock, but unfortunately our ABI // speaks struct xtime, which is relative to the system clock. _CSTD xtime _Tgt; const bool _Clamped = _To_xtime_10_day_clamped(_Tgt, _Rel_time); const cv_status _Result = wait_until(_Lck, &_Tgt); if (_Clamped) { return cv_status::no_timeout; } return _Result; } //带超时等待并检测条件 template <class _Rep, class _Period, class _Predicate> bool wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred) { // wait for signal with timeout and check predicate return _Wait_until1(_Lck, chrono::steady_clock::now() + _Rel_time, _Pred); } //等待,直到被唤醒或者到达某个时间点 template <class _Clock, class _Duration> cv_status wait_until(unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time) { // 等待到指定的时间点 for (;;) { const auto _Now = _Clock::now(); if (_Abs_time <= _Now) { return cv_status::timeout; } _CSTD xtime _Tgt; (void) _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now); const cv_status _Result = wait_until(_Lck, &_Tgt); if (_Result == cv_status::no_timeout) { return cv_status::no_timeout; } } } //阻塞等待,直到信号或指定时长到达,同时检测条件 template <class _Clock, class _Duration, class _Predicate> bool wait_until( unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate _Pred) { //等待带有超时的信号并检测条件 return _Wait_until1(_Lck, _Abs_time, _Pred); } cv_status wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time) { // wait for signal with timeout if (!_Mtx_current_owns(_Lck.mutex()->_Mymtx())) { _Throw_Cpp_error(_OPERATION_NOT_PERMITTED); } // Nothing to do to comply with LWG 2135 because std::mutex lock/unlock are nothrow const int _Res = _Cnd_timedwait(_Mycnd(), _Lck.mutex()->_Mymtx(), _Abs_time); switch (_Res) { case _Thrd_success: return cv_status::no_timeout; case _Thrd_timedout: return cv_status::timeout; default: _Throw_C_error(_Res); } } template <class _Predicate> bool wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate _Pred) { // wait for signal with timeout and check predicate return _Wait_until1(_Lck, _Abs_time, _Pred); } //返回原生句柄 _NODISCARD native_handle_type native_handle() { // return condition variable handle return _Mycnd(); } void _Register(unique_lock<mutex>& _Lck, int* _Ready) { // register this object for release at thread exit _Cnd_register_at_thread_exit(_Mycnd(), _Lck.release()->_Mymtx(), _Ready); } void _Unregister(mutex& _Mtx) { // unregister this object for release at thread exit _Cnd_unregister_at_thread_exit(_Mtx._Mymtx()); } private: aligned_storage_t<_Cnd_internal_imp_size, _Cnd_internal_imp_alignment> _Cnd_storage; _Cnd_t _Mycnd() noexcept { // get pointer to _Cnd_internal_imp_t inside _Cnd_storage return reinterpret_cast<_Cnd_t>(&_Cnd_storage); } template <class _Predicate> bool _Wait_until1(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate& _Pred) { // wait for signal with timeout and check predicate while (!_Pred()) { if (wait_until(_Lck, _Abs_time) == cv_status::timeout) { return _Pred(); } } return true; } template <class _Clock, class _Duration, class _Predicate> bool _Wait_until1( unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate& _Pred) { while (!_Pred()) { const auto _Now = _Clock::now(); if (_Abs_time <= _Now) { return false; } _CSTD xtime _Tgt; const bool _Clamped = _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now); if (wait_until(_Lck, &_Tgt) == cv_status::timeout && !_Clamped) { return _Pred(); } } return true; } };
1. condition_variable和condition_variable_any类相似,前者只能使用unique_lock<mutex>来锁定线程,因为在condition_variable的内部将通过调用unique_lock的lock()函数来获取mutex,而诸如lock_guard对象并不提供lock()操作。后者可以使用任意类型的可锁定对象。
2. wait(lck)用于等待通知。而wait(lck,pred)会先判断pred条件,再决定是否阻塞等待,当pred为false则继续等待,否则直接返回,相当于 while(!pred())wait(lck)。
3. notify_one()用于通知一个等待线程,而notify_all()用于通知所有等待线程。
4. 对象析构前,所有阻塞在该条件变量上的线程均应被notified,否则将产生不可预知行为。
5. 条件变量不支持拷贝和移动操作。
【编程实验】利用条件变量实现线程安全队列
#include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <memory> #include <queue> using namespace std; std::mutex io_mutex; //线程安全队列 template<typename T> class threadsafe_queue { mutable std::mutex m_mutex; //互斥量必须是mutable的!for empty()和拷贝构造函数 std::queue<T> m_queue; std::condition_variable m_cond; public: threadsafe_queue() {} threadsafe_queue(const threadsafe_queue& other) { std::lock_guard<std::mutex> lock(other.m_mutex); //other是const对象 //因lock里面的m_mutex。因此要求 //m_mutex必须是mutable。 m_queue = other.m_queue; } void push(T value) { std::lock_guard<std::mutex> lock(m_mutex); m_queue.push(value); m_cond.notify_one(); } //尝试从队列中取出元素(非阻塞) bool try_pop(T& value) { std::lock_guard<std::mutex> lock(m_mutex); if (m_queue.empty()) return false; value = m_queue.front(); m_queue.pop(); return true; } std::shared_ptr<T> try_pop() { std::lock_guard<std::mutex> lock(m_mutex); if (m_queue.empty()) return std::shared_ptr<T>(); std::shared_ptr<T> res(std::make_shared<T>(m_queue.front())); m_queue.pop(); return res; } //等待,直到队列有元素就取出并返回 void wait_and_pop(T& value) { std::unique_lock<std::mutex> lock(m_mutex); m_cond.wait(lock, [this] {return !m_queue.empty(); }); value = m_queue.front(); m_queue.pop(); } std::shared_ptr<T> wait_and_pop() { std::unique_lock<std::mutex> lock(m_mutex); m_cond.wait(lock, [this] {return !m_queue.empty(); }); std::shared_ptr<T> res(std::make_shared<T>(m_queue.front())); m_queue.pop(); return res; } //由于empty()是个const成员函数。由于lock操作,这就要求mutex是可变的 //因此,m_mutex声明为mutable bool empty() const { std::lock_guard<std::mutex> lock(m_mutex); return m_queue.empty(); } }; void producer(threadsafe_queue<int>& queue, int tag) { for (int i = 0; i < 100; ++i) { queue.push((i + 1) * tag); std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } void comsumer(threadsafe_queue<int>& queue) { int elem; while (!queue.empty()) { queue.wait_and_pop(elem); { std::lock_guard<std::mutex> lock(io_mutex); cout << "thread(id=" << std::this_thread::get_id() << ") get element: " << elem << endl; } std::this_thread::sleep_for(std::chrono::milliseconds(40)); } } int main() { threadsafe_queue<int> queue; //3个生产者和2个消费者 std::thread t1(producer, std::ref(queue), 1); std::thread t2(producer, std::ref(queue), 2); std::thread t3(producer, std::ref(queue), 3); std::thread t4(comsumer, std::ref(queue)); std::thread t5(comsumer, std::ref(queue)); t1.join(); t2.join(); t3.join(); t4.join(); t5.join(); return 0; }
二. 底层wait函数的行为剖析(以pthread_cond_wait为例)
(一)pthread_cond_wait的执行流程
1. 调用线程将自己放入等待队列,mutex解锁。(调用线程己加入等待队列并解锁,此时,允许其他线程改变“测试条件”)
2. 挂起,等待pthread_cond_signal或pthread_cond_broadcast去唤醒。(其他线程改变测试条件,当条件满足时会发出通知)
3. 被唤醒,mutex加锁。
(二)关于条件变量的几个问题
1. 为什么在pthread_cond_wait之前需要加锁?
mutex是用来保护“测试条件”的,调用者将mutex传递给pthread_cond_wait,该函数内部会自动将调用线程放到等待队列中,然后再解锁mutex,并等待“测试条件”成立。这种做法关闭了从我们检测“测试条件”的时刻到将线程放入到等待队列之间的这段“时间窗口”,使得“测试条件”在线程加入等待队列之前不会被其他线程修改,从而确保调用线程不会错过“测试条件”的改变。最后,当pthread_cond_wait返回前,mutex又被上锁了。
2. 为什么使用while语句来循环判断“测试条件”而不使用if语句?
线程API存在一个事实(很多语言中都如此,不仅仅是C++),就是即使在没有通知条件变量的情况下线程也可能被唤醒,这样的唤醒称为虚假唤醒(spurious wakeups),但此时“测试条件”往往并没有被满足。因此正确的做法是,通过while循环确认等待的“测试条件”是否确己发生并将其作为唤醒后的首个动作来处理,一旦确认是“虚假唤醒”则继续wait等待。而如果使用if语句,则唤醒后无法进行这种确认从而可能导致错误。
3. pthread_cond_signal/pthread_cond_broadcast放在pthread_mutex_unlock之前还是之后?
(1) pthread_cond_signal放于pthread_mutex_unlock之前:在某些线程的实现中,可能造成wait线程被(cond_signal)唤醒后,会试图对解锁mutex(pthread_cond_wait内部就是这样实现的)。但此时,由notify线程还释放mutex锁。因此会造在wait线程醒来后又马上挂起等待mutex的释放,造成线程切换的性能浪费。【建议的做法(见第3点的解释)】
(2) pthread_cond_signal放于pthread_mutex_unlock之后:可以解决前面说的性能损耗。但由于mutex己被释放,如果有个低优先级线程正在等待mutex的话,也可能会出现低优先级的线程抢占高优先级线程执行权的现象。而这种现象又恰好是第1种情况下不会出现的。
(3)从编程的规则上看,两种情况都可以。但 Linux下线程中,有cond_wait队列和mutex_lock两个队列,cond_signal只是让线程从cond_wait队列移到mutex_lock队列,而不用返回到用户空间,不会有性能的损耗。因此建议采用第1种方案。
(三)std::condition_variable的使用方法
//1. wait线程 std::unique_lock<std::mutex> lock(mutex); //这里只能使用unique_lock,因为wait内部需要 //unique_lock对象的lock()函数 cv.wait(lock, [](){return condition == true;}); //2. notify线程 std::lock_guard<std::mutex> lock(mutex); //这里使用lock_guard或unique_lock不受限制,可 //根据需要决定。 //... //其它操作 condition = true; cv.notify_one();
【编程实验】利用条件变量实现线程池
#include <iostream> #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <future> #include <stdexcept> #include <atomic> using namespace std; //线程池,支持的任务可以是变参函数或lambda。可获取返回值 class ThreadPool { using Task = std::function<void()>; std::atomic<bool> m_stop; //是否关闭 std::atomic<unsigned int> m_idleThreadNum; //空闲线程的数量 std::vector<std::thread> m_pool; //线程池 std::queue <Task> m_tasks; //任务 //同步和互斥 std::mutex m_mutex; std::condition_variable m_cond; std::thread createThread(); public: ThreadPool(std::size_t threadNums); ~ThreadPool(); void stop() { m_stop = true; } int idleCount() { return m_idleThreadNum; } //空闲线程数量 public: //提交任务,std::future<>用于保存函数的执行结果 template<class Func, class...Args> auto commit(Func&& f, Args... args)->std::future<typename std::result_of<Func(Args...)>::type>; }; std::thread ThreadPool::createThread() { std::thread thread([this] { while (!this->m_stop) { //每个线程的工作就是从队首中不断地取出任务,并执行 Task task; { std::unique_lock<std::mutex> lock(this->m_mutex); //unique_lock比lock_guard更灵活:可随时lock()和unlock() //等待,直到有任务到来(利用条件变量:线程池退出或任务不为空时被唤醒) m_cond.wait(lock, [this]() {return this->m_stop || !this->m_tasks.empty(); }); if (this->m_stop) return; task = std::move(this->m_tasks.front()); this->m_tasks.pop(); } --m_idleThreadNum; task(); ++m_idleThreadNum; } }); return thread; //RVO优化。不需要std::move(thread),否则不满足优化条件。 } ThreadPool::ThreadPool(std::size_t size):m_stop(false) { m_idleThreadNum = size < 1 ? 1 : size; for (std::size_t idx = 0; idx < m_idleThreadNum; ++idx) { std::thread thread = createThread(); m_pool.push_back(std::move(thread)); } } ThreadPool::~ThreadPool() { stop(); m_cond.notify_all(); for (auto& th : m_pool) if(th.joinable()) th.join(); // 等待任务结束, 前提:线程一定会执行完 } //提交一个任务:std::future.get():用于获取任务的运行结果 template<class Func, class... Args> auto ThreadPool::commit(Func&& f, Args... args)->std::future<typename std::result_of<Func(Args...)>::type> { if (m_stop) throw std::runtime_error("commit on ThreadPool is stopped."); using RetType = typename std::result_of<Func(Args...)>::type; //decltype(f(args...));函数返回值类型 //将f函数包装成RetType()类型 auto task = std::make_shared<std::packaged_task<RetType()>>( std::bind(std::forward<Func>(f), std::forward<Args>(args)...) ); std::future<RetType> res = task->get_future(); //添加到任务队列。注意,由于包装类为RetType()类型,需转化为Task类型(void()) { std::unique_lock<std::mutex> lock(this->m_mutex); m_tasks.push([task]()->void {(*task)(); }); // 注意task应使用值捕获方式。才能被保存在队列中 } m_cond.notify_one(); //唤醒一个线程执行 return res; } class Test { public: int mul(int x, int y) { return x * y; } }; int main() { ThreadPool pool(5); std::vector<std::future<int>> results; //任务:lambda表达式 for (int i = 0; i < 10; ++i) { std::future<int> fut = pool.commit([i] {return i * i;}); results.push_back(std::move(fut)); } //任务:类成员函数 Test t; std::future<int> fut = pool.commit(&Test::mul,&t, 2, 5); //类成员函数 results.push_back(std::move(fut)); for (auto&& result : results) //auto&&万能引用,会被推导为左值 std::cout << result.get() << std::endl; return 0; }