多线程一直是编程中的重要的工具,它可以分充分的利用硬件资源,是我们用更少的时间去完成更多的事情。在之前的博客中,我有介绍了OpenMP的基本使用,OpenMP可以理解为多线程的一个合理和高效的一套抽象工具。这次,打算仔细的介绍多线程编程中的常见的概念和典型的案例。
典型的案例
说到多线程,最核心的问题就是保证数据的读写安全。为了达到此目的,我们需要多很多常见的数据结构做一些改造,从而适应多线程的场景。以下是我工作中比较常见到的一些使用场景:
- 线程池
- 读写锁
- 消息队列
- ConcurrentCache
- PingPang Buffer
在具体介绍这些使用场景之前,我们还是需要了解需要使用到的一些基本的工具:互斥量、条件变量、原子操作等。
互斥量
互斥量,顾名思义,就是互斥的数据,一个线程持有的时候,其他线程就必须等待。
在C++11中,使用<mutex>
头文件引入。以下是一个简单的计数器的实现。
emit
函数通过mutex_
进行加锁,使得同一时间仅有一个线程可以执行++ x_
的操作,从而保证了计数的正确性。
std::lock_guard
是个工具类,lck在构造时,调用了lock函数,在析构时调用了unlock,从而避免我们自行调用的时候忘记unlock。
#include <mutex>
#include <thread>
#include <iostream>
class Counter {
public:
Counter(): x_(0) {}
void emit() {
mutex_.lock();
++ x_;
mutex_.unlock();
// or
// std::lock_guard<std::mutex> lck(mutex_);
// ++ x_;
}
int count() {
return x_;
}
private:
int x_;
std::mutex mutex_;
};
int main() {
Counter c;
std::thread t1([&c]{
for (int i = 0; i < 10000000; ++ i) {
c.emit();
}
});
std::thread t2([&c]{
for (int i = 0; i < 10000000; ++ i) {
c.emit();
}
});
t1.join();
t2.join();
std::cout << c.count() << std::endl; // 20000000
}
基于Mutex,我们可以方便的实现读写锁。读写锁的作用是,保证数据可以供多个线程并发读,仅一个线程允许写。在存在线程读的情况下,写的线程会阻塞,直到没有任何线程有读操作。
读写锁
首先读写锁会存在一个write_mutex,读线程和写线程都需要抢占这个mutex,从而保证读和写不会同时进行。但是只需要第一个读线程抢占write_mutex即可,其他的读线程不需要再抢占(抢占的话,就不支持并发读了)。当不存在读线程的时候,需要释放write_mutex,这才运行写线程抢占。
因此我们还需要一个计数器,记录当前读线程的个数,并使用另一个read_mutex保证计数器的准确。
#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
class ReadWriteLock {
public:
ReadWriteLock():reader_count_(0) {}
void lock_read() {
read_mutex_.lock();
if (reader_count_ == 0) {
write_mutex_.lock();
}
++ reader_count_;
read_mutex_.unlock();
}
void unlock_read() {
read_mutex_.lock();
-- reader_count_;
if (reader_count_ == 0) {
write_mutex_.unlock();
}
read_mutex_.unlock();
}
void lock_write() {
write_mutex_.lock();
}
void unlock_write() {
write_mutex_.unlock();
}
private:
std::mutex read_mutex_;
std::mutex write_mutex_;
int64_t reader_count_;
};
ReadWriteLock rw_lock;
void read_fn(int idx, int start, int end) {
std::this_thread::sleep_for(std::chrono::seconds(start));
rw_lock.lock_read();
std::cout << "read thread #" << idx << ": read data" << std::endl;
std::this_thread::sleep_for (std::chrono::seconds(end - start));
std::cout << "read thread #" << idx << ": read over" << std::endl;
rw_lock.unlock_read();
}
void write_fn(int idx, int start, int end) {
std::this_thread::sleep_for(std::chrono::seconds(start));
rw_lock.lock_write();
std::cout << "write thread #" << idx << ": write data" << std::endl;
std::this_thread::sleep_for (std::chrono::seconds(end - start));
std::cout << "write thread #" << idx << ": write over" << std::endl;
rw_lock.unlock_write