在多核编程中,我们使用内核对象【如:事件对象(Event)、互斥量对象(Mutex,或互斥体对象)、信号量对象(Semaphore)等】来避免多个线程修改同一个数据时产生的竞争条件。
但是,基于内核对象的同步,会带来昂贵的上下文切换(用户态切换到内核态,占用1000个以上的cpu周期)。就需要使用另一种方法 —— 原子指令。
原子指令(x为std::atomic类型) | 说明 |
x.load() |
读操作 返回x的值 |
x.store(n) |
写操作 把x设为n,什么都不返回 |
x.exchange(n) | 把x设为n,返回设定之前的值 |
x.fetch_add(n) | 原子地做x += n,返回修改之前的值 |
x.fetch_sub(n) | 原子地做x-= n,返回修改之前的值 |
仅靠原子技术实现不了对资源的访问控制,即使简单计数操作,看上去正确的代码也可能会crash。
这里的关键在于编译器和cpu实施的重排指令导致了读写顺序的变化。只要没有依赖,代码中在后面的指令就可能跑到前面去,编译器和CPU都会这么做。
注1:单线程代码不需要关心乱序的问题。因为乱序至少要保证这一原则:不能改变单线程程序的执行行为
注2:内核对象多线程编程在设计的时候都阻止了它们调用点中的乱序(已经隐式包含memory barrier),不需要考虑乱序的问题。
注3:使用用户模式下的线程同步时,乱序的效果才会显露无疑。
程序员可以使用c++11 atomic提供了6种memory order,来在编程语言层面对编译器和cpu实施的重排指令行为进行控制
memory order | 作用 |
memory_order_relaxed | 无fencing作用,cpu和编译器可以重排指令 |
memory_order_consume |
后面依赖此原子变量的访存指令勿重排至此条指令之前 注:性能比memory_order_acquire高 |
memory_order_acquire | 后面访存指令勿重排至此条指令之前 |
memory_order_release | 前面访存指令勿重排到此条指令之后 |
memory_order_acq_rel | acquare + release |
memory_order_seq_cst | acq_rel + 所有使用seq_cst的指令有严格的全序关系 |
多线程编程时,通过这些标志位,来读写原子变量,可以组合出4种同步模型:
Relaxed ordering
Release-Acquire ordering
Release-Consume ordering
Sequentially-consistent ordering
默认情况下,std::atomic使用的是Sequentially-consistent ordering(最严格的同步模型)。但在某些场景下,合理使用其它3种ordering,可以让编译器优化生成的代码,从而提高性能。
Relaxed ordering
在这种模型下,std::atomic的load()和store()都要带上memory_order_relaxed参数。Relaxed ordering仅仅保证load()和store()是原子操作,除此之外,不提供任何跨线程的同步。
先看看一个简单的例子:
std::atomic<int> x = 0; // global variable std::atomic<int> y = 0; // global variable Thread-1: Thread-2: r1 = y.load(memory_order_relaxed); // A r2 = x.load(memory_order_relaxed); // C x.store(r1, memory_order_relaxed); // B y.store(42, memory_order_relaxed); // D
执行完上面的程序,可能出现r1 == r2 == 42。理解这一点并不难,因为编译器允许调整 C 和 D 的执行顺序。
如果程序的执行顺序是 D -> A -> B -> C,那么就会出现r1 == r2 == 42。
如果某个操作只要求是原子操作,不需要其它同步的保障,就可以使用 Relaxed ordering。程序计数器是一种典型的应用场景。
#include <cassert> #include <vector> #include <iostream> #include <thread> #include <atomic> std::atomic<int> cnt = {0}; void f() { for (int n = 0; n < 1000; ++n) { cnt.fetch_add(1, std::memory_order_relaxed); } } int main() { std::vector<std::thread> v; for (int n = 0; n < 10; ++n) { v.emplace_back(f); } for (auto& t : v) { t.join(); } assert(cnt == 10000); // never failed return 0; }
Release-Acquire ordering
在这种模型下,store()使用memory_order_release,而load()使用memory_order_acquire。这种模型有两种效果,第一种是可以限制 CPU 指令的重排:
(1)在store()之前的所有读写操作,不允许被移动到这个store()的后面。 // write-release语义
(2)在load()之后的所有读写操作,不允许被移动到这个load()的前面。 // read-acquire语义
该模型可以保证:如果Thread-1的store()的那个值,成功被 Thread-2的load()到了,那么 Thread-1在store()之前对内存的所有写入操作,此时对 Thread-2 来说,都是可见的。
下面的例子阐述了这种模型的原理:
#include <thread> #include <atomic> #include <cassert> #include <string> std::atomic<bool> ready{ false }; int data = 0; void producer() { data = 100; // A ready.store(true, std::memory_order_release); // B } void consumer() { while (!ready.load(std::memory_order_acquire)) // C ; assert(data == 100); // never failed // D } int main() { std::thread t1(producer); std::thread t2(consumer); t1.join(); t2.join(); return 0; }
让我们分析一下这个过程:
首先 A 不允许被移动到 B 的后面。
同样 D 也不允许被移动到 C 的前面。
当 C 从 while 循环中退出了,说明 C 读取到了 B store()的那个值,此时,Thread-2 保证能够看见 Thread-1 执行 B 之前的所有写入操作(也即是 A)。
使用Release-Acquire ordering实现双重检查锁模式(DLCP)
下面单件为例来说明:
class Singleton { public: static Singleton* get_instance() { Singleton* tmp = instance_.load(std::memory_order_acquire); if (tmp == nullptr) { std::unique_lock<std::mutex> lk(mutex_); tmp = instance_; if (tmp == nullptr) { tmp = new Singleton(); instance_.store(std::memory_order_release); } } return tmp; } private: Singleton() = default; static std::atomic<Singleton*> instance_; static std::mutex mutex_; };
使用Release-Acquire ordering实现自旋锁(Spinlock)
获取和释放语义,是实现锁的基础(Spinlock, Mutex, RWLock, ...),所有被[Read Acquire,Write Release]包含的区域,即构成了一个临界区,临界区里的内存操作,不会乱序到临界区之外执行。
read-acquire(判断是否加锁,没则加锁,否则循环等待)
-------------------------------------------------------------------------
all memory operation stay between the line(临界区)
-------------------------------------------------------------------------
write-release(释放锁)
实现代码如下:
#include <atomic> class simple_spin_lock { public: simple_spin_lock() = default; void lock() { while (flag.test_and_set(std::memory_order_acquire)) continue; } void unlock() { flag.clear(std::memory_order_release); } private: simple_spin_lock(const simple_spin_lock&) = delete; simple_spin_lock& operator =(const simple_spin_lock&) = delete; std::atomic_flag flag = ATOMIC_FLAG_INIT; };
① 对std::atomic_flag的操作具有原子性,保证了同一时间,只有一个线程能够lock成功,其余线程全部在while循环
② 使用了acquire内存屏障, 所以lock具有获取语义
③ 使用了release内存屏障, 所以unlock具有释放语义
Release-Consume ordering
在这种模型下,store()使用memory_order_release,而load()使用memory_order_consume。这种模型有两种效果,第一种是可以限制 CPU 指令的重排:
(1)在store()之前的所有读写操作,不允许被移动到这个store()的后面。
(2)在load()之后的所有依赖此原子变量的读写操作,不允许被移动到这个load()的前面。
注:不依赖此原子变量的读写操作可能会CPU指令重排
下面的例子阐述了这种模型的原理:
#include <thread> #include <atomic> #include <cassert> #include <string> std::atomic<std::string*> ptr; int data; // thread1 void producer() { std::string* p = new std::string("Hello"); // A data = 42; // B ptr.store(p, std::memory_order_release); // C } // thread2 void consumer() { std::string* p2; while (!(p2 = ptr.load(std::memory_order_consume))) // D ; assert(*p2 == "Hello"); //E always true: *p2 carries dependency from ptr assert(data == 42); // F may be false: data does not carry dependency from ptr } int main() { std::thread t1(producer); std::thread t2(consumer); t1.join(); t2.join(); return 0; }
Sequentially-consistent ordering
所有以memory_order_seq_cst为参数的原子操作(不限于同一个原子变量),对所有线程来说有一个全局顺序(total order)
并且两个相邻memory_order_seq_cst原子操作之间的其他操作(包括非原子变量操作),不能reorder到这两个相邻操作之外
UE4下的Memory Order
enum class EMemoryOrder { // Provides no guarantees that the operation will be ordered relative to any other operation. Relaxed, // Establishes a single total order of all other atomic operations marked with this. SequentiallyConsistent // Load和Store函数缺省为该类型 };
详见:UnrealEngineEngineSourceRuntimeCorePublicTemplatesAtomic.h
Atomic相关的测试代码见:UnrealEngineEngineSourceRuntimeCorePrivateTestsMiscAtomicTest.cpp
参考资料