zoukankan      html  css  js  c++  java
  • 使用C++11原子量实现自旋锁

    一、自旋锁

    自旋锁是一种基础的同步原语,用于保障对共享数据的互斥访问。与互斥锁的相比,在获取锁失败的时候不会使得线程阻塞而是一直自旋尝试获取锁。当线程等待自旋锁的时候,CPU不能做其他事情,而是一直处于轮询忙等的状态。自旋锁主要适用于被持有时间短,线程不希望在重新调度上花过多时间的情况。实际上许多其他类型的锁在底层使用了自旋锁实现,例如多数互斥锁在试图获取锁的时候会先自旋一小段时间,然后才会休眠。如果在持锁时间很长的场景下使用自旋锁,则会导致CPU在这个线程的时间片用尽之前一直消耗在无意义的忙等上,造成计算资源的浪费。

    二、CAS操作实现自旋锁

    CAS(Compare and Swap),即比较并替换,实现并发算法时常用到的一种技术,这种操作提供了硬件级别的原子操作(通过锁总线的方式)。CAS操作的原型可以认为是:

    bool CAS(V, A, B)
    

    其中V代表内存中的变量,A代表期待的值,B表示新值。当V的值与A相等时,将V与B的值交换。逻辑上可以用下面的伪代码表示:

    bool CAS(V, A, B)
    {
        if (V == A)
        {
            swap(V, B);
            return true;
        }
        
        return false;
    }
    

    需要强调的是上面的操作是原子的,要么不做,要么全部完成。

    那么已经拥有CAS操作的情况下如何实现一个自旋锁呢?首先回忆自旋锁的用途,本质上我们是希望能够让一个线程在不满足进入临界区的条件时,不停的忙等轮询,直到可以运行的时候再继续(进入临界区)执行。那么,我们可能自然的想到使用一个bool变量来表示是否可以进入临界区,例如以下面的伪代码的逻辑:

    while(flag == true);
    flag = true;
    /*
    do something ...
    */
    flag = false;
        ...
    

    这样做的直观想法是当flag为true的时候表示已经有线程处于临界区内,只有当flag为fasle时才能进入,而在进入的时候立即将flag置为true。但是这样做明显存在一个问题,判断flag为false和设置flag为true并不是一个不可分割的整体,有可能出现类似下面这样的时序, 假设最初flag为false:

    step thread 1 thread 2
    1 while(flag == true);
    2 while(flag == true);
    3 flag = true
    4 flag = true
    5 do something do something
    6 flag = false
    7 flag = false

    step是虚构的步骤,do something为一系列指令,这里写在一起表示并发执行。这里可以看出由于thread1读取判断flag的值与修改flag的值是两个独立的操作,中间插入了thread2的判断操作,最终使得有两个线程同时进入了临界区,这与我们的期望相悖。那么如何解决呢?如果能将读取判断与修改的操作合二为一,变成一个不可分割的整体,那么自然就不可能出现这种交错的场景。对于这样一个整体操作,我们希望它能读取内存中变量的值,并且当其等于特定值的时候,修改它为我们需要的另一个值。嗯......没错,这样我们就得到了CAS操作。

    现在可以重新修改我们的同步方式,不停的进行期望flag为false的CAS操作 CAS(flag, flase, b) (这里b为true),直到其返回成功为止,再进行临界区中的操作,离开临界区时将flag置为false。

    b = true;
    while(!CAS(flag, false, b));
    //do something
    flag = false;
    

    现在,判断操作与写入操作已经成为了一个整体,当一个线程的CAS操作成功的时候会阻止其他线程进入临界区,到达互斥访问的目的。

    现在我们已经可以使用CAS操作来解决临界区的互斥访问的问题了,但是如果每次都这样写一遍实在太过麻烦,因此可以进行一些封装使得使用更加方便,也就是说...可以封装成自旋锁。我们可以用一个类来表示,将一个bool值作为类的数据成员,同时将CAS操作和赋值操作作为其成员函数,CAS操作其实就是加锁操作,而后面的赋值操作就是解锁操作。

    三、用C++原子量实现

    按照上面的思路,接下来用 C++ 11 引入标准库的原子量来实现一个自旋锁并且进行测试。

    首先,我们需要一个bool值来表示锁的状态,这里直接使用标准库中的原子量 atomic<bool> (C++ 11的原子量可以参考:https://www.cnblogs.com/FateTHarlaown/p/8919235.html) ,在我的平台(Cygwin64、GCC7.3)上 atomic<bool> 的成员函数is_lock_free()返回值为true,是无锁的实现(如果内部使用了锁来实现的话那还叫什么自旋锁 = =)。实际上在大多数平台上 atomic<bool> 都是无锁的,如果不确定的话也可以使用C++标准规定必须为无锁实现的atomic_flag。

    接下来,我们需要两个原子操作,CAS和赋值,C++11标准库在原子量的成员函数中直接提供了这两个操作。

    //CAS
    std::atomic::compare_exchange_weak( T& expected, T desired,
                                        std::memory_order order =
                                        std::memory_order_seq_cst ),
                                        
    std::atomic::compare_exchange_strong( T& expected, T desired,
                                        std::memory_order order =
                                        std::memory_order_seq_cst )
    //赋值
    void store( T desired, std::memory_order order = std::memory_order_seq_cst )
    

    compare_exchange_weak 与 compare_exchange_strong 主要的区别在于内存中的值与expected相等的时候,CAS操作是否一定能成功,compare_exchange_weak有概率会返回失败,而compare_exchange_strong则一定会成功。因此,compare_exchange_weak必须与循环搭配使用来保证在失败的时候重试CAS操作。得到的好处是在某些平台上compare_exchange_weak性能更好。按照上面的模型,我们本来就要和while搭配使用,可以使用compare_exchange_weak。最后内存序的选择没有特殊需求直接使用默认的std::memory_order_seq_cst。而赋值操作非常简单直接,这个调用一定会成功(只是赋值而已 = =),没有返回值。
    实现代码非常短,下面是源代码:

    #include <atomic>
    
    class SpinLock {
    
    public:
        SpinLock() : flag_(false)
        {}
    
        void lock()
        {
            bool expect = false;
            while (!flag_.compare_exchange_weak(expect, true))
            {
                //这里一定要将expect复原,执行失败时expect结果是未定的
                expect = false;
            }
        }
    
        void unlock()
        {
            flag_.store(false);
        }
    
    private:
        std::atomic<bool> flag_;
    };
    

    如上面所说,lock操作不停的尝试CAS操作直到成功为止,unlock操作则将bool标志位复原。使用方式如下:

    SpinLock myLock;
    myLock.lock();
    
    //do something
    
    myLock.unlock();
    

    接下来,我们进行正确性测试,以经典的i++ 问题为例:

    #include <atomic>
    #include <thread>
    #include <vector>
    
    //自旋锁类定义
    class SpinLock {
    
    public:
        SpinLock() : flag_(false)
        {}
    
        void lock()
        {
            bool expect = false;
            while (!flag_.compare_exchange_weak(expect, true))
            {
                expect = false;
            }
        }
    
        void unlock()
        {
            flag_.store(false);
        }
    
    private:
        std::atomic<bool> flag_;
    };
    
    //每个线程自增次数
    const int kIncNum = 1000000;
    //线程数
    const int kWorkerNum = 10;
    //自增计数器
    int count = 0;
    //自旋锁
    SpinLock spinLock;
    //每个线程的工作函数
    void IncCounter()
    {
        for (int i = 0; i < kIncNum; ++i)
        {
            spinLock.lock();
            count++;
            spinLock.unlock();
        }
    }
    
    int main()
    {
        std::vector<std::thread> workers;
        std::cout << "SpinLock inc MyTest start" << std::endl;
        count = 0;
    
        std::cout << "start " << kWorkerNum << " workers_" << "every worker inc " << kIncNum << std::endl;
        std::cout << "count_: " << count << std::endl;
        //创建10个工作线程进行自增操作
        for (int i = 0; i < kWorkerNum; ++i)
            workers.push_back(std::move(std::thread(IncCounter)));
    
        for (auto it = workers.begin(); it != workers.end(); it++)
            it->join();
    
        std::cout << "workers_ end" << std::endl;
        std::cout << "count_: " << count << std::endl;
        //验证结果
        if (count == kIncNum * kWorkerNum)
        {
            std::cout << "SpinLock inc MyTest passed" << std::endl;
            return true;
        }
        else
        {
            std::cout << "SpinLock inc MyTest failed" << std::endl;
            return false;
        }
    
        return 0;
    }
    

    上面的代码中创建了10个线程对共享的全局变量count分别进行一百万次++操作,然后验证结果是否正确,最终执行的输出为:

    SpinLock inc MyTest start
    start 10 workers_every worker inc 1000000
    count_: 0
    workers_ end
    count_: 10000000
    SpinLock inc MyTest passed
    

    从结果中可以看出我们实现的自旋锁起到了保护临界区(这里就是i++ )的作用,count最后的值等于每个线程执行自增的数目之和。作为对比,可以去掉IncCounter中的加锁解锁操作:

    void IncCounter()
    {
        for (int i = 0; i < kIncNum; ++i)
        {
            //spinLock.lock();
            count++;
            //spinLock.unlock();
        }
    }
    

    执行后的输出为:

    SpinLock inc MyTest start
    start 10 workers_every worker inc 1000000
    count_: 0
    workers_ end
    count_: 7254522
    SpinLock inc MyTest failed
    

    结果由于多个线程同时执行 i++ 造成结果错误。

    到这里,我们就通过 C++ 11的原子量实现了一个简单的自旋锁。这里只是对C++原子量的一个小使用,无论是自旋锁本身还是原子量都还有许多值得探究的地方。

  • 相关阅读:
    PAT 1097. Deduplication on a Linked List (链表)
    PAT 1096. Consecutive Factors
    PAT 1095. Cars on Campus
    PAT 1094. The Largest Generation (层级遍历)
    PAT 1093. Count PAT's
    PAT 1092. To Buy or Not to Buy
    PAT 1091. Acute Stroke (bfs)
    CSS:word-wrap/overflow/transition
    node-webkit中的requirejs报错问题:path must be a string error in Require.js
    script加载之defer和async
  • 原文地址:https://www.cnblogs.com/FateTHarlaown/p/9170474.html
Copyright © 2011-2022 走看看