在 C++98 的时代,C++标准并没有包含多线程的支持,人们只能直接调用操作系统提供的 SDK API 来编写多线程程序,不同的操作系统提供的 SDK API 以及线程控制能力不尽相同。到了 C++11,终于在标准之中加入了正式的多线程的支持,从而我们可以使用标准形式的类来创建与执行线程,也使得我们可以使用标准形式的锁、原子操作、线程本地存储 (TLS) 等来进行复杂的各种模式的多线程编程,而且,C++11 还提供了一些高级概念,比如 promise/future,packaged_task,async 等以简化某些模式的多线程编程。
一:thread
头文件<thread>中提供了std::thread 对线程进行封装。C++11 所定义的线程是和操作系的线程是一一对应的,也就是说我们生成的线程都是直接接受操作系统的调度的,通过操作系统的相关命令(比如 ps -M 命令)是可以看到的,一个进程所能创建的线程数目以及一个操作系统所能创建的总的线程数目等都由运行时操作系统限定。
std::thread类如果在构造函数中指明了线程入口函数的话,则从创建好thread对象那一刻起,线程就开始运行了,此时std::thread对象就表示一个正在执行的线程了。但是std::thread创建的线程,线程入口函数的返回值被忽略了,如果线程中抛出了异常,则会直接调用std::terminate结束进程。
std::thread也可以与底层线程不关联,比如使用默认构造函数创建的std::thread对象、被移动了的std::thread对象、detach或join过的std::thread对象。没有两个std::thread对象会表示同一个线程,std::thread不支持复制构造或复制赋值的,只能移动构造或移动赋值。
std::thread有可能抛出std::system_error异常,表示线程无法启动。该异常表示要么是std::errc::resource_unavailable_try_again,要么就是底层实现创建线程时发生了问题。
创建std::thread同时指明线程入口函数的构造函数是:
template< class Function, class... Args > explicit thread( Function&& f, Args&&... args );
调用线程入口函数f时,参数是按值复制或移动传递的。因此如果需要传递一个引用给入口函数的话,需要使用std::ref或std::cref封装。
std::thread中定义了native_handle_type类型,它具体是什么类型取决于底层的线程库。 native_handle_type是连接 std::thread 和操作系统线程库之间的桥梁,在 g++(libstdc++) for Linux 里面,native_handle_type 其实就是 pthread 里面的 pthread_t 类型,当 std::thread 类的功能不能满足我们的要求的时候(比如改变某个线程的优先级),可以通过 std::thread 类实例的 native_handle() 返回值作为参数来调用相关的 pthread 函数达到目的。
std::thread中还定义了内部id类std::thread::id,id对象表示std::thread对象的唯一标识,如果std::thread目前没有关联的线程,则其id值为一个默认值std::thread::id()。std::thread定义了get_id成员函数返回其id值。
std::thread对象如果有关联的活动线程的话,则称其为joinable的,成员函数joinable()可用于查看std::thread是否为joinable的。如果std::thread是joinable的,则其get_id() != std::thread::id()。如果std::thread其底层线程已经执行完,但是尚未被join,则认为其依然是joinable的。下面几种情况,std::thread::joinable会返回false:std::thread是通过默认构造函数创建(未指定线程入口函数);std::thread被移动了(复制或赋值给其他std::thread);std::thread调用了detach之后;std::thread调用了join之后。
std::thread成员函数detach()断开std::thread对象与底层线程的关联,底层线程退出后,其资源自动free掉。如果当前std::thread对象没有关联底层线程,则调用detach会抛出std::system_error异常。调用了detach之后,就不能再该std::thread对象上调用join了;
std::thread成员函数join,能阻塞当前线程的执行,直到执行join的std::thread其底层线程执行完成。针对同一个std::thread对象在多个另外线程中执行join是未定义行为;如果发生错误则会抛出std::system_error异常,比如若std::thread的joinable为false,则是invalid_argument错误;如果线程内部自己调用join,则是resource_deadlock_would_occur错误。
std::thread成员函数swap可以将当前std::thread与其他std::thread对象的底层句柄进行互换。相当于二者互换了身份,各自掌握对方的线程。
如果std::thread执行析构时,其依然关联着底层线程(也就是joinable返回true),则会调用std::terminate。
如果将std::thread对象A移动赋值给对象B,则若B依然关联底层线程(joinable返回true)的话,则会调用std::terminate。移动赋值之后,B就掌握了A的线程,而A就成了默认构造状态。
std::thread还有一个静态成员函数hardware_concurrency,用于返回当前底层实现支持的最大线程并发数,其值只能用于参考。
除了定义std::thread类,<thread>头文件中还定义了命名空间this_thread表示当前线程,并在其中定义了4个辅助函数:yield、get_id、sleep_for、sleep_until。这四个函数一般是在线程执行函数中调用。
std::this_thread::yield用于提示底层实现调度执行其他线程。它的具体实现依赖于底层实现,特别是操作系统当前使用的调度策略。比如对于实时先入先出调度策略(如linux中的SCHED_FIFO)而言,该函数会挂起当前线程,将其插入到相同优先级的就绪队列末尾(如果当前没有相同优先级的其他线程,则yield无效果);
std::this_thread::get_id用户获取当前线程的id;
std::this_thread::sleep_for和std::this_thread::sleep_until用于将当前线程的执行休眠一段时间;
二:锁
头文件<mutex>中定义了std::mutex类,用于表示非递归锁。如果某个线程已经拥有了std::mutex,仍然调用std::mutex::lock的话,是未定义行为;如果std::mutex析构时,仍有任一线程拥有它,则是未定义行为;如果某线程结束时,依然拥有某个std::mutex,则是未定义行为。std::mutex不可复制,不可移动,它只有一个默认构造函数。
std::mutex定义了native_handle_type类型,类似于std::thread::native_handle_type,它取决于具体的底层实现,成员函数std::mutex::native_handle用于返回该锁的底层实现句柄;
std::mutex定义了lock、try_lock和unlock成员函数函数。一般情况下,不直接调用这些函数,而是使用std::unique_lock或std::lock_guard等以RAII的方式管理锁。
<mutex>中还定义了std::timed_mutex,它类似于std::mutex,只不过另外提供了try_lock_for和try_lock_until函数,这俩函数要么在能立即持有锁时返回true,要么最多阻塞一段时间后返回false。
<mutex>中还定义了recursive_mutex、recursive_timed_mutex两种递归锁,以及shared_mutex、shared_timed_mutex两种共享锁(读写锁)。
<mutex>中定义了lock_guard,用于对锁进行RAII式的封装,创建lock_guard时会持有锁,析构lock_guard时会释放锁。lock_guard不可复制。
int g_i = 0; std::mutex g_i_mutex; // protects g_i void safe_increment() { std::lock_guard<std::mutex> lock(g_i_mutex); ++g_i; std::cout << std::this_thread::get_id() << ": " << g_i << ' '; // g_i_mutex is automatically released when lock goes out of scope } std::thread t1(safe_increment); std::thread t2(safe_increment); t1.join(); t2.join();
<mutex>中还提供了defer_lock_t、try_to_lock_t、adopt_lock_t以及unique_lock、shared_lock,结合std::lock或std::try_lock函数,可以方便的对上面的锁进行封装,最常见的就是封装多个锁,以避免死锁的发生。具体可参考en.cppreference.com中的例子。
<mutex>中还提供了std::call_once函数,保证某个函数即使在多个线程中同时调用时,也只被调用一次。
template< class Callable, class... Args > void call_once( std::once_flag& flag, Callable&& f, Args&&... args );
如果调用call_once时flag已经被设置,说明函数f已经被调用过了,这种情况下call_once直接返回;如果flag未被设置,则调用call_once时会直接调用std::forward<Callable>(f),并向其传递std::forward<Args>(args)...参数。如果此时f内抛出了异常,则异常会传递给call_once的调用者,并且不会设置flag,这样可以使得后续使用同一标志调用call_once时能继续调用f函数。
std::once_flag flag1, flag2; void simple_do_once() { std::call_once(flag1, [](){ std::cout << "Simple example: called once "; }); } void may_throw_function(bool do_throw) { if (do_throw) { std::cout << "throw: call_once will retry "; // this may appear more than once throw std::exception(); } std::cout << "Didn't throw, call_once will not attempt again "; // guaranteed once } void do_once(bool do_throw) { try { std::call_once(flag2, may_throw_function, do_throw); } catch (...) { } } std::thread st1(simple_do_once); std::thread st2(simple_do_once); std::thread st3(simple_do_once); std::thread st4(simple_do_once); st1.join(); st2.join(); st3.join(); st4.join(); std::thread t1(do_once, true); std::thread t2(do_once, true); std::thread t3(do_once, false); std::thread t4(do_once, true); t1.join(); t2.join(); t3.join(); t4.join();
上面代码的结果是:
Simple example: called once throw: call_once will retry throw: call_once will retry Didn't throw, call_once will not attempt again
三:条件变量
<condition_variable>头文件中提供了condition_variable类以对条件变量进行支持。条件变量也是一种同步原语,它可以使多个线程阻塞,直到另一个线程修改了某共享变量并且对条件变量进行通知之后,才解除阻塞。
修改共享变量的线程需要:持有某种锁(一般是通过std::lock_guard),在持有锁的情况下修改变量,在std::condition_variable上执行notify_one或notify_all(执行通知时不需要持有锁)。即使共享变量是原子的,也需要在持有锁的情况下进行修改,以便能够正确的通知到等待条件变量的线程。
等待条件变量std::condition_variable的线程需要:持有一个std::unique_lock<std::mutex>(该锁也是修改共享变量线程需要持有的锁);执行wait、wait_for或wait_until,这些等待操作会原子的释放锁并挂起线程;当条件变量得到通知时,或超时时间到时,或者发生虚假唤醒时,线程醒来并且原子性的获取锁。此时线程应该检查条件,如果条件未满足(虚假唤醒、超时时间到时)继续等待。
std::condition_variable只能与std::unique_lock<std::mutex>一起使用,这种约束使得在一些平台上能够获得最大效率。<condition_variable>中提供了std::condition_variable_any条件变量,可以与任意类型的锁(如std::shared_lock)一起工作。
std::condition_variable不能进行复制构造、移动构造,也不能复制赋值或移动赋值。
类似于std::thread和std::mutex,std::condition_variable也提供了native_handle_type类型和native_handle函数,用于返回条件变量底层实现的句柄。
std::condition_variable的wait成员函数:
void wait( std::unique_lock<std::mutex>& lock ); template< class Predicate > void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
wait会阻塞当前线程,直到其他线程在相同条件变量上调用了nofify_one或notify_all,或者直到发生了虚假唤醒。调用wait之前需要先锁住lock,调用wait会原子的释放lock,阻塞当前线程,将当前线程添加到等待条件变量(*this)的线程列表中。当在条件变量上调用nofify_one或notify_all,或是发生虚假唤醒时,当前线程解除阻塞,并且再次锁住lock。
第二个重载实际上相当于:
while (!pred()) { wait(lock); }
注意lock只能是std::unique_lock<std::mutex>。如果lock没有锁住就调用wait,这是未定义行为;如果锁住的lock与其他等待相同条件变量的线程使用lock不是同一个lock,这也是未定义行为。如果在notify_one之后才调用wait,则该wait不会唤醒;
std::condition_variable cv; std::mutex cv_m; // This mutex is used for three purposes: // 1) to synchronize accesses to i // 2) to synchronize accesses to std::cerr // 3) for the condition variable cv int i = 0; void waits() { std::unique_lock<std::mutex> lk(cv_m); std::cerr << "Waiting... "; cv.wait(lk, []{return i == 1;}); std::cerr << "...finished waiting. i == 1 "; } void signals() { std::this_thread::sleep_for(std::chrono::seconds(1)); { std::lock_guard<std::mutex> lk(cv_m); std::cerr << "Notifying... "; } cv.notify_all(); std::this_thread::sleep_for(std::chrono::seconds(1)); { std::lock_guard<std::mutex> lk(cv_m); i = 1; std::cerr << "Notifying again... "; } cv.notify_all(); } int main() { std::thread t1(waits), t2(waits), t3(waits), t4(signals); t1.join(); t2.join(); t3.join(); t4.join(); }
结果是:
Waiting... Waiting... Waiting... Notifying... Notifying again... ...finished waiting. i == 1 ...finished waiting. i == 1 ...finished waiting. i == 1
wait_for和wait_until类似于wait,只不过它们能指定等待的时间。
如果当前有任一线程等待条件变量,则notify_one函数会唤醒其中的一个。注意,调用notify_one的线程没必要锁住等待条件变量线程使用的lock,实际上这么做有时候是有害无益的,因为这会导致被唤醒的线程重新阻塞,直到发起notify的线程释放该lock。不过在需要精确调度事件的场景中,这样做也是有必要的。比如如果等待线程在条件满足之后会结束进程,这就会导致条件变量被析构,如果在解锁之后notify之前发生了虚假唤醒,将会导致在已析构的条件变量上调用notify。
notify_all将会唤醒所有等待在条件变量上的线程。
std::condition_variable只能和std::unique_lock<std::mutex>一起工作,如果需要能够使用其他锁,则可以使用std::condition_variable_any。该类是不可复制,也不可移动的。如果std::condition_variable_any也使用std::unique_lock<std::mutex>的话,其效率没有std::condition_variable高。
<condition_variable>中还提供了std::notify_all_at_thread_exit函数:
void notify_all_at_thread_exit( std::condition_variable& cond, std::unique_lock<std::mutex> lk );
该函数用于通知其他线程当前线程已经结束。该函数的操作是:将已经获取的锁lk的拥有权转移到内部存储,然后当前线程退出时,调用条件变量的notify_all:
lk.unlock();
cond.notify_all();
lk会一直锁住,直到线程退出。该函数一般用于detached线程退出时调用。
四:Futures
std::thread有个缺点,就是无法获取调用函数的返回值或者捕获其抛出的异常。因此,<future>提供了一系列用于获取异步线程的返回值或者其抛出的异常的机制。这些返回值通过共享状态(shared state)进行通信,线程将返回值或抛出的异常存储在共享状态中,然后其他线程通过操作与这些共享状态关联的std::future或std::shared_future来获取内部的值。
std::promise提供了这样一种机制:它关联一个共享状态(shared state),该共享状态中保存了一些状态信息,并用于存储值或异常。该值或异常之后的某个时刻会被std::promise创建的std::future对象异步的取出。std::promise的使用是一次性的,而且它不支持复制,只支持移动,当对象A移动构造/赋值给对象B后,A就没有关联的共享状态了。
std::promise对共享状态可以做三件事:
标记为ready,std::promise将值或异常存储到共享状态中,将状态标记为ready。这样就解除了其他线程在关联该共享状态上的期值上的阻塞;
release,std::promise放弃与共享状态的关联。如果当前是共享状态最后的引用,则共享状态被销毁。如果该共享状态是std::async创建的,并且尚未ready,则该动作会阻塞;
abandon,std::promise将以std::future_errc::broken_promise为错误码的std::future_error的异常存储在共享状态中,然后将共享状态标记为ready后release。可以这样理解,promise意思是许诺,许诺将来会把值存储在共享状态中,现在它放弃了这个诺言,因而是broken_promise。
如果当前关联的共享状态是ready的,std::promise的析构函数会release共享状态;如果共享状态不是ready的,则析构函数以std::future_errc::broken_promise为错误码将异常std::future_error存储到共享状态中,并将其标记为ready后release。
std::promise的set_value是原子的将值存储到共享状态中,并且将其置为ready;set_exception用于在共享状态中存储异常;而set_value_at_thread_exit将值存储到共享状态后并不会立即将其置为ready,而是在线程退出时才将其置为ready;set_exception_at_thread_exit的行为类似。set_value、set_exception、set_value_at_thread_exit或set_exception_at_thread_exit的行为,就好像他们在更新promise状态时获取一个锁。如果当前没有关联的共享状态,或者共享状态中已经存储了值或异常,则这些操作就会抛出异常。
std::promise处于promise-future通信信道的写端,在共享状态中写入了值之后,任何在该共享状态上等待的操作(比如std::future::get)就会成功返回。std::promise的get_future成员函数,返回一个关联相同共享状态的std::future对象。如果当前没有关联的共享状态,或是get_future已经被调用过一次了,则会抛出异常。如果需要在promise-future传输信道上有多个读端,则可以使用std::future::share。调用get_future并不会和set_value、set_exception、set_value_at_thread_exit或set_exception_at_thread_exit产生data race。
std::future用于访问异步操作的结果,异步操作(通过std::async,std::package_task或std::promise创建)可以提供给异步操作的创建者一个std::future,然后创建者使用std::future的各种方法访问、等待,或从std::future中取值。当异步操作尚未提供一个值时,这些操作可能会导致阻塞。
std::future的valid成员函数可以判断该std::future对象是否关联了共享状态。默认构造的std::future就没有关联的共享状态,因而其valid返回false;std::future不支持复制(构造或赋值),只支持移动(构造或赋值)。当移动A到B时,A的valid就会返回false;
std::future的析构函数会release共享状态,如果当前是关联到共享状态最后一个引用,则该共享状态被销毁;std::future断开与共享状态的关联。如果当前std::future由std::async返回,且共享状态尚未ready,且当前std::future是该共享状态的最后引用,则析构时会阻塞。
std::future的get操作将阻塞到future能够从共享状态中取到值。如果当前未关联共享状态,则调用get是未定义行为;调用get后,共享状态被release了,且valid会返回false。如果是常规的future,如std::future<int>,则get从共享状态中取到的值,相当于执行了std::move操作;如果模板实参为T&,如std::future<int&>,则get返回的是共享状态中保存的值的引用;如果共享状态中保存的是异常的话,则调用get将会抛出该异常;
std::future的wait,wait_for和wait_until用于从共享状态中等待其变为ready;
std::future的share操作将共享状态转移给std::shared_future,调用完share之后,当前future的valid返回false。多个std::shared_future可以关联同一个共享状态,std::shared_future支持复制,多线程通过自己的shared_future副本同时访问同一个共享状态是安全的。
<future>提供的std::packaged_task,可以封装任意可调用对象,而且其本身也具有operator()成员函数,所以它可以作为std::thread的调用对象,与普通可调用对象不同的是,std::packaged_task关联了共享状态,它的返回值或者抛出的异常可以存储到该共享状态中。该值或异常就可以通过get_future函数(get_future只能调用一次)返回的std::future进行访问。
std::packaged_task不可复制,只能移动;类似于std::promise,如果共享状态在ready之前,std::packaged_task被析构了,则该共享状态中会存储一个以std::future_errc::broken_promise为错误码的std::future_error异常。
<future>提供的std::async模板函数可以异步的调用f,返回一个std::future,用于获取f的返回值,或者其抛出的异常。std::async调用f的方式有两种,一种是std::launch::async,表示启动一个新线程执行f;另一种是std::launch::deferred,表示在获取f返回值(通过future)时,才同步的执行f(延迟计算)。
https://en.cppreference.com/w/cpp/thread
https://www.ibm.com/developerworks/cn/linux/1412_zhupx_thread/index.html