zoukankan      html  css  js  c++  java
  • Effective Modern C++:07并发API

             C++11的志伟功勋之一,就是将并发融入了语言和库中,因此在C++的历史上,程序员可以首次跨越所有平台撰写具有标准行为的多线程程序。

    35:优先选用基于任务而非基于线程的程序设计

             如果需要以异步的方式运行函数doAsyncWork,有两种选择,基于线程的方法(使用std::thread)和基于任务(使用std::async)的方法:

    int doAsyncWork();
    std::thread t(doAsyncWork);
    
    auto fut = std::async(doAsyncWork); // "fut" for "future"

              基于任务的方法比基于线程的更好。基于线程的方法中,没有什么直截了当的方法获取doAsyncWork的返回值,而在基于任务的方法中,std::async返回的std::future提供了get函数可以获取返回值,而且如果doAsyncWork抛出了异常,get函数也能访问到该异常,而在使用线程的方法中,抛出异常直接导致std::terminate的调用。

            

             基于任务的方法比基于线程的方法更强大的优势在于,使程序员从线程管理的细节中得以脱身。首先需要介绍一下线程在带有并发的C++中有三种意义:

             硬件线程,实际执行计算的线程,现代计算机体系结构会为每个CPU内核提供一个或多个硬件线程;

             软件线程(操作系统线程)是操作系统用以实施跨进程的管理,以及进行硬件线程调度的线程。通常能够创建的软件线程要比硬件线程要多;

             std::thread是C++进程中的对象,用作底层软件线程的句柄,有些std::thread对象表示空句柄,也就是joinable为false的std::thread,它们要么处于默认构造状态(还没有待执行的函数),要么被移动了,要么被join了,要么被detach了。

             软件线程是一种有限的资源,如果试图创建的线程数量多于系统支持的数量,就会抛出std::system_error异常。因此即使doAsyncWork本身不抛出异常(带有noexcept声明),std::thread(doAsyncWork)还是可能会抛出异常。好的软件必须处理这种可能性,要么就是在当前线程执行doAsyncWork函数(这会导致阻塞,以及负载均衡问题),要么是等待已有线程完成工作后在创建一个新的std::thread对象(但是其他线程可能正等待doAsyncWork某个结果或是某个条件变量的通知)。

             即使没有达到数量最大值,也有可能发生超订问题(oversubscription),也就是就绪状态的软件线程超过了硬件线程数量。这种情况下,线程调度器会为软件线程在硬件线程上分配CPU时间片,当一个线程时间片用完,切换到另一个线程上时,就需要上下文切换。这种上下文切换会增加系统线程管理的开销,尤其是在一个软件线程的两次调度被调度器切换到不同的CPU内核上的硬件线程时就会发生高昂的计算成本,也就是CPU缓存不命中的情况会发生。避免超订是困难的,因为软件线程和硬件线程的最佳比例取决于软件线程变成就绪状态的频繁程度,而这是动态变化的。

             使用std::async,就相当于把上面这些问题交给C++标准库去处理。比如auto fut = std::async(doAsyncWork)这条语句,造成线程耗尽异常的可能性会大幅度缩小,因为使用上面的语句调用std::async(默认启动策略)时,系统可能不会创建一个新的线程,而是允许调度器把doAsyncWork运行于请求doAsyncWork结果的线程中(对fut调用了get或wait的线程),而如果是你自己实现这种策略(将doAsyncWork运行于请求结果的线程中),有可能会导致负载均衡问题,因为std::async和运行时调度器来代替你面对这些问题时,运行调度器很可能对于当前机器上正在发生什么比你有更全面的了解。

    即使你确实有更紧急的事需要处理,从而需要doAsyncWork确实运行于异步环境中时,也可以使用std::lanuch::async调用std::async,保证函数确实运行于另一个线程中。

             最高水平的线程调度器会使用全系统范围的线程池来避免超订,而且还会通过工作窃取算法来提高硬件内核间的负载均衡。C++标准库并未要求一定使用线程池或者工作窃取算法,但是一些厂商还是会在他们的标准库实现中利用该技术,并且使我们有理由期待这一领域会持续进步,如果使用基于任务的方法进行编程,则在相关技术普及之时你会自动享受这些好处,如果直接使用了std::thread,则要自行承担处理线程耗尽、超订和负载均衡的重担,更不用提你在程序中的解决方案能否应用在同一台机器上的另一个进程上,而是使问题更加雪上加霜。

             但是仍有些场景下,直接使用std::thread会更合适,它们包括:需要访问底层线程实现的API;需要且有能力为你的应用优化线程用法;需要实现C++并行API未提供的线程技术,比如线程池技术。然而这些都是不常见的场景,大多数时候,你应该选择基于任务的方法。

    36:如果异步是必要的,则指定std::launch::async

             std::async的启动策略有两种:

    std::launch::async,以异步的方式执行函数f,也就是在另一个线程上执行;

    std::launch::deferred,f只会在std::async所返回的期值的get或wait调用时才运行,也就是执行会推迟到其中一个调用发生的时刻,当调用get或wait时,f会同步执行,即调用方会阻塞到f运行至结束为止,如果get或wait没有调用,则f不会运行。

             std::async的默认启动策略,是二者或运算的结果,因此下面的调用时等价的:

    auto fut1 = std::async(f); // run f using default launch policy
    
    auto fut2 = std::async(std::launch::async | 
                        std::launch::deferred, f); // run f either async or deferred

              这么一来,默认启动策略允许f以异步或同步的方式运行皆可。正如上一条款所说,这种弹性使得std::async与标准库的线程管理组件能够承担起线程的创建和销毁、避免超订,以及负载均衡的责任。

             但是std::async的默认启动策略也会有一些不易察觉的问题。比如在线程t中执行:

    auto fut = std::async(f); // run f using default launch policy

              无法预知f是否回会和t并发运行,因为f可能会被调度为推迟运行;

             无法预知f是否运行在与调用fut的get或wait函数的线程不同的某线程之上。如果t是那个线程,也就是说无法预知f是否会运行在与t不同的某线程之上。

             连f是否会运行这件起码的事都无法预知,因为无法保证程序的每条路径上,fut的get或wait都会得到调用。

            

    这种默认启动策略在调度上的弹性,意味着如果f读/写线程的局部存储(thread local storage,TLS),无法预知操作的是哪个线程的局部存储,f操作的TLS可能是和独立线程相关,也可能是和调用fut的get或wait的线程相关;

             这种策略也会影响那些基于fut.wait的循环中以超时为条件的代码,因为延迟策略下调用fut的wait_for或wait_until,直到f执行完,它们的结果一直是std::launch::deferred,这意味着下面的循环可能会停止,也可能永远无法停止:

    using namespace std::literals; 
    
    void f() {
        std::this_thread::sleep_for(1s);
    }
    
    auto fut = std::async(f); 
    // loop until f has finished running...
    while (fut.wait_for(100ms) != std::future_status::ready)
    { … }

              如果f与调用std::async的线程是并发执行的(即选用了std::launch::async启动策略),这就没问题。但是如果f是被推迟执行的,则fut.wait_for将总是返回std::future_status::deferred,导致循环无法停止。这种情况一般只会在运行负载很重时才会出现,毕竟如果硬件层没有面临超订或者线程耗尽的威胁,运行期系统没有理由不去调度任务以并发方式执行,这就使得这种问题难以复现。

             修正这个错误的做法是首先用wait_for(0s)探测任务是否确实以延迟策略运行:

    auto fut = std::async(f); 
    if (fut.wait_for(0s) == std::future_status::deferred) //if task is deferred...
    {
        … // use wait or get on fut to call f synchronously
    } 
    else 
    { // task isn't deferred
        while (fut.wait_for(100ms) != std::future_status::ready) { 
            … // task is neither deferred nor ready, so do concurrent work until it's ready
        }
        … // fut is ready
    }

              将上面所有因素纳入考量的最终结论是:默认启动策略对任务使用std::async能正常工作需要满足下面所有条件:任务不需要与调用get或wait的线程并发执行;读写哪个线程的thread_local变量没有影响;要么保证在std::async返回的期值上会调用get或wait,要么接受任务可能永远不执行的情况;使用wait_for或wait_until的代码将任务被推迟的可能性纳入考虑。

             只要其中一个条件不满足,你就很可能需要保证任务以异步方式执行,从而需要指定std::launch::async作为第一个实参调用std::async。如果确实是这样,则能有个函数像std::async那样运作在方便不过了。下面就是C++11的版本:

    template<typename F, typename... Ts>
    inline std::future<typename std::result_of<F(Ts...)>::type> // return future
    reallyAsync(F&& f, Ts&&... params) {
        return std::async(std::launch::async, std::forward<F>(f), std::forward<Ts>(params)...);
    }
    
    auto fut = reallyAsync(f); // run f asynchronously;

              上面获取std::async的返回值类型,使用了type_traits中的result_of。在C++14中,可以有更简化的版本:

    template<typename F, typename... Ts>
    inline
    auto reallyAsync(F&& f, Ts&&... params) {
        return std::async(std::launch::async, std::forward<F>(f),
                            std::forward<Ts>(params)...);
    }

      

     

    37:使std::threads在所有路径上都是unjoinable

             每个std::thread对象都处于joinable或unjoinable两种状态之一。unjoinable状态的std::thread包括:默认构造的std::thread,它还没有可运行的函数,因此也就没有对应的底层线程;已经移动了的std::thread,移动的结果就是一个std::thread关联的底层线程被关联到其他std::thread上了;已经join的std::thread,join之后,底层线程已经结束运行,因而std::thread也就没有了底层关联线程;已经detach的std::thread,detach之后,std::thread与底层线程之间的关联被断开。

             如果joinable状态的std::thread的析构函数被调用,则进程会被终止,比如下面的代码:

    constexpr auto tenMillion = 10000000; 
    
    bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion) {
        std::vector<int> goodVals; 
        std::thread t([&filter, maxVal, &goodVals]
        {
            for (auto i = 0; i <= maxVal; ++i)
            { if (filter(i)) goodVals.push_back(i); }
        });
        
        auto nh = t.native_handle(); // use t's native handle to set t's priority
    if (conditionsAreSatisfied()) {
            t.join(); 
            performComputation(goodVals);
            return true; 
        } 
        return false; 
    } 

              首先说明一下,tenMillion的初始值在C++14中可以更具可读性, C++14支持单引号作为数字分隔符:

    constexpr auto tenMillion = 10'000'000; // C++14

              上面的代码中,如果conditionAreSatisfied返回true,则一切正常,但是一旦它返回false或者抛出了异常,那么在doWork的末尾调用std::thread对象t的析构函数时,它处于joinable状态,从而导致程序执行终止。

             之所以要结束进程,是因为另外的两种选项更加糟糕:

             隐式join,这种情况下,std::thread的析构函数会等待底层线程执行完成。这样做的话有可能会导致效率问题,比如如果conditionAreSatisfied早已返回false了,doWork却还需要在调用std::thread析构函数时等待线程的终止而无法返回,这是不合适的;

             隐式detach,这种问题更大,比如在doWork中,goodVals是个通过引用捕获的局部变量,在std::thread的底层线程执行lambda时,还会修改它的值。如果conditionAreSatisfied返回了false,doWork直接返回,导致局部变量goodVals被销毁,然而底层线程此时还在运行,lambda表达式甚至还会修改原goodVals的值,然而此时goodVals早已被销毁,它的位置上可能已经是其他函数的栈帧了,这就导致了难以调试的bug。

             因此,如果你使用了std::thread,就必须保证从它的定义域出去的任何路径,它都是unjoinable的。但是覆盖所有路径是不容易的,实际上这就跟保证所有出去的路径上资源得到释放是一样的道理,于是就需要有RAII的设计。因为在std::thread的析构上采取join或detach都有可能造成问题,因此标准库没有提供这样的RAII类,然而使用std::thread的程序员知道当析构时是join合适,还是detach更合适,因此可以自己写一个:

    class ThreadRAII {
    public:
        enum class DtorAction { join, detach }; 
    
        ThreadRAII(std::thread&& t, DtorAction a) 
            : action(a), t(std::move(t)) {}
        
        ~ThreadRAII()
        {
            if (t.joinable()) { 
                if (action == DtorAction::join) {
                    t.join();
                } else {
                    t.detach();
                }
            }
        }
        std::thread& get() { return t; } 
    private:
        DtorAction action;
        std::thread t;
    };

              因为std::thread不可复制,因此ThreadRAII的构造函数使用右值引用,将std::thread移入ThreadRAII;

    声明成员变量时,将std::thread声明为最后一个,因为std::thread初始化之后可能会立即运行,所以把他们放在类的最后是个好习惯,保证了std::thread类型对象构造时,所有它前面的成员变量已经完成了初始化,因为std::thread执行的底层线程可以安全的访问这些成员变量;

    ThreadRAII还提供了get函数,用于访问std::thread对象,这类似于标准智能指针提供的get函数;

             ThreadRAII的析构函数在调用std::thread类型对象t的成员函数之前,需要先校验t是否是joinable的,因为针对一个unjoinable的线程调用join或detach是未定义行为。用户可能针对ThreadRAII调用get得到std::thread,然后针对该std::thread进行移动或join、detach操作,导致其是unjoinable的。

             使用ThreadRAII的代码如下:

    bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion)
    {
        std::vector<int> goodVals;
        ThreadRAII t( 
            std::thread([&filter, maxVal, &goodVals]
            {
                for (auto i = 0; i <= maxVal; ++i)
                { if (filter(i)) goodVals.push_back(i); }
            }),
            ThreadRAII::DtorAction::join 
        );
        auto nh = t.get().native_handle();
        …
        if (conditionsAreSatisfied()) {
            t.get().join();
            performComputation(goodVals);
            return true;
        }
        return false;
    }

              这个例子中,在ThreadRAII的析构函数中,对线程调用的是join操作,这是因为调用detach操作产生的结果更加恶劣,而join导致的性能问题就显得不那么糟糕了。然而,在条款39中会提到,ThreadRAII的析构中调用join不仅仅会造成性能问题,还可能导致程序失去响应。因此这种问题合适的解决方案是与异步执行的线程进行通信,当不再需要它时使其能及时结束,但是C++11中不支持中断线程的操作,我们可以手动实现它,不过这已经超出了本书的范围。

             条款17提到过,因为ThreadRAII声明了析构函数,所以编译器不会为其生成移动操作,但是这里ThreadRAII没有理由实现为不可移动的,因此可以显示的声明移动操作:

    class ThreadRAII {
    public:
        enum class DtorAction { join, detach }; 
        ThreadRAII(std::thread&& t, DtorAction a)
            : action(a), t(std::move(t)) {}
        ~ThreadRAII() {
            … // as before
        }
        ThreadRAII(ThreadRAII&&) = default; // support
        ThreadRAII& operator=(ThreadRAII&&) = default; // moving
        std::thread& get() { return t; } 
    private: 
        DtorAction action;
        std::thread t;
    };

      

     

    38:对变化多端的线程句柄析构函数保持关注

             joinable状态的std::thread关联着底层线程,实际上未推迟的std::async返回的std::future与系统线程也有类似的关系,因此std::thread对象和std::future对象都可以视作底层线程的句柄。

             然而,std::thread和期值(std::future或std::shared_future)的析构函数表现出的行为是不一样的。上一条款提到过,针对joinable的std::thread的析构会造成程序终止;而对期值的析构函数却不会,这就需要我们仔细考虑一下为什么会有这样的区别。

             期值位于信道的一端,被调方(通常以异步方式运行)把其计算结果写入信道(通常经过一个std::promise对象),而调用方则通过std::future来读取该结果:

     

             但是被调方的结果要存储在哪里呢?答案是存储在共享状态(shared state)中,共享状态通常使用堆上的对象来表示。共享状态与调用方和被调方之间的关系如下图:

     

             期值析构函数的行为,与其关联的共享状态相关,具体而言:

             std::async启动的未推迟任务返回的std::future,如果它是关联到共享状态的最后一个std::future(或std::shared_future),则其析构函数会一直阻塞,直到任务结束,也就是该期值的析构函数对底层异步执行任务的线程要实施join;其他所有期值对象的析构函数仅仅就是将期值析构而已。

             这种规则,实际上就是一个平凡的常规行为,外加一个例外规则而已。常规行为就是期值的析构仅仅是析构期值而已(实际上它还对其关联的共享状态中的引用计数进行了一次自减)。而那个例外规则,只有在期值满足下面所有条件时才会发挥作用:

             期值所指向的共享状态是由于调用了std::async才创建的;任务的启动策略是std::launch::async(可能是系统自行选择,也可以是指定);期值是指向共享状态的最后一个期值,针对std::future而言这总是成立的,而对于std::shared_future类型的对象而言,如果其不是最后一个指向共享状态的期值,它会遵循常规行为。

             之所以要定义这么一条例外规则,是因为标准委员会想要避免detach相关的问题,又不想简单的结束进程,所以采取了隐式join的做法。

     

             期值的API没有提供任何方法判断其指向的共享状态是否诞生于std::async的调用,所以对于给定的期值对象,不可能知道是否会在析构中阻塞到异步任务结束:

    // this container might block in its dtor, because one or more
    // contained futures could refer to a shared state for a non-
    // deferred task launched via std::async
    std::vector<std::future<void>> futs;
    
    // Widget objects might block in their dtors
    class Widget { 
    public:
        …
    private:
        std::shared_future<double> fut;
    };

             

             除了std::async会返回期值之外,std::packagd_task也可以得到期值,比如下面的代码:

    { 
        std::packaged_task<int()> pt(calcValue);
        auto fut = pt.get_future();
        std::thread t(std::move(pt));
        … 
    } 

              std::packaged_task对象一旦创建,一般就会运行在线程上(虽然也可以使用std::async,但是这种情况下就没必要使用std::packaged_task了)。std::packaged_task不能复制,只能移动,所以放入std::thread时需要std::move;

             在”...”中,针对t可能的动作是:不join也不detach,这将导致离开作用域时进程被终止;实施join或detach,此种情况下,fut的析构中自然没必要join或detach了;因此,如果期值是由std::package_task产生的,无需采取例外规则,这实际上也解释了为什么要对std::async返回的期值指定那样的例外规则的原因了。

    39:考虑针对一次性事件通信使用以void为模板类型实参的期值

             如果需要让一个任务能通知另一个以异步执行的任务发生了某特定事件,一种常规的做法是使用条件变量和一个事件状态标志位。检测任务设置标志位并在条件变量上调用notify,反应任务等待条件变量并检测标志位:

    std::condition_variable cv; 
    std::mutex m;
    bool flag(false); // not std::atomic
    
    //detect thread// detect event
    {
        std::lock_guard<std::mutex> g(m); // lock m via g's ctor
        flag = true; // tell reacting task(part 1)
    }
    cv.notify_one(); // tell reacting task(part 2)
    
    
    //react thread// prepare to react
    {
        std::unique_lock<std::mutex> lk(m); 
        cv.wait(lk, [] { return flag; }); 
        … // react to event(m is locked)
    }
    … // continue reacting(m now unlocked)

             

             实际上,还有另外一种不使用条件变量、互斥锁以及标志位的方法。利用期值和std::promise。检测任务有一个std::promise对象(信道的写入端),而反应任务有对应的期值(信道的读端),当检测任务检测到事件已经发生时,设置std::promise对象,而反应任务调用期值的wait以等待事件的发生。

             std::promise和期值都是模板,模板形参表示要通过信道发送数据的类型,然而这种场景下没有要发送的数据,这种情况下可以使用std::promise和期值模板中形参为void的特化。所以给定std::promise<void> p,检测任务和反应任务的代码都很简单:

    //detect thread// detect event
    p.set_value(); // tell reacting task
    
    //react thread// prepare to react
    p.get_future().wait(); // wait on future// react to event

              检测任务是否在反应任务等待之间就设置std::promise毫无问题。然而这种方法也不是全无问题,比如std::promise和期值之间是共享状态,而共享状态通常是堆上的对象,因此这种设计就会导致堆上进行分配和回收的成本;而且还有一点,std::promise对象只能设置一次,std::promise和std::future之间的通信息到是一次性机制,不能重复使用,而基于条件变量的设计可以进行多次通信。

             当然如果只是一次性事件使用std::promise和期值是没有问题的,比如:

    std::promise<void> p;
    void react(); // func for reacting task
    
    void detect() {
        std::thread t([] 
        {
            p.get_future().wait(); 
            react(); 
        });
        … // here, t is suspended
    
        p.set_value(); // unsuspend t (and thus call react)// do additional work
        t.join(); 
    } 

              这里在t离开作用域之前使用join了,结合条款36,你认为可以使用ThreadRAII:

    void detect() {
        ThreadRAII tr( std::thread([]
                        {
                            p.get_future().wait();
                            react();
                        }),
                    ThreadRAII::DtorAction::join
        );
        … // thread inside tr is suspended here
        p.set_value(); // unsuspend thread inside tr
        …
    }

              然而这段代码是有问题的,在于第一个”...”区域,如果该区域内抛出异常的话,set_value就永远不会调用,因而lambda内的wait会永远阻塞,这表示该线程永远不会结束,而ThreadRAII的析构时进行的join也就会永远阻塞,这就造成了死锁。

            

             如果有多个反应任务,则可以使用std::shared_future:

    std::promise<void> p; 
    
    void detect() {
        auto sf = p.get_future().share();
        std::vector<std::thread> vt; 
        
        for (int i = 0; i < threadsToRun; ++i) {
            vt.emplace_back([sf]{ sf.wait(); react(); }); 
        } 
        … 
        p.set_value(); 
        …
        for (auto& t : vt) {
            t.join();
        }
    }

      

    40:并发使用std::atomic,特殊内存使用volatile

             volatile与并发程序设计毫无关系,但是在其他程序语言(如C#和java)中,它对并发程序设计是有用处的。

             程序员有时会把volatile和std::atomic混淆。std::atomic的操作可以保证被其他线程视为原子的,一旦构造了一个std::atomic对象,针对它的操作就好像这些操作处于受互斥量保护的临界区域一样,但是实际上这些操作通常会使用特殊的机器指令来实现,从而要比使用互斥量更加高效。比如下面的代码:

    std::atomic<int> ai(0); // initialize ai to 0
    ai = 10; // atomically set ai to 10
    std::cout << ai; // atomically read ai's value
    ++ai; // atomically increment ai to 11
    --ai; // atomically decrement ai to 10

              这些语句执行期间,其他读取ai的线程只会看到它的取值为0,10或11,而不可能会有其他的取值。特别是最后两个语句(自增和自减),这两个都是读取-修改-写入(read-modify-write,RMW)操作,但皆以原子方式执行,这是std::atomic最棒的特性之一,一旦构造出std::atomic对象,其上所有的成员函数都保证被其他线程视为原子的。

             但是volatile并不保证这一点:

    volatile int vi(0); // initialize vi to 0
    vi = 10; // set vi to 10
    std::cout << vi; // read vi's value
    ++vi; // increment vi to 11
    --vi; // decrement vi to 10

              以上的语句执行期间,如果有其他线程读取vi的值,则它能看到任何值,-12,68,4090727等任何值。这样的代码会出现未定义行为,因为这些语句修改了vi,而其他线程正读取vi,这就造成了data race。

             考虑下面两个线程同时操作volatile和std::atomic的代码:

    std::atomic<int> ac(0); // "atomic counter"
    volatile int vc(0); // "volatile counter"
    /*----- Thread 1 ----- */  /*------- Thread 2 ------- */
    ++ac;                            ++ac;
    ++vc;                            ++vc;

              当两个线程都完成后,ac的值必定是2,而vc的值则不一定,因为它的自增不是原子的。每次自增包括读取vc的值、修改vc的值,写回vc的值,所以两个线程可能出现的顺序是:线程1读取vc的值,为0;线程2读取vc的值,也为0;线程1把读取的值加1,并将值写回vc;线程2把读取的值加1,并将值写回vc。

             因此,vc的最终值是1。但这不是唯一的结果,vc的最终取值可能是无法预测的,因为vc涉及data race,而标准裁定data race会导致未定义行为,意味着编译器可能会生成代码来做任何事情。

             RMW操作并不是唯一让并发下std::atomic成功而volatile失败的情况,下面的代码:

    std::atomic<bool> valAvailable(false);
    
    auto imptValue = computeImportantValue(); // compute value
    valAvailable = true; // tell other task it's available

              上面的代码首先计算imptValue,然后设置valAvailable为true,表示其他任务可以读取imptValue的值了。

             我们读代码时,会知道valAvailable在赋值之前会为imptValue赋值,这是程序正确运行的基础。但是编译器看到的,不过是针对两个独立变量实施的赋值操作。编译器有时可以将这些不相关的赋值重排序。也就是说:“a = b; x = y;”可能被编译器重排为“x = y; a = b;”。即使编译器没有重排序,底层硬件也可能会这么做,因为这样做有时会让代码运行更快。

             然而,std::atomic对象的使用,会对代码重排序施加限制,限制之一就是,源码中不能将任何std::atomic写操作之前的代码重排序到它的后面(这一点仅在std::atomic对象采用顺序一致性时才成立,这种一致性是默认采用的。C++还支持其他在代码重排序方面更加灵活的一致性模型,这样的模型能使代码在某些硬件体系结构上运行的更快,但是这样的模型所产生的软件要保证正确性、可理解性和可维护性会困难的多。因此,但凡有可能,应该使用这种默认的顺序一致性),因此:

    auto imptValue = computeImportantValue(); // compute value
    valAvailable = true; // tell other task it's available

              不仅编译器必须保证imptValue和valAvailable的赋值顺序,它们还必须生成代码以保证底层硬件也是这个顺序。

             但是如果将valAvailable加上volatile饰词,则不会给代码施加重排序方面的约束:

    volatile bool valAvailable(false);
    
    auto imptValue = computeImportantValue();
    valAvailable = true; // other threads might see this assignment before the one to imptValue!

              这里,编译器可能会将赋值顺序翻转为先valAvailable后imptValue,即使它不这么做,也不会生成机器代码阻止底层硬件这么做。

             上面的两个问题(volatile无法保证原子性和代码顺序)解释了volatile在并发环境下是没有用处的。但是volatile的用处到底是什么呢,它被用于涉及特殊内存的场景中。

             常规内存的特征是,如果你向某个内存写入了值,该值会一直保留在那里,直到它被再次写入为止。所以如果有个常规的int变量x,看到下面的代码,编译器可以通过消除第二个y的赋值操作来优化代码,因为它和y的初始化是冗余的:

    auto y = x; // read x
    y = x; // read x again

              常规内存还有一个特征:如果某内存写入了某值,且期间没有读取该内存,然后再次写入该内存,则第一次写入可以被优化掉。下面的代码,编译器就可以优化掉第一个赋值语句:

    x = 10; // write x
    x = 20; // write x again

              因此下面的代码:

    auto y = x; // read x
    y = x; // read x again
    x = 10; // write x
    x = 20; // write x again

              编译器可能优化为:

    auto y = x; // read x
    x = 20; // write x

             你可能觉得正常程序员不会写这样冗余的代码,确实不太可能(然而也不能保证),但是即使编译器接收的是看上去合理的源码,对其执行模板实例化、内联以及各种常见的重排序等优化后,结果中包含编译器能够消除的冗余加载和废弃存储的情况并不罕见。

             这种优化对于常规内存行为是合法的,但是对于特殊内存却不一定。比如常见的特殊内存是用于内存映射IO的内存,这种内存的位置实际上用于与外部设备(比如外部传感器,显示器、打印机或网络端口)通信,而非用于读写常规内存(RAM)。因此,下面的冗余代码:

    auto y = x; // read x
    y = x; // read x again

              如果x对应于温度传感器报告的值,则x的第二次读取操作并非多余,因为第二次读取时,温度可能已经改变。又比如下面的代码:

    x = 10; // write x
    x = 20; // write x again

              如果x对应于无线电发射器的控制端口,则有可能是代码在向无线电发出命令,并且值10和20对应于不同的命令,如果优化掉第一个赋值,将改变发送的无线电命令序列。

             这就是volatile的使用场景,它告诉编译器,正在处理的是特殊内存,不要对此内存上的操作做任何优化。因此如果x对应特殊内存,则应该加上volatile:“volatile int x;”,这么一来:

    auto y = x; // read x
    y = x; // read x again (can't be optimized away)
    x = 10; // write x (can't be optimized away)
    x = 20; // write x again

             

             如果x被声明为std::atomic:

    std::atomic<int> x;
    auto y = x; // conceptually read x (see below)
    y = x; // conceptually read x again (see below)
    x = 10; // write x
    x = 20; // write x again

              编译器有可能会优化为:

    auto y = x; // conceptually read x (see below)
    x = 20; // write x

              这对于特殊内存而言是无法接受的。并且,x是个std::atomic导致下面的代码无法编译:

    auto y = x; // error!
    y = x; // error!

              因为std::atomic不支持复制操作。如果支持复制初始化的话,由于x是std::atomic对象,而y也会被推导为std::atomic,因此他们的操作都是原子的。但是为了使从x构造y的操作也是原子的,编译器就必须生成代码来在单一的原子操作中读取x并写入y,但是硬件通常无法完成这样的操作,所以std::atomic不支持复制初始化,相同的原因也不支持复制赋值。由于移动操作没有在std::atomic中显示声明,根据条款17中描述的规则,std::atomic也不提供移动构造函数和移动赋值。

             从x中取值并写入y是可以实现的,需要使用std::atomic的load和store成员函数:

    std::atomic<int> y(x.load()); // read x
    y.store(x.load()); // read x again

              虽然上面的代码能通过编译,但上面的两种操作没有一条可以视为单一的原子操作。而且上面的代码,编译器可以优化为将x的值存储在寄存器中,而不是两次从内存中读取:

    register = x.load(); // read x into register
    std::atomic<int> y(register); // init y with register value
    y.store(register); // store register value into y

              如你所见,x只读取了一次,这是处理特殊内存时必须避免的优化措施。

  • 相关阅读:
    【每天学习一点点】Tensorflow 版本与CUDA版本
    【每天学习一点点】使用plot_model绘制网络模式失败
    【每天学习一点点】keras cifar10.load_data()自己下载数据
    【每天学习一点点】Tensorflow2.X 运行问题:Could not create cudnn handle: CUDNN_STATUS_ALLOC_FAILED
    【每天学习一点点】mxnet 版本运行失败问题
    【每天学习一点点】Tensorflow GPU与CPU版本
    【每天学习一点点】不再显示I信息(Tensorflow GPU)
    C# Dynamic特性
    豆瓣,你的前端开发有点幽默了
    配置SHH集群
  • 原文地址:https://www.cnblogs.com/gqtcgq/p/9991822.html
Copyright © 2011-2022 走看看