线程
类std::thread代表一个可执行线程,使用时必须包含头文件<thread>。std::thread可以和普通函数,匿名函数和仿函数(一个实现了operator()函数的类)一同使用。另外,它允许向线程函数传递任意数量的参数。
#include <thread> void func() { // do some work } int main() { std::thread t(func); t.join(); return 0; }
上例中,t 是一个线程对象,函数func()运行于该线程中。对join()函数的调用将使调用线程(本例是指主线程)一直处于阻塞状态,直到正在执行的线程t执行结束。如果线程函数返回某个值,该值也将被忽略。不过,该函数可以接收任意数量的参数。
void func(int i, double d, const std::string& s) { std::cout << i << ", " << d << ", " << s << std::endl; } int main() { std::thread t(func, 1, 12.50, "sample"); t.join(); return 0; }
尽管可以向线程函数传递任意数量的参数,但是所有的参数应当按值传递。如果需要将参数按引用传递,那要向下例所示那样,必须将参数用std::ref 或者std::cref进行封装。
void func(int& a) { a++; } int main() { int a = 42; std::thread t(func, std::ref(a)); t.join(); std::cout << a << std::endl; return 0; }
该程序打印结果为43,但是如果不用std::ref把参数a进行封装的话,输出结果将为42.
除了join方法外,该线程类还提供了另外两个方法:
swap:交换两个线程对象的底层句柄。
Detach: 允许执行该方法的线程脱离其线程对象而继续独立执行。脱离后的线程不再是可结合线程(你不能等待它们执行结束)。
int main() { std::thread t(funct); t.detach(); return 0; }
有一点非常重要,如果线程函数抛出异常,使用常规的try-catch语句是捕获不到该异常的。换句话说,以下的做法是不可行的:
try { std::thread t1(func); std::thread t2(func); t1.join(); t2.join(); } catch(const std::exception& ex) { std::cout << ex.what() << std::endl; }
要在线程间传递异常,你需要在线程函数中捕获他们,将其存储在合适的地方,比便于另外的线程可以随后获取到这些异常。
std::mutex g_mutex; std::vector<std::exception_ptr> g_exceptions; void throw_function() { throw std::exception("something wrong happened"); } void func() { try { throw_function(); } catch(...) { std::lock_guard<std::mutex> lock(g_mutex); g_exceptions.push_back(std::current_exception()); } } int main() { g_exceptions.clear(); std::thread t(func); t.join(); for(auto& e : g_exceptions) { try { if(e != nullptr) { std::rethrow_exception(e); } } catch(const std::exception& e) { std::cout << e.what() << std::endl; } } return 0; }
想要知道更多的关于捕获和传递异常的知识,可以阅读这两本书在主线程中处理辅助线程抛出的C++异常和怎样在线程间传递异常。
在深入学习之前,有一点需要注意 <thread>头文件在命名空间std::this_thread中提供了一些帮助函数:
- get_id: 返回当前线程的id.
- yield:在处于等待状态时,可以让调度器先运行其他可用的线程。
- sleep_for:阻塞当前线程,时间不少于其参数指定的时间。
- sleep_util:在参数指定的时间到达之前,使当前线程一直处于阻塞状态。
锁
在上面的例子中,我需要对vector g_exceptions进行同步访问,以确保在同一时间只能有一个线程向其中添加新元素。为此,我使用了互斥量,并对该互斥进行加锁。互斥量是一个核心同步原语,C++ 11的<mutex>头文件里包含了四种不同的互斥量。
- Mutex: 提供了核心函数 lock() 和 unlock(),以及非阻塞方法的try_lock()方法,一旦互斥量不可用,该方法会立即返回。
- Recursive_mutex:允许在同一个线程中对一个互斥量的多次请求。
- Timed_mutex:同上面的mutex类似,但它还有另外两个方法 try_lock_for() 和 try_lock_until(),分别用于在某个时间段里或者某个时刻到达之间获取该互斥量。
- Recursive_timed_mutex: 结合了timed_mutex 和recuseive_mutex的使用。
下面是一个使用了std::mutex的例子(注意前面提到过的帮助函数get_id()和sleep_for()的用法)。
#include <iostream> #include <thread> #include <mutex> #include <chrono> std::mutex g_lock; void func() { g_lock.lock(); std::cout << "entered thread " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(rand() % 10)); std::cout << "leaving thread " << std::this_thread::get_id() << std::endl; g_lock.unlock(); } int main() { srand((unsigned int)time(0)); std::thread t1(func); std::thread t2(func); std::thread t3(func); t1.join(); t2.join(); t3.join(); return 0; }
输出结果如下所示:
entered thread 10144 leaving thread 10144 entered thread 4188 leaving thread 4188 entered thread 3424 leaving thread 3424
lock()和unlock()这两个方法应该一目了然,第一个方法用来对互斥量加锁,如果互斥量不可用,便处于阻塞状态。后者则用来对互斥量解锁。
下面这个例子展示了一个简单的线程安全容器(内部使用std::vector).这个容器带有添加单个元素的add()方法和添加多个元素的addrange()方法,addrange()方法内部仅仅调用了add()方法。
注意:就像下面的评论里所指出的一样,由于某些原因,包括使用了va_args,这不是一个标准的线程安全容器。而且,dump()方法也不是容器的方法,从真正的实现上来说,它只是一个帮助(独立的)函数。这个例子仅仅用来告诉大家一些有关互斥量的概念,而不是实现一个完全成熟的,无任何错误的线程安全容器。
template <typename T> class container { std::mutex _lock; std::vector<T> _elements; public: void add(T element) { _lock.lock(); _elements.push_back(element); _lock.unlock(); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { _lock.lock(); add(va_arg(arguments, T)); _lock.unlock(); } va_end(arguments); } void dump() { _lock.lock(); for(auto e : _elements) std::cout << e << std::endl; _lock.unlock(); } }; void func(container<int>& cont) { cont.addrange(3, rand(), rand(), rand()); } int main() { srand((unsigned int)time(0)); container<int> cont; std::thread t1(func, std::ref(cont)); std::thread t2(func, std::ref(cont)); std::thread t3(func, std::ref(cont)); t1.join(); t2.join(); t3.join(); cont.dump(); return 0; }
运行该程序时,会进入死锁状态。原因是该容器试图多次去获取同一个互斥量,却一直没有释放它,这样是不可行的。
在这里,使用std::recursive_mutex就可以很好地解决这个问题,它允许同一个线程多次获取同一个互斥量,可获取的互斥量的最大次数并没有具体说明。但是一旦超过最大次数,再对lock进行调用就会抛出std::system_error错误异常。
要想修改上述代码中的问题(除了修改addrange()方法的实现,使它不去调用lock()和unlock()),还可以将互斥量std::mutex改为std::recursive_mutex
template <typename T> class container { std::mutex _lock; std::vector<T> _elements; public: void add(T element) { _lock.lock(); _elements.push_back(element); _lock.unlock(); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { _lock.lock(); add(va_arg(arguments, T)); _lock.unlock(); } va_end(arguments); } void dump() { _lock.lock(); for(auto e : _elements) std::cout << e << std::endl; _lock.unlock(); } }; void func(container<int>& cont) { cont.addrange(3, rand(), rand(), rand()); } int main() { srand((unsigned int)time(0)); container<int> cont; std::thread t1(func, std::ref(cont)); std::thread t2(func, std::ref(cont)); std::thread t3(func, std::ref(cont)); t1.join(); t2.join(); t3.join(); cont.dump(); return 0; }
修改后,就会得到下面的输出结果。
6334 18467 41 6334 18467 41 6334 18467 41
聪明的读者会注意到每次调用func()都会产生相同的数字序列。这是因为种子数是线程本地化的,仅仅在主线程中调用了srand()对种子进行了初始化,在其他工作线程中并没用进行初始化,所以每次都得到相同的数字序列。
显式的加锁和解锁会导致一些问题,比如忘记解锁或者请求加锁的顺序不正确,进而产生死锁。该标准提供了一些类和函数帮助解决此类问题。这些封装类保证了在RAII风格上互斥量使用的一致性,可以在给定的代码范围内自动加锁和解锁。封装类包括:
Lock_guard:在构造对象时,它试图去获取互斥量的所有权(通过调用lock()),在析构对象时,自动释放互斥量(通过调用unlock()).这是一个不可复制的类。
Unique_lock:这个一通用的互斥量封装类,不同于lock_guard,它还支持延迟加锁,时间加锁和递归加锁以及锁所有权的转移和条件变量的使用。这也是一个不可复制的类,但它是可移动类。
有了这些封装类,我们可以像下面这样改写容器类:
template <typename T> class container { std::recursive_mutex _lock; std::vector<T> _elements; public: void add(T element) { std::lock_guard<std::recursive_mutex> locker(_lock); _elements.push_back(element); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { std::lock_guard<std::recursive_mutex> locker(_lock); add(va_arg(arguments, T)); } va_end(arguments); } void dump() { std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e : _elements) std::cout << e << std::endl; } };
有人也许会问,既然dump()方法并没有对容器的状态做任何修改,是不是应该定义为const方法呢?但是你如果将它定义为const,编译器会报出下面的错误:
‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)’ : cannot convert parameter 1 from ‘const std::recursive_mutex’ to ‘std::recursive_mutex &’
一个互斥量(不管使用的哪一种实现)必须要获取和释放,这就意味着要调用非const的lock()和unlock()方法。所以从逻辑上来讲,lock_guard的参数不能使const(因为如果该方法为const,互斥量也必需是const).解决这个问题的办法就是将互斥量定义为可变的mutable,Mutable允许在常函数中修改状态。
不过,这种方法只能用于隐藏或者元状态(就像对计算结果或查询的数据进行缓存,以便下次调用时可以直接使用,不需要进行多次计算和查询。再或者,对在一个对象的实际状态起辅助作用的互斥量进行位的修改)。
template <typename T> class container { mutable std::recursive_mutex _lock; std::vector<T> _elements; public: void dump() const { std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e : _elements) std::cout << e << std::endl; } };
这些封装类的构造函数可以重载,接受一个参数用来指明加锁策略。可用的策略如下:
- defer_lock of type defer_lock_t:不获取互斥量的拥有权
- try_to_lock of type try_to_lock_t:在不阻塞的情况下试图获取互斥量的拥有权
- adopte_lock of type adopt_lock_t:假设调用线程已经拥有互斥量的所有权
这些策略的声明如下:
struct defer_lock_t { }; struct try_to_lock_t { }; struct adopt_lock_t { }; constexpr std::defer_lock_t defer_lock = std::defer_lock_t(); constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t(); constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();
除了这些互斥量的封装类,该标准还提供了两个方法,用于对一个或多个互斥量进行加锁。
- lock:使用一种可以避免死锁的算法对互斥量加锁(通过调用lock(),try_lock()和unlock()).
- try_lock():按照互斥量被指定的顺序,试着通过调用try_lock()来对多个互斥量加锁。
这是一个发生死锁的例子:有一个用来存储元素的容器和一个函数exchange(),该函数用来交换两个容器中的元素。要成为线程安全函数,该函数通过获取每个容器的互斥量,来对两个容器的访问进行同步操作。
template <typename T> class container { public: std::mutex _lock; std::set<T> _elements; void add(T element) { _elements.insert(element); } void remove(T element) { _elements.erase(element); } }; void exchange(container<int>& cont1, container<int>& cont2, int value) { cont1._lock.lock(); std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch to simulate the deadlock cont2._lock.lock(); cont1.remove(value); cont2.add(value); cont1._lock.unlock(); cont2._lock.unlock(); }
假设这个函数是由两个不同的线程进行调用的,第一个线程中,一个元素从容器1中移除,添加到容器2中。第二个线程中,该元素又从容器2移除添加到容器1中。这种做法会导致发生死锁(如果在获取第一个锁后,线程上下文刚好从一个线程切换到另一个线程,导致发生死锁)。
int main() { srand((unsigned int)time(NULL)); container<int> cont1; cont1.add(1); cont1.add(2); cont1.add(3); container<int> cont2; cont2.add(4); cont2.add(5); cont2.add(6); std::thread t1(exchange, std::ref(cont1), std::ref(cont2), 3); std::thread t2(exchange, std::ref(cont2), std::ref(cont1), 6) t1.join(); t2.join(); return 0; }
要解决这个问题,可以使用std::lock来确保以避免发生死锁的方式来获取锁。
void exchange(container<int>& cont1, container<int>& cont2, int value) { std::lock(cont1._lock, cont2._lock); cont1.remove(value); cont2.add(value); cont1._lock.unlock(); cont2._lock.unlock(); }
条件变量C++11 还提供了另外一种同步原语,就是条件变量,它能使一个或多个线程进入阻塞状态,直到接到另一个线程的通知,或者发生超时或虚假唤醒时,才退出阻塞.在头文件<condition_variable> 里对条件变量有两种实现:
condition_variable:要求任何在等待该条件变量的线程必须先获取std::unique_lock锁。
Condition_variable_any:是一种更加通用的实现,可以用于任意满足锁的基本条件的类型(该实现只要提供了lock()和unlock()方法即可)。因为使用它花费的代价比较高(从性能和操作系统资源的角度来讲),所以只有在提供了必不可少的额外的灵活性的条件下才提倡使用它。
下面来讲讲条件变量的工作原理: 至少有一个线程在等待某个条件变为true。等待的线程必须先获取unique_lock 锁。该锁被传递给wait()方法,wait()方法会释放互斥量,并将线程挂起,直到条件变量接收到信号。收到信号后,线程会被唤醒,同时该锁也会被重新获取。
至少有一个线程发送信号使某个条件变为true。可以使用notify_one()来发送信号,同时唤醒一个正在等待该条件收到信号的处于阻塞状态的线程,或者用notify_all()来唤醒在等待该条件的所有线程。
在多处理器系统中,因为一些复杂情况,要想完全预测到条件被唤醒并不容易,还会出现虚假唤醒的情况。就是说,在没人给条件变量发送信号的情况下,线程也可能会被唤醒。所以线程被唤醒后,还需要检测条件是否为true。因为可能会多次发生虚假唤醒,所以需要进行循环检测。
下面代码是一个使用条件变量来同步线程的例子:几个工作线程运行时可能会产生错误并将错误代码放到队列里。记录线程会从队列里取出错误代码并输出它们来处理这些错误。发生错误的时候,工作线程会给记录线程发信号。记录线程一直在等待条件变量接收信号。为了避免发生虚假唤醒,该等待过程在循环检测条件的布尔值。
#include <thread> #include <mutex> #include <condition_variable> #include <iostream> #include <queue> #include <random> std::mutex g_lockprint; std::mutex g_lockqueue; std::condition_variable g_queuecheck; std::queue<int> g_codes; bool g_done; bool g_notified; void workerfunc(int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "] running..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "] an error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_notified = true; g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger] running..." << std::endl; } // loop until end is signaled while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); while(!g_notified) // used to avoid spurious wakeups { g_queuecheck.wait(locker); } // if there are error codes in the queue process them while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger] processing error: " << g_codes.front() << std::endl; g_codes.pop(); } g_notified = false; } } int main() { // initialize a random generator std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); // start the logger std::thread loggerthread(loggerfunc); // start the working threads std::vector<std::thread> threads; for(int i = 0; i < 5; ++i) { threads.push_back(std::thread(workerfunc, i+1, std::ref(generator))); } // work for the workers to finish for(auto& t : threads) t.join(); // notify the logger to finish and wait for it g_done = true; loggerthread.join(); return 0; }
运行上述代码,输出结果如下(注意每次运行,输出结果都不一样;因为每个工作线程运行时都有一个随机的休眠时间)。
[logger] running... [worker 1] running... [worker 2] running... [worker 3] running... [worker 4] running... [worker 5] running... [worker 1] an error occurred: 101 [worker 2] an error occurred: 201 [logger] processing error: 101 [logger] processing error: 201 [worker 5] an error occurred: 501 [logger] processing error: 501 [worker 3] an error occurred: 301 [worker 4] an error occurred: 401 [logger] processing error: 301 [logger] processing error: 401
上面看到的wait()方法有两个重载:
- 第一个重载带有锁unique_lock;这个重载方法可以释放锁,阻塞线程,并把线程添加到正在等待这一条件变量的线程队列里面。当该条件变量收到信号或者发生虚假唤醒时,线程就会被唤醒。它们其中任何一个发生时,锁都会被重新获取,函数返回。
- 第二个重载除了带有锁unique_lock外,还带有循环判定直到返回false值;这个重载是用来避免发生虚假唤醒。它基本上等价于下面的语句:
while(!predicate()) wait(lock);
因此在上面的例子中,通过使用重载的wait()方法以及验证队列状态的判断(空或不空),就可以避免使用布尔变量g_notified了。
void workerfunc(int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "] running..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "] an error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger] running..." << std::endl; } // loop until end is signaled while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); g_queuecheck.wait(locker, [&](){return !g_codes.empty();}); // if there are error codes in the queue process them while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger] processing error: " << g_codes.front() << std::endl; g_codes.pop(); } } }
除了这个重载的wait()方法,还有另外两个类似的重载方法,也带有避免虚假唤醒的判定。
- Wait_for: 在条件变量收到信号或者指定的超时发生前,线程一直处于阻塞状态;
- Wait_until:在条件变量收到信号或者指定的时刻到达之前,线程一直处于阻塞状态。
这两个函数的不带有判定的重载返回cv_status状态,用来表明发生超时或者线程被唤醒是因为条件变量收到信号或者发生虚假唤醒。
该标准还提供了一个函数notify_all_at_thread_exit,它实现了一个机制,通知其他线程给定线程已经运行结束,并销毁所有的thread_local对象。该函数的引进是因为在使用了thread_local后,采用除join()之外的其他机制来等待线程会导致不正确甚至致命的行为发生。
因为thread_local的析构函数会在等待中的线程恢复执行和可能执行结束的情况下被调用(可参考N3070和N2880得知更多信息)。
通常情况下,对这个函数的调用必须在线程生成之前。下面的例子描述了如何使用notify_all_at_thread_exit和condition_variable共同完成对两个线程的同步操作:
std::mutex g_lockprint; std::mutex g_lock; std::condition_variable g_signal; bool g_done; void workerfunc(std::mt19937& generator) { { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker running..." << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker finished..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); g_done = true; std::notify_all_at_thread_exit(g_signal, std::move(lock)); } int main() { // initialize a random generator std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); std::cout << "main running..." << std::endl; std::thread worker(workerfunc, std::ref(generator)); worker.detach(); std::cout << "main crunching..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "main waiting for worker..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); while(!g_done) // avoid spurious wake-ups g_signal.wait(lock); std::cout << "main finished..." << std::endl; return 0; }
如果工作线程在主线程执行结束之前结束,输出结果将如下:
main running... worker running... main crunching... worker finished... main waiting for worker... main finished...
如果主线程比工作线程更早结束,输出结果将如下:
main running... worker running... main crunching... main waiting for worker... worker finished... main finished...
结束语
C++11标准可以让C++开发者以一种标准的,独立平台的方式来编写多线程。这篇文章大概讲述了该标准所支持的线程和同步机制。头文件<thread>提供了thread类(和一些帮助函数),表明thread类是一个可执行线程。头文件<mutex>提供了几种互斥量的实现和对线程进行同步访问的封装类。头文件<condition_variable>提供了条件变量的两种实现,这些实现使一个或多个线程一直处于阻塞状态,直到接收到其他线程的通知,或发生超时或者有虚假唤醒发生时才会被唤醒。推荐读者朋友可以阅读其他资料来获取更多的详细信息。
扫码关注微信公众号