zoukankan      html  css  js  c++  java
  • c++ 多线程笔记3

    本文将学习C++11并发库中的: 条件变量(condition varibles)。

    参考来源: 

    https://baptiste-wicht.com/posts/2012/04/c11-concurrency-tutorial-advanced-locking-and-condition-variables.html

    https://www.cnblogs.com/haippy/p/3252041.html

    http://www.cplusplus.com/reference/condition_variable/condition_variable/wait/

    1. Recursive locking 

    如果我们有以下这样一个类:

    struct Complex {
        std::mutex mutex;
        int i;
    
        Complex() : i(0) {}
    
        void mul(int x){
            std::lock_guard<std::mutex> lock(mutex);
            i *= x;
        }
    
        void div(int x){
            std::lock_guard<std::mutex> lock(mutex);
            i /= x;
        }
    };

    同时你想有个函数同时执行mul和div函数且不出错,于是加上

    void both(int x, int y){
        std::lock_guard<std::mutex> lock(mutex);
        mul(x);
        div(y);
    }

    然后在我的linux下测试。

    #include <iostream>
    #include <thread>
    #include <mutex>
    
    struct Complex
    {
        std::mutex mutex;
        int i;
    
        Complex() : i(0) {}
    
        void mul(int x)
        {
            std::lock_guard< std::mutex > lock(mutex);
            i *= x;
        }
        void div(int x)
        {
            std::lock_guard <std::mutex > lock(mutex);
            i /= x;
        }
    
        void both(int x, int y)
        {
            std::lock_guard<std::mutex> lock(mutex);
            mul(x);
            div(x);
        }
    };
    
    int main()
    {
        Complex _complex;    
        _complex.both(32,23);
        return 0;
    }

    发现程序始终在运行中,不会终止。这是由于both()函数中,这个线程调用获得锁后并调用了mul函数,然而在mul函数中,该线程再次去获得锁。这个锁此时已被锁住,所以是死锁。默认情况下,一个线程不能多次获得同样的的互斥锁。为解决该问题,引入std::recursive_mutex。

    这个互斥锁能够被同一个线程获得多次。正确版本如下:

    #include <iostream>
    #include <thread>
    #include <mutex>
    
    struct Complex
    {
        std::recursive_mutex mutex;
        int i;
        Complex() : i(0) {}
    
        void mul(int x)
        {
            std::lock_guard<std::recursive_mutex> lock(mutex);
            i *= x;
        }
        
        void div(int x)
        {
            std::lock_guard<std::recursive_mutex> lock(mutex);
        }
    
        void both(int x, int y)
        {
            std::lock_guard<std::recursive_mutex> lock(mutex);
            mul(x);
            div(x);
        }
    };
    
    int main()
    {
        Complex _complex;
        _complex.both(32,23);
        return 0;
    }

    2. Timed locking

    有时候,你不希望一个线程一直等待互斥锁。比如,你的线程可以在等待线程的时候可以做其他事。因此标准库提出一个解决方案:std::timed_mutex 和 std::recursive_timed_mutex。已经接触到了同样的函数:

    std::mutex:lock()和unlock(),不过也该尝试新的函数:try_lock_for(),和try_lock_unit().

    第一个是最有用的,允许你设置一个时限,这个函数即使锁未被得到也能自动返回。

    这个函数将返回true如果这个函数锁已经被得到否则返回false。例子如下:

    std::timed_mutex mutex;
    
    void work(){
        std::chrono::milliseconds timeout(100);
    
        while(true){
    /* try_lock_for()
    *The function returns true if the lock has been acquired, false *otherwise. 
    */
            if(mutex.try_lock_for(timeout)){
                std::cout << std::this_thread::get_id() << ": do work with the mutex" << std::endl;
    
                std::chrono::milliseconds sleepDuration(250);
                std::this_thread::sleep_for(sleepDuration);
    
                mutex.unlock();
    
                std::this_thread::sleep_for(sleepDuration);
            } else {
                std::cout << std::this_thread::get_id() << ": do work without mutex" << std::endl;
    
                std::chrono::milliseconds sleepDuration(100);
                std::this_thread::sleep_for(sleepDuration);
    /*
    *Blocks the execution of the current thread for at least the *specified sleep_duration.
    *This function may block for longer than sleep_duration due to *scheduling or resource contention delays.
    */
            }
        }
    }
    
    int main(){
        std::thread t1(work);
        std::thread t2(work);
    
        t1.join();
        t2.join();
    
        return 0;
    }

    std::chrono::milliseconds,C++11中的新特性,可以使用以下时间单元: nanoseconds, microseconds, milliseconds,seconds,minutes,hours。我们使用这种变量设置try_lock_for的时限。一个线程的睡眠设置 :std::this_thread::sleep_for(duration).

    3. Call once

    当你只想你的函数只被调用一次,即使有多个线程被使用。如果一个函数分为两部分,第一部分只被调用一次,第二部分在函数每次被调用的时候都被执行。我们可以使用std::call_once函数来解决这个问题。

    #include <iostream>
    #include <thread>
    #include <mutex>
    
    std::once_flag flag;
    
    void do_something()
    {
        std::call_once(flag,[](){std::cout << "Called once" << std::endl; });
    
        std::cout << "Called each time" << std::endl;
    }
    
    int main()
    {
        std::thread t1(do_something);
        std::thread t2(do_something);
        std::thread t3(do_something);
        std::thread t4(do_something);
    
        t1.join();
        t2.join();
        t3.join();
        t4.join();
        
        return 0;
    
    }
    /**my output:*****
    Each std::call_once is matched to a std::once_flag variable.
    Here I put a closure to be executed only once, but a function pointer or a std::function will make the trick. *Called once *Called each time *Called each time *Called each time *Called each time
    */

    每个std::call_once 与一个std::once_flag变量匹配。

     4. Condition variables

    一个条件变量管理线程列表的等待,直到另一个线程通知它们。每个线程将在这个条件变量上等待时必须首先获得锁。然后这个线程开始在这条件上等待时,锁将被释放;如果线程被唤醒,锁将重新被获得。例子: concurrent Bounded Buffer.并发有界缓存,这是个有特定容量的循环缓存,有一个开始和一个结束。以下是使用了条件变量的并发有界缓存例子:

    struct BoundedBuffer {
        int* buffer;
        int capacity;
    
        int front;
        int rear;
        int count;
    
        std::mutex lock;
    
        std::condition_variable not_full;
        std::condition_variable not_empty;
    
        BoundedBuffer(int capacity) : capacity(capacity), front(0), rear(0), count(0) {
            buffer = new int[capacity];
        }
    
        ~BoundedBuffer(){
            delete[] buffer;
        }
    
        void deposit(int data){
            std::unique_lock<std::mutex> l(lock);
    
            not_full.wait(l, [this](){return count != capacity; });
    
            buffer[rear] = data;
            rear = (rear + 1) % capacity;
            ++count;
    
            l.unlock();
            not_empty.notify_one();
        }
    
        int fetch(){
            std::unique_lock<std::mutex> l(lock);
    
            not_empty.wait(l, [this](){return count != 0; });
    
            int result = buffer[front];
            front = (front + 1) % capacity;
            --count;
    
            l.unlock();
            not_full.notify_one();
    
            return result;
        }
    };

    std::unique_lock能管理互斥锁,这是个对锁管理的包装器。当使用条件变量时这是必要的。将一个等待条件变量的线程唤醒,需要用到notify_one()。解锁(unlock)在notify_one之前不是完全必要的。如果你忽略(unlock)解锁这个操作,它会在unique_lock的析构函数中自动执行。但是这是可能的,notify()_one调用可以唤醒一个等待的线程,但这个线程将接下来被再次阻塞,因为锁本身会被notifier线程锁定。

    等待函数(not_full.wait(l, [this])) 有点特别,第一个参数unique_lock,第二个参数为断言(predicate)。当等待必须被继续的时候,这个predicate必须返回false。

    std::condition_variable对象的wait对象被调用时,使用std::unique_lock(通过std::mutex)来锁住当前线程。当前线程会一直被阻塞当predicate返回false,直到另外一个线程在相同的 std::condition_variable对象上调用了notification函数来唤醒当前线程并且predicate此时返回true。

    我们可以使用这个结构去修复多个 consumers/producers 问题。这个问题在并发编程中很常见。好几个线程(consumers)等待着其他几个线程(producers)生成的数据。例子:

    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <condition_variable>
    struct BoundedBuffer
    {
        int * buffer;
        int capacity;
        int front;
        int rear;
        int count;
        std::mutex lock;
    
        std::condition_variable not_full;
        std::condition_variable not_empty;
    
        BoundedBuffer(int capacity) : capacity(capacity), front(0), rear(0), count(0) 
        {
            buffer = new int[capacity];
        }
        ~BoundedBuffer()
        {
            delete[] buffer;
        }
        
        void deposit(int data)
        {
            std::unique_lock<std::mutex> l(lock);
    
            not_full.wait(l, [this](){return count != capacity; });
    
            buffer[rear] = data;
            rear = (rear + 1) % capacity;
            ++count;
    
            l.unlock();
            not_empty.notify_one();
        }
    
        int fetch()
        {
            std::unique_lock<std::mutex> l(lock);
    /*Each thread that wants to wait on the condition variable hash to acquire a lock first.
     *The lock is then released when the thread starts to wait on the condition.
     *The lock is acquired when the thread is awakened.
    * wait(unique_lock<mutex> &lck, Predicate pred) ,the function only blocks is pred returns false.
    * and notifications can only unblock the thread when it became true.
    */ not_empty.wait(l, [this]() {return count != 0; }); int result = buffer[front]; front = (front + 1) % capacity; --count; l.unlock(); not_full.notify_one(); // To wake up a thread that is waiting on a condition variable. return result; } }; void consumer(int id, BoundedBuffer&buffer) { for(int i = 0; i < 50; i++) { int value = buffer.fetch(); std::cout << "Consumer " << id << " fetched " << value << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(250)); } } void producer(int id, BoundedBuffer& buffer) { for(int i = 0; i < 75; i++) { buffer.deposit(i); std::cout << "Produced " << id << " produced " << i << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } int main() { BoundedBuffer buffer(200); std::thread c1(consumer, 0, std::ref(buffer)); std::thread c2(consumer, 1, std::ref(buffer)); std::thread c3(consumer, 2, std::ref(buffer)); std::thread p1(producer, 0, std::ref(buffer)); std::thread p2(producer, 1, std::ref(buffer)); c1.join(); c2.join(); c3.join(); p1.join(); p2.join(); return 0; }

    5. Wrap-up

    本文讲了如下几个方面:

    1. 如何用recursive_mutex使得线程获得互斥锁多次。

    2. 如何去用获得由时间限制的互斥锁。

    3. 如何仅执行函数一次。

    4.最后,条件变量被使用去解决 multiple consumers/multiple producers problem.

    The Safest Way to Get what you Want is to Try and Deserve What you Want.
  • 相关阅读:
    Oracle 自定义聚合函数
    PL/SQL Developer背景色设置
    Oracle 所有关键字查询视图 V$RESERVED_WORDS
    react创建ts项目
    chrome如何导出已经安装的扩展程序为.crx文件?
    If you would prefer to ignore this check, add SKIP_PREFLIGHT_CHECK=true to an .env file in your
    vue脚手架配置环境变量
    sourceTree系列二:创建分支,并推送到远程上
    sourceTree系列一:合并分支,把uat的代码合并到dev上
    如何在typescript中解决 error TS2451: Cannot redeclare block-scoped variable 'name'
  • 原文地址:https://www.cnblogs.com/Shinered/p/9082044.html
Copyright © 2011-2022 走看看