zoukankan      html  css  js  c++  java
  • C++并发教程-第三节:高级锁和条件变量

    在上一篇文章中,我们学习了如何使用互斥量去修复并发问题。在这篇文章中,我们会继续去研究关于互斥量一些更高阶的技术。我们也会学习其他C++11的并发技术:条件变量。

    递归锁

    让我们想象你有如下一个简单的类:

    struct Complex {
        std::mutex mutex;
        int i;
    
        Complex() : i(0) {}
    
        void mul(int x){
            std::lock_guard<std::mutex> lock(mutex);
            i *= x;
        }
    
        void div(int x){
            std::lock_guard<std::mutex> lock(mutex);
            i /= x;
        }
    };
    

    这时你想去增加一个可以正确执行所有操作的新的操作,因此你可以写一个新的函数:

    void both(int x, int y){
        std::lock_guard<std::mutex> lock(mutex);
        mul(x);
        div(y);
    }
    

    现在,来测试一下这个函数:

    int main(){
        Complex complex;
        complex.both(32, 23);
    
        return 0;
    }
    

    如果你运行了这个程序,你会看到这个程序永远也不会结束。问题非常简单,在both函数中,这个线程需要锁并且访问了mul函数。在这个函数中,线程尝试再一次获取锁。这就导致了死锁的出现。默认情况下,一个线程不能获取同一个互斥量两次。
    有一个简单的解决方案:std::recursive_mutex。这种互斥量可以被同一个线程获取多次。这里是一个正确Complex结构体的版本:

    struct Complex {
        std::recursive_mutex mutex;
        int i;
    
        Complex() : i(0) {}
    
        void mul(int x){
            std::lock_guard<std::recursive_mutex> lock(mutex);
            i *= x;
        }
    
        void div(int x){
            std::lock_guard<std::recursive_mutex> lock(mutex);
            i /= x;
        }
    
        void both(int x, int y){
            std::lock_guard<std::recursive_mutex> lock(mutex);
            mul(x);
            div(y);
        }
    };
    

    这一次,程序运行正确了。

    时间锁

    有些时候,你不想让一个线程去无限地等待一个互斥量。举个例子,你想让一个线程要么做一些事情,要么等待(互斥量解锁)。为了这个目标,标准库有一种解决方案:std::timed_mutex和std::recursive_timed_mutex(如果你需要互斥量的递归特性)。你一样可以通过std::mutex:lock和unlock去获取互斥量的访问权限,但是你也有两个新的函数可以去做这件事情:try_lock_for和try_lock_util。
    其中第一个函数也是最有用的。它允许你去设置一个超时放弃,此时函数自动返回,即使是它没有获取到互斥量。当这个函数获得锁时,它会返回true,否则是false。让我们在下面这个简单的程序中试一下吧:

    std::timed_mutex mutex;
    
    void work(){
        std::chrono::milliseconds timeout(100);
    
        while(true){
            if(mutex.try_lock_for(timeout)){
                std::cout << std::this_thread::get_id() << ": do work with the mutex" << std::endl;
    
                std::chrono::milliseconds sleepDuration(250);
                std::this_thread::sleep_for(sleepDuration);
    
                mutex.unlock();
    
                std::this_thread::sleep_for(sleepDuration);
            } else {
                std::cout << std::this_thread::get_id() << ": do work without mutex" << std::endl;
    
                std::chrono::milliseconds sleepDuration(100);
                std::this_thread::sleep_for(sleepDuration);
            }
        }
    }
    
    int main(){
        std::thread t1(work);
        std::thread t2(work);
    
        t1.join();
        t2.join();
    
        return 0;
    }
    

    (这个示例在真实的实践中完全可用)
    在这个示例中,首先一件有趣的事情是声明std::chrono::milliseconds的持续时间,这也是一个C++11标准的一个新特征。你可以获取到多种时间类型:纳秒、微秒、毫秒、秒、分钟、小时。我们将timeout作为try_lock_for函数的变量输入。通过std::this_thread::sleep_for(duration),我们也用它去做线程睡眠。这段示例剩下的部分没有什么可说的,仅仅是一些可视化地查看一些输出结果。注意这段代码永远不会停止,你必须手动杀死它。

    一次调用

    有时候你会想要一个函数只被调用一次,无论有多少个线程在使用它。想象一个函数有两个部分:第一部分只会被调用一次,剩下的部分则会在函数调用时被多次执行。我们可以使用std::call_once函数去非常简单地修复这个问题。下面是一个利用这个机制的示例:

    std::once_flag flag;
    
    void do_something(){
        std::call_once(flag, [](){std::cout << "Called once" << std::endl;});
    
        std::cout << "Called each time" << std::endl;
    }
    
    int main(){
        std::thread t1(do_something);
        std::thread t2(do_something);
        std::thread t3(do_something);
        std::thread t4(do_something);
    
        t1.join();
        t2.join();
        t3.join();
        t4.join();
    
        return 0;
    }
    

    每一个std::call_once有一个对应的std::once_flag变量。在这里,我们设置了一个只会被调用一次的闭包,但一个函数指针或std::function也能实现这一点。

    环境变量

    一个环境变量会管理一个等待中的线程列表,直到其他线程唤醒它们。每一个想去等待环境变量的线程必须首先要获取一个锁。当线程开始在等待环境变量时,这个锁会在之后被释放;并且当线程被唤醒时,这个锁被获取。
    一个很好的示例是并行的有界缓冲区。这是一个有开始和结束标志的确定容量的循环缓冲。下面是我们利用环境变量的实现:

    struct BoundedBuffer {
        int* buffer;
        int capacity;
    
        int front;
        int rear;
        int count;
    
        std::mutex lock;
    
        std::condition_variable not_full;
        std::condition_variable not_empty;
    
        BoundedBuffer(int capacity) : capacity(capacity), front(0), rear(0), count(0) {
            buffer = new int[capacity];
        }
    
        ~BoundedBuffer(){
            delete[] buffer;
        }
    
        void deposit(int data){
            std::unique_lock<std::mutex> l(lock);
    
            not_full.wait(l, [this](){return count != capacity; });
    
            buffer[rear] = data;
            rear = (rear + 1) % capacity;
            ++count;
    
            l.unlock();
            not_empty.notify_one();
        }
    
        int fetch(){
            std::unique_lock<std::mutex> l(lock);
    
            not_empty.wait(l, [this](){return count != 0; });
    
            int result = buffer[front];
            front = (front + 1) % capacity;
            --count;
    
            l.unlock();
            not_full.notify_one();
    
            return result;
        }
    };
    

    这个互斥量通过std::unique_lock进行管理。它是一个管理这个锁的装饰器。使用环境变量是很有必要的,为了去唤醒一个正在等待环境变量的线程,notify_one函数被使用。在notify_one 之前的解锁并不是完全必要的。如果你忽视了它,它会被unique_lock的析构函数自动结束。但是,这是有可能通过调用notify_one唤醒一个等待的线程,这个线程会直接被再一次阻塞,因为锁仍然没有被唤醒线程解开。因此,如果你在那之前做这件事情,唤醒线程会直接获得锁。因此,这是一个细微的优化,但它不会做出很大的改进。这个等待函数是有一点特殊。它让第一个参数作为一个独立的锁,而第二个参数则是一个断言。当等待继续的时候,这个断言必须返回FALSE(等同于while(!pred()){cv.wait(l);}),这个函数剩下的部分没什么可说的。
    我们可以使用这个结构体去解决多个生产者消费者问题。这个是一个在并行编程中非常通用的问题。多个线程(消费者)等待消费从其他多个线程(生产者)生产的数据。这儿是一个使用这个结构体的多线程例子:

    void consumer(int id, BoundedBuffer& buffer){
        for(int i = 0; i < 50; ++i){
            int value = buffer.fetch();
            std::cout << "Consumer " << id << " fetched " << value << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(250));
        }
    }
    
    void producer(int id, BoundedBuffer& buffer){
        for(int i = 0; i < 75; ++i){
            buffer.deposit(i);
            std::cout << "Produced " << id << " produced " << i << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }
    
    int main(){
        BoundedBuffer buffer(200);
    
        std::thread c1(consumer, 0, std::ref(buffer));
        std::thread c2(consumer, 1, std::ref(buffer));
        std::thread c3(consumer, 2, std::ref(buffer));
        std::thread p1(producer, 0, std::ref(buffer));
        std::thread p2(producer, 1, std::ref(buffer));
    
        c1.join();
        c2.join();
        c3.join();
        p1.join();
        p2.join();
    
        return 0;
    }
    

    三个消费线程和两个生产线程不断地被创建和查询这个结构体。一个有趣的事情在于这个示例在使用std::ref,以引用的方式传输缓存,这对于避免缓存的复制是很有必要的。

    圆满完成

    这篇文章中,我们讨论了多个事情。首先我们学习了如何使用recursive_mutex去允许一个线程获取另一个线程超过一次。之后,我们学习了如何去用超时的方式获取一个互斥量。然后,我们学习了只访问一个函数一次的方法。最后,利用环境变量解决了多个生产者/消费者问题。
    本文示例的源代码在这里

  • 相关阅读:
    HDU 6034
    HDU 6047
    CodeForces 830B
    HDU 4972
    HDU 4408
    CodeForces 788B
    CodeForces 788A
    CodeForces 792C
    uva 1658 Admiral 最小费最大流
    hdu 5391 Zball in Tina Town 威尔逊定理
  • 原文地址:https://www.cnblogs.com/wickedpriest/p/14452582.html
Copyright © 2011-2022 走看看