在上一篇文章中,我们学习了如何使用互斥量去修复并发问题。在这篇文章中,我们会继续去研究关于互斥量一些更高阶的技术。我们也会学习其他C++11的并发技术:条件变量。
递归锁
让我们想象你有如下一个简单的类:
struct Complex {
std::mutex mutex;
int i;
Complex() : i(0) {}
void mul(int x){
std::lock_guard<std::mutex> lock(mutex);
i *= x;
}
void div(int x){
std::lock_guard<std::mutex> lock(mutex);
i /= x;
}
};
这时你想去增加一个可以正确执行所有操作的新的操作,因此你可以写一个新的函数:
void both(int x, int y){
std::lock_guard<std::mutex> lock(mutex);
mul(x);
div(y);
}
现在,来测试一下这个函数:
int main(){
Complex complex;
complex.both(32, 23);
return 0;
}
如果你运行了这个程序,你会看到这个程序永远也不会结束。问题非常简单,在both函数中,这个线程需要锁并且访问了mul函数。在这个函数中,线程尝试再一次获取锁。这就导致了死锁的出现。默认情况下,一个线程不能获取同一个互斥量两次。
有一个简单的解决方案:std::recursive_mutex。这种互斥量可以被同一个线程获取多次。这里是一个正确Complex结构体的版本:
struct Complex {
std::recursive_mutex mutex;
int i;
Complex() : i(0) {}
void mul(int x){
std::lock_guard<std::recursive_mutex> lock(mutex);
i *= x;
}
void div(int x){
std::lock_guard<std::recursive_mutex> lock(mutex);
i /= x;
}
void both(int x, int y){
std::lock_guard<std::recursive_mutex> lock(mutex);
mul(x);
div(y);
}
};
这一次,程序运行正确了。
时间锁
有些时候,你不想让一个线程去无限地等待一个互斥量。举个例子,你想让一个线程要么做一些事情,要么等待(互斥量解锁)。为了这个目标,标准库有一种解决方案:std::timed_mutex和std::recursive_timed_mutex(如果你需要互斥量的递归特性)。你一样可以通过std::mutex:lock和unlock去获取互斥量的访问权限,但是你也有两个新的函数可以去做这件事情:try_lock_for和try_lock_util。
其中第一个函数也是最有用的。它允许你去设置一个超时放弃,此时函数自动返回,即使是它没有获取到互斥量。当这个函数获得锁时,它会返回true,否则是false。让我们在下面这个简单的程序中试一下吧:
std::timed_mutex mutex;
void work(){
std::chrono::milliseconds timeout(100);
while(true){
if(mutex.try_lock_for(timeout)){
std::cout << std::this_thread::get_id() << ": do work with the mutex" << std::endl;
std::chrono::milliseconds sleepDuration(250);
std::this_thread::sleep_for(sleepDuration);
mutex.unlock();
std::this_thread::sleep_for(sleepDuration);
} else {
std::cout << std::this_thread::get_id() << ": do work without mutex" << std::endl;
std::chrono::milliseconds sleepDuration(100);
std::this_thread::sleep_for(sleepDuration);
}
}
}
int main(){
std::thread t1(work);
std::thread t2(work);
t1.join();
t2.join();
return 0;
}
(这个示例在真实的实践中完全可用)
在这个示例中,首先一件有趣的事情是声明std::chrono::milliseconds的持续时间,这也是一个C++11标准的一个新特征。你可以获取到多种时间类型:纳秒、微秒、毫秒、秒、分钟、小时。我们将timeout作为try_lock_for函数的变量输入。通过std::this_thread::sleep_for(duration),我们也用它去做线程睡眠。这段示例剩下的部分没有什么可说的,仅仅是一些可视化地查看一些输出结果。注意这段代码永远不会停止,你必须手动杀死它。
一次调用
有时候你会想要一个函数只被调用一次,无论有多少个线程在使用它。想象一个函数有两个部分:第一部分只会被调用一次,剩下的部分则会在函数调用时被多次执行。我们可以使用std::call_once函数去非常简单地修复这个问题。下面是一个利用这个机制的示例:
std::once_flag flag;
void do_something(){
std::call_once(flag, [](){std::cout << "Called once" << std::endl;});
std::cout << "Called each time" << std::endl;
}
int main(){
std::thread t1(do_something);
std::thread t2(do_something);
std::thread t3(do_something);
std::thread t4(do_something);
t1.join();
t2.join();
t3.join();
t4.join();
return 0;
}
每一个std::call_once有一个对应的std::once_flag变量。在这里,我们设置了一个只会被调用一次的闭包,但一个函数指针或std::function也能实现这一点。
环境变量
一个环境变量会管理一个等待中的线程列表,直到其他线程唤醒它们。每一个想去等待环境变量的线程必须首先要获取一个锁。当线程开始在等待环境变量时,这个锁会在之后被释放;并且当线程被唤醒时,这个锁被获取。
一个很好的示例是并行的有界缓冲区。这是一个有开始和结束标志的确定容量的循环缓冲。下面是我们利用环境变量的实现:
struct BoundedBuffer {
int* buffer;
int capacity;
int front;
int rear;
int count;
std::mutex lock;
std::condition_variable not_full;
std::condition_variable not_empty;
BoundedBuffer(int capacity) : capacity(capacity), front(0), rear(0), count(0) {
buffer = new int[capacity];
}
~BoundedBuffer(){
delete[] buffer;
}
void deposit(int data){
std::unique_lock<std::mutex> l(lock);
not_full.wait(l, [this](){return count != capacity; });
buffer[rear] = data;
rear = (rear + 1) % capacity;
++count;
l.unlock();
not_empty.notify_one();
}
int fetch(){
std::unique_lock<std::mutex> l(lock);
not_empty.wait(l, [this](){return count != 0; });
int result = buffer[front];
front = (front + 1) % capacity;
--count;
l.unlock();
not_full.notify_one();
return result;
}
};
这个互斥量通过std::unique_lock进行管理。它是一个管理这个锁的装饰器。使用环境变量是很有必要的,为了去唤醒一个正在等待环境变量的线程,notify_one函数被使用。在notify_one 之前的解锁并不是完全必要的。如果你忽视了它,它会被unique_lock的析构函数自动结束。但是,这是有可能通过调用notify_one唤醒一个等待的线程,这个线程会直接被再一次阻塞,因为锁仍然没有被唤醒线程解开。因此,如果你在那之前做这件事情,唤醒线程会直接获得锁。因此,这是一个细微的优化,但它不会做出很大的改进。这个等待函数是有一点特殊。它让第一个参数作为一个独立的锁,而第二个参数则是一个断言。当等待继续的时候,这个断言必须返回FALSE(等同于while(!pred()){cv.wait(l);}),这个函数剩下的部分没什么可说的。
我们可以使用这个结构体去解决多个生产者消费者问题。这个是一个在并行编程中非常通用的问题。多个线程(消费者)等待消费从其他多个线程(生产者)生产的数据。这儿是一个使用这个结构体的多线程例子:
void consumer(int id, BoundedBuffer& buffer){
for(int i = 0; i < 50; ++i){
int value = buffer.fetch();
std::cout << "Consumer " << id << " fetched " << value << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(250));
}
}
void producer(int id, BoundedBuffer& buffer){
for(int i = 0; i < 75; ++i){
buffer.deposit(i);
std::cout << "Produced " << id << " produced " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
int main(){
BoundedBuffer buffer(200);
std::thread c1(consumer, 0, std::ref(buffer));
std::thread c2(consumer, 1, std::ref(buffer));
std::thread c3(consumer, 2, std::ref(buffer));
std::thread p1(producer, 0, std::ref(buffer));
std::thread p2(producer, 1, std::ref(buffer));
c1.join();
c2.join();
c3.join();
p1.join();
p2.join();
return 0;
}
三个消费线程和两个生产线程不断地被创建和查询这个结构体。一个有趣的事情在于这个示例在使用std::ref,以引用的方式传输缓存,这对于避免缓存的复制是很有必要的。
圆满完成
这篇文章中,我们讨论了多个事情。首先我们学习了如何使用recursive_mutex去允许一个线程获取另一个线程超过一次。之后,我们学习了如何去用超时的方式获取一个互斥量。然后,我们学习了只访问一个函数一次的方法。最后,利用环境变量解决了多个生产者/消费者问题。
本文示例的源代码在这里。