互斥锁std::mutex
是一种最常见的线程间同步的手段,但是在有些情况下不太高效。
假设想实现一个简单的消费者生产者模型,一个线程往队列中放入数据,一个线程往队列中取数据,取数据前需要判断一下队列中确实有数据,由于这个队列是线程间共享的,所以,需要使用互斥锁进行保护,一个线程在往队列添加数据的时候,另一个线程不能取,反之亦然。用互斥锁实现如下:
#include <iostream> #include <deque> #include <thread> #include <mutex> std::deque<int> q; std::mutex mu; void function_1() { int count = 10; while (count > 0) { std::unique_lock<std::mutex> locker(mu); q.push_front(count); locker.unlock(); std::this_thread::sleep_for(std::chrono::seconds(1)); count--; } } void function_2() { int data = 0; while ( data != 1) { std::unique_lock<std::mutex> locker(mu); if (!q.empty()) { data = q.back(); q.pop_back(); locker.unlock(); std::cout << "t2 got a value from t1: " << data << std::endl; } else { locker.unlock(); } } } int main() { std::thread t1(function_1); std::thread t2(function_2); t1.join(); t2.join(); return 0; }
root@ubuntu:~/c++# g++ -std=c++11 cond.cpp -o cond -pthread root@ubuntu:~/c++# ./cond t2 got a value from t1: 10 t2 got a value from t1: 9 t2 got a value from t1: 8 t2 got a value from t1: 7 t2 got a value from t1: 6 t2 got a value from t1: 5 t2 got a value from t1: 4 t2 got a value from t1: 3 t2 got a value from t1: 2 t2 got a value from t1: 1 root@ubuntu:~/c++#
可以看到,互斥锁其实可以完成这个任务,但是却存在着性能问题。
首先,function_1
函数是生产者,在生产过程中,std::this_thread::sleep_for(std::chrono::seconds(1));
表示延时1s
,所以这个生产的过程是很慢的;function_2
函数是消费者,存在着一个while
循环,只有在接收到表示结束的数据的时候,才会停止,每次循环内部,都是先加锁,判断队列不空,然后就取出一个数,最后解锁。所以说,在1s
内,做了很多无用功!这样的话,CPU占用率会很高,可能达到100%(单核)。如 :
root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 30118 53189 30118 00:00:00 0.0 ./cond root 30118 53189 30119 00:00:00 0.0 ./cond root 30118 53189 30120 00:00:01 183 ./cond root 30130 29844 30130 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 30118 53189 30118 00:00:00 0.0 ./cond root 30118 53189 30119 00:00:00 0.0 ./cond root 30118 53189 30120 00:00:02 133 ./cond root 30139 29844 30139 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 30118 53189 30118 00:00:00 0.0 ./cond root 30118 53189 30119 00:00:00 0.0 ./cond root 30118 53189 30120 00:00:03 116 ./cond root 30141 29844 30141 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 30118 53189 30118 00:00:00 0.0 ./cond root 30118 53189 30119 00:00:00 0.0 ./cond root 30118 53189 30120 00:00:04 105 ./cond root 30158 29844 30158 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 30118 53189 30118 00:00:00 0.0 ./cond root 30118 53189 30119 00:00:00 0.0 ./cond root 30118 53189 30120 00:00:04 122 ./cond root 30167 29844 30167 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 30118 53189 30118 00:00:00 0.0 ./cond root 30118 53189 30119 00:00:00 0.0 ./cond root 30118 53189 30120 00:00:05 113 ./cond root 30181 29844 30181 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 30118 53189 30118 00:00:00 0.0 ./cond root 30118 53189 30119 00:00:00 0.0 ./cond root 30118 53189 30120 00:00:06 106 ./cond root 30183 29844 30183 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 30118 53189 30118 00:00:00 0.0 ./cond root 30118 53189 30119 00:00:00 0.0 ./cond root 30118 53189 30120 00:00:07 102 ./cond root 30192 29844 30192 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 30118 53189 30118 00:00:00 0.0 ./cond root 30118 53189 30119 00:00:00 0.0 ./cond root 30118 53189 30120 00:00:07 113 ./cond root 30194 29844 30194 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 30210 29844 30210 00:00:00 0.0 grep cond root@ubuntu:~#
解决办法之一是给消费者也加一个小延时,如果一次判断后,发现队列是空的,就惩罚一下自己,延时500ms
,这样可以减小CPU的占用率。
void function_2() { int data = 0; while ( data != 1) { std::unique_lock<std::mutex> locker(mu); if (!q.empty()) { data = q.back(); q.pop_back(); locker.unlock(); std::cout << "t2 got a value from t1: " << data << std::endl; } else { locker.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(500)); } } }
root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 31221 53189 31221 00:00:00 0.0 ./cond root 31221 53189 31222 00:00:00 0.0 ./cond root 31221 53189 31223 00:00:00 0.0 ./cond root 31292 29844 31292 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 31221 53189 31221 00:00:00 0.0 ./cond root 31221 53189 31222 00:00:00 0.0 ./cond root 31221 53189 31223 00:00:00 0.0 ./cond root 31319 29844 31319 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 31221 53189 31221 00:00:00 0.0 ./cond root 31221 53189 31222 00:00:00 0.0 ./cond root 31221 53189 31223 00:00:00 0.0 ./cond root 31321 29844 31321 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 31221 53189 31221 00:00:00 0.0 ./cond root 31221 53189 31222 00:00:00 0.0 ./cond root 31323 29844 31323 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 31221 53189 31221 00:00:00 0.0 ./cond root 31221 53189 31222 00:00:00 0.0 ./cond root 31332 29844 31332 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 31334 29844 31334 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 31348 29844 31348 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 31357 29844 31357 00:00:00 0.0 grep cond root@ubuntu:~#
然后困难之处在于,如何确定这个延时时间呢,假如生产者生产的很快,消费者却延时500ms
,也不是很好,如果生产者生产的更慢,那么消费者延时500ms
,还是不必要的占用了CPU。
这就引出了条件变量(condition variable),c++11
中提供了#include <condition_variable>
头文件,其中的std::condition_variable
可以和std::mutex
结合一起使用,其中有两个重要的接口,notify_one()
和wait()
,wait()
可以让线程陷入休眠状态,在消费者生产者模型中,如果生产者发现队列中没有东西,就可以让自己休眠,但是不能一直不干活啊,notify_one()
就是唤醒处于wait
中的其中一个条件变量(可能当时有很多条件变量都处于wait
状态)。那什么时刻使用notify_one()
比较好呢,当然是在生产者往队列中放数据的时候了,队列中有数据,就可以赶紧叫醒等待中的线程起来干活了。
使用条件变量修改后如下:
#include <iostream> #include <deque> #include <thread> #include <mutex> #include <condition_variable> std::deque<int> q; std::mutex mu; std::condition_variable cond; void function_1() { int count = 10; while (count > 0) { std::unique_lock<std::mutex> locker(mu); q.push_front(count); locker.unlock(); cond.notify_one(); // Notify one waiting thread, if there is one. std::this_thread::sleep_for(std::chrono::seconds(1)); count--; } } void function_2() { int data = 0; while ( data != 1) { std::unique_lock<std::mutex> locker(mu); while(q.empty()) cond.wait(locker); // Unlock mu and wait to be notified data = q.back(); q.pop_back(); locker.unlock(); std::cout << "t2 got a value from t1: " << data << std::endl; } } int main() { std::thread t1(function_1); std::thread t2(function_2); t1.join(); t2.join(); return 0; }
root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32122 53189 32122 00:00:00 0.0 ./cond root 32122 53189 32123 00:00:00 0.0 ./cond root 32122 53189 32124 00:00:00 0.0 ./cond root 32176 29844 32176 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32122 53189 32122 00:00:00 0.0 ./cond root 32122 53189 32123 00:00:00 0.0 ./cond root 32122 53189 32124 00:00:00 0.0 ./cond root 32178 29844 32178 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32122 53189 32122 00:00:00 0.0 ./cond root 32122 53189 32123 00:00:00 0.0 ./cond root 32122 53189 32124 00:00:00 0.0 ./cond root 32187 29844 32187 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32122 53189 32122 00:00:00 0.0 ./cond root 32122 53189 32123 00:00:00 0.0 ./cond root 32122 53189 32124 00:00:00 0.0 ./cond root 32189 29844 32189 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32122 53189 32122 00:00:00 0.0 ./cond root 32122 53189 32123 00:00:00 0.0 ./cond root 32122 53189 32124 00:00:00 0.0 ./cond root 32203 29844 32203 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32122 53189 32122 00:00:00 0.0 ./cond root 32122 53189 32123 00:00:00 0.0 ./cond root 32205 29844 32205 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32122 53189 32122 00:00:00 0.0 ./cond root 32122 53189 32123 00:00:00 0.0 ./cond root 32214 29844 32214 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32217 29844 32217 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32219 29844 32219 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32228 29844 32228 00:00:00 0.0 grep cond root@ubuntu:~# ps H -eo user,pid,ppid,tid,time,%cpu,cmd | grep cond root 32230 29844 32230 00:00:00 0.0 grep cond root@ubuntu:~#
此时CPU的占用率也很低。
上面的代码有三个注意事项:
- 在
function_2
中,在判断队列是否为空的时候,使用的是while(q.empty())
,而不是if(q.empty())
,这是因为wait()
从阻塞到返回,不一定就是由于notify_one()
函数造成的,还有可能由于系统的不确定原因唤醒(可能和条件变量的实现机制有关),这个的时机和频率都是不确定的,被称作伪唤醒,如果在错误的时候被唤醒了,执行后面的语句就会错误,所以需要再次判断队列是否为空,如果还是为空,就继续wait()
阻塞。 - 在管理互斥锁的时候,使用的是
std::unique_lock
而不是std::lock_guard
,而且事实上也不能使用std::lock_guard
,这需要先解释下wait()
函数所做的事情。可以看到,在wait()
函数之前,使用互斥锁保护了,如果wait
的时候什么都没做,岂不是一直持有互斥锁?那生产者也会一直卡住,不能够将数据放入队列中了。所以,wait()
函数会先调用互斥锁的unlock()
函数,然后再将自己睡眠,在被唤醒后,又会继续持有锁,保护后面的队列操作。而lock_guard
没有lock
和unlock
接口,而unique_lock
提供了。这就是必须使用unique_lock
的原因。 - 使用细粒度锁,尽量减小锁的范围,在
notify_one()
的时候,不需要处于互斥锁的保护范围内,所以在唤醒条件变量之前可以将锁unlock()
。
cond.wait(locker);
换一种写法,wait()
的第二个参数可以传入一个函数表示检查条件,这里使用lambda
函数最为简单,如果这个函数返回的是true
,wait()
函数不会阻塞会直接返回,如果这个函数返回的是false
,wait()
函数就会阻塞着等待唤醒,如果被伪唤醒,会继续判断函数返回值。void function_2() { int data = 0; while ( data != 1) { std::unique_lock<std::mutex> locker(mu); cond.wait(locker, [](){ return !q.empty();} ); // Unlock mu and wait to be notified data = q.back(); q.pop_back(); locker.unlock(); std::cout << "t2 got a value from t1: " << data << std::endl; } }
除了
notify_one()
函数,c++
还提供了notify_all()
函数,可以同时唤醒所有处于wait
状态的条件变量。