zoukankan      html  css  js  c++  java
  • c++11 生产者/消费者

    下面是一个生产者消费者问题,来介绍condition_variable的用法。当线程间的共享数据发生变化的时候,可以通过condition_variable来通知其他的线程。消费者wait 直到生产者通知其状态发生改变,Condition_variable是使用方法如下:

    ·当持有锁之后,线程调用wait

    ·wait解开持有的互斥锁(mutex),阻塞本线程,并将自己加入到唤醒队列中

    ·当收到通知(notification),该线程从阻塞中恢复,并加入互斥锁队列(mutex queue)

     线程被唤醒之后继续持有锁运行。

    Condition variable有两种类型:condition_variable 和 condition_variable_any,前一种效率更高,但是使用不够灵活,只支持std::unique_lock<std::mutex>类型的互斥锁;后一种比较灵活,支持所有类型的锁,但是效率稍微低一些。

    有一点需要注意的是使用condition variable进行通信的线程,condition variable 需要使用相同的互斥信号量(mutex)。

    下面来看例子:(当按下回车键之后停止)

    复制代码
    复制代码
    #include <thread>
    
    #include <iostream>
    
    #include <mutex>
    
    #include <queue>
    
    #include <condition_variable>
    
    #include <atomic>
    
    using namespace std;
    
    int main()
    {
    
        mutex lockBuffer; //申明互斥信号量
    
        volatile bool ArretDemande = false; //使生产、消费过程的结束
    
        queue<long> buffer;       
    
        condition_variable_any cndNotifierConsommateurs;//condition variable
    
        condition_variable_any cndNotifierProducteur;   
     
        thread ThreadProducteur([&]()//生产者线程
        {
           
            std::atomic<long> interlock;//对interlock的操作将是原子的
    
            interlock=1;   
    
            while(true)
            {               
    
                    std::this_thread::sleep_for (chrono::milliseconds (15));               
    
                    long element=interlock.fetch_add (1);//【1】
    
                    lockBuffer.lock ();
    
                    while(buffer.size()==10 && ArretDemande ==false)
                    {
                       
                        cndNotifierProducteur.wait (lockBuffer);//【2】
    
                    }
    
                    if (ArretDemande==true)
    
                    {                   
    
                        lockBuffer.unlock ();
    
                        cndNotifierConsommateurs.notify_one ();//【3】
    
                        break;
    
                    }
    
                    buffer.push(element);
    
                    cout << "Production unlement :" << element << " size :" << buffer.size() << endl;
    
                    lockBuffer.unlock ();
    
                    cndNotifierConsommateurs.notify_one ();
    
            }
    
        } );
    thread ThreadConsommateur([&]() { while(true) { lockBuffer.lock (); while(buffer.empty () && ArretDemande==false) { cndNotifierConsommateurs.wait(lockBuffer); } if (ArretDemande==true && buffer.empty ()) { lockBuffer.unlock(); cndNotifierProducteur.notify_one (); break; } long element=buffer.front(); buffer.pop (); cout << "Consommation element :" << element << " size :" << buffer.size() << endl; lockBuffer.unlock (); cndNotifierProducteur.notify_one (); } } ); std::cout << "Pour arreter pressez [ENTREZ]" << std::endl; getchar(); std::cout << "Arret demande" << endl ArretDemande=true; ThreadProducteur.join(); ThreadConsommateur.join(); cout<<"Main Thread"<<endl; return 0; }
    复制代码
    复制代码

    运行结果:

    对程序进行一下说明,程序中有三个线程,主线程、生产者线程、消费者线程,三个线程之间乱序执行,通过一些全局变量来控制他们的执行顺序。主线程的作用是控制生产消费过程是否结束,当程序运行之后,主线程通过getchar()接收一个输入,接收到输入后会将ArretDemande设置为true,另外两个线程会终止。生产者线程将生产出来的数据放在一个queue类型的buffer中,并解锁,通知消费之线程,buffer中最多“能”存10个数据,如果buffer中已经有10个数据还没有被取走,则会通知消费者线程“消费”,如果ArretDmande被置位,则打开锁,并通知消费之线程。消费者线程主要是将buffer中的数据取出来,当buffer为空的时候阻塞自己,并通知生产者线程,当ArretDemande被置位,且已经消费完产品则解锁,并通知生产者线程。需要注意的是需要通信的生产者和消费者这两个线程通过condition variable来实现通信,必须操作同一个mutex,这里是lockbuffer,并且每次Notify都会打开当前锁。

    程序中对interlock进行的操作是原子的,interlock.fet_add(N),效果是将interlock加N,然后返回interlock在加N之前的值,atomic类型是通过一定的内存顺序规则来实现这个过程的。

    虽然conditon_variable 只能支持std::unique_lock<std::mutex>类型的互斥锁,但是在大部分情况下已经够用,而且使用std::unique_lock<std::mutex>会比较简单,因为std::unique_lock<std::mutex>在声明的时候就会初始化,在生命周期结束之后就会自动解锁,因此我们不用太花精力来考虑什么时候解锁。我们来看看下面这段程序:

    复制代码
    复制代码
    #include <condition_variable>
    #include <mutex>
    #include <thread>
    #include <iostream>
    #include <queue>
    #include <chrono>
     
    int main()
    {
        std::queue<int> produced_nums;
        std::mutex m;;
        std::condition_variable cond_var;
        bool done = false;
        bool notified = false;
     
        std::thread producer([&]() {
            for ( int i = 0; i < 5; ++i) {
                std::this_thread::sleep_for(std::chrono:: seconds(1));
                std:: unique_lock<std::mutex > lock(m);  //May lock mutex after construction, unlock before destruction.
                std::cout << "producing " << i << '
    ' ;
                produced_nums.push(i);
                notified = true;
           cond_var.notify_one(); } done = true; cond_var.notify_one(); }); //cond_var.notify_one(); std::thread consumer([&]() { while (!done) { std:: unique_lock<std::mutex > lock(m); while (!notified) { // loop to avoid spurious wakeups cond_var.wait(lock); } while (!produced_nums.empty()) { std::cout << "consuming " << produced_nums.front() << ' '; produced_nums.pop(); } notified = false; } }); producer.join(); consumer.join(); return 0; }
    复制代码
    复制代码

     运行结果:

    C:Windowssystem32cmd.exe /c producer_consumer.exe
    producing 0
    consuming 0
    producing 1
    consuming 1
    producing 2
    consuming 2
    producing 3
    consuming 3
    producing 4
    consuming 4
    Hit any key to close this window...

  • 相关阅读:
    poj 3068 Bridge Across Islands
    XidianOJ 1086 Flappy v8
    XidianOJ 1036 分配宝藏
    XidianOJ 1090 爬树的V8
    XidianOJ 1088 AK后的V8
    XidianOJ 1062 Black King Bar
    XidianOJ 1091 看Dota视频的V8
    XidianOJ 1098 突击数论前的xry111
    XidianOJ 1019 自然数的秘密
    XidianOJ 1109 Too Naive
  • 原文地址:https://www.cnblogs.com/lvdongjie/p/4487774.html
Copyright © 2011-2022 走看看