zoukankan      html  css  js  c++  java
  • 【转】生产者与消费者

    下面是一个生产者消费者问题,来介绍condition_variable的用法。当线程间的共享数据发生变化的时候,可以通过condition_variable来通知其他的线程。消费者wait 直到生产者通知其状态发生改变,Condition_variable是使用方法如下:

    ·当持有锁之后,线程调用wait

    ·wait解开持有的互斥锁(mutex),阻塞本线程,并将自己加入到唤醒队列中

    ·当收到通知(notification),该线程从阻塞中恢复,并加入互斥锁队列(mutex queue)

     线程被唤醒之后继续持有锁运行。

    Condition variable有两种类型:condition_variable 和 condition_variable_any,前一种效率更高,但是使用不够灵活,只支持std::unique_lock<std::mutex>类型的互斥锁;后一种比较灵活,支持所有类型的锁,但是效率稍微低一些。

    有一点需要注意的是使用condition variable进行通信的线程,condition variable 需要使用相同的互斥信号量(mutex)。

    下面来看例子:(当按下回车键之后停止)

    #include <thread>
    
    #include <iostream>
    
    #include <mutex>
    
    #include <queue>
    
    #include <condition_variable>
    
    #include <atomic>
    
    using namespace std;
    
    int main()
    {
    
        mutex lockBuffer; //申明互斥信号量
    
        volatile bool ArretDemande = false; //使生产、消费过程的结束
    
        queue<long> buffer;       
    
        condition_variable_any cndNotifierConsommateurs;//condition variable
    
        condition_variable_any cndNotifierProducteur;   
     
        thread ThreadProducteur([&]()//生产者线程
        {
           
            std::atomic<long> interlock;//对interlock的操作将是原子的
    
            interlock=1;   
    
            while(true)
            {               
    
                    std::this_thread::sleep_for (chrono::milliseconds (15));               
    
                    long element=interlock.fetch_add (1);//【1】
    
                    lockBuffer.lock ();
    
                    while(buffer.size()==10 && ArretDemande ==false)
                    {
                       
                        cndNotifierProducteur.wait (lockBuffer);//【2】
    
                    }
    
                    if (ArretDemande==true)
    
                    {                   
    
                        lockBuffer.unlock ();
    
                        cndNotifierConsommateurs.notify_one ();//【3】
    
                        break;
    
                    }
    
                    buffer.push(element);
    
                    cout << "Production unlement :" << element << " size :" << buffer.size() << endl;
    
                    lockBuffer.unlock ();
    
                    cndNotifierConsommateurs.notify_one ();
    
            }
    
        } );
    
        thread ThreadConsommateur([&]()
        {
          
            while(true)
                {
                   
                    lockBuffer.lock ();
    
                    while(buffer.empty () && ArretDemande==false)
    
                    {                   
    
                        cndNotifierConsommateurs.wait(lockBuffer);
    
                    }
    
                    if (ArretDemande==true && buffer.empty ())
    
                    {
    
                        lockBuffer.unlock();
    
                        cndNotifierProducteur.notify_one ();
    
                        break;
    
                    }
    
                    long element=buffer.front();
    
                    buffer.pop ();
    
                    cout << "Consommation element :" << element << " size :" << buffer.size() << endl;
    
                    lockBuffer.unlock ();
    
                    cndNotifierProducteur.notify_one ();
    
                }           
    
        } );
    
        std::cout << "Pour arreter pressez [ENTREZ]" << std::endl;
    
        getchar();
    
        std::cout << "Arret demande" << endl
        ArretDemande=true;
    
        ThreadProducteur.join();
        ThreadConsommateur.join();
    
        cout<<"Main Thread"<<endl;
    
        return 0;
    
    }

    运行结果:

    对程序进行一下说明,程序中有三个线程,主线程、生产者线程、消费者线程,三个线程之间乱序执行,通过一些全局变量来控制他们的执行顺序。主线程的作用是控制生产消费过程是否结束,当程序运行之后,主线程通过getchar()接收一个输入,接收到输入后会将ArretDemande设置为true,另外两个线程会终止。生产者线程将生产出来的数据放在一个queue类型的buffer中,并解锁,通知消费之线程,buffer中最多“能”存10个数据,如果buffer中已经有10个数据还没有被取走,则会通知消费者线程“消费”,如果ArretDmande被置位,则打开锁,并通知消费之线程。消费者线程主要是将buffer中的数据取出来,当buffer为空的时候阻塞自己,并通知生产者线程,当ArretDemande被置位,且已经消费完产品则解锁,并通知生产者线程。需要注意的是需要通信的生产者和消费者这两个线程通过condition variable来实现通信,必须操作同一个mutex,这里是lockbuffer,并且每次Notify都会打开当前锁。

    程序中对interlock进行的操作是原子的,interlock.fet_add(N),效果是将interlock加N,然后返回interlock在加N之前的值,atomic类型是通过一定的内存顺序规则来实现这个过程的。

    虽然conditon_variable 只能支持std::unique_lock<std::mutex>类型的互斥锁,但是在大部分情况下已经够用,而且使用std::unique_lock<std::mutex>会比较简单,因为std::unique_lock<std::mutex>在声明的时候就会初始化,在生命周期结束之后就会自动解锁,因此我们不用太花精力来考虑什么时候解锁。我们来看看下面这段程序:

    #include <condition_variable>
    #include <mutex>
    #include <thread>
    #include <iostream>
    #include <queue>
    #include <chrono>
     
    int main()
    {
        std::queue<int> produced_nums;
        std::mutex m;;
        std::condition_variable cond_var;
        bool done = false;
        bool notified = false;
     
        std::thread producer([&]() {
            for ( int i = 0; i < 5; ++i) {
                std::this_thread::sleep_for(std::chrono:: seconds(1));
                std:: unique_lock<std::mutex > lock(m);  //May lock mutex after construction, unlock before destruction.
                std::cout << "producing " << i << '
    ' ;
                produced_nums.push(i);
                notified = true;
            cond_var.notify_one();
            }  
     
            done = true;
            cond_var.notify_one();
        });
        //cond_var.notify_one();
        std::thread consumer([&]() {
            while (!done) {
                std:: unique_lock<std::mutex > lock(m);
                while (!notified) {  // loop to avoid spurious wakeups
                    cond_var.wait(lock);
                }  
                while (!produced_nums.empty()) {
                    std::cout << "consuming " << produced_nums.front() << '
    ';
                    produced_nums.pop();
                }  
                notified = false;
            }  
        });
     
        producer.join();
        consumer.join();
    
            return 0;
    }

    运行结果:

    C:Windowssystem32cmd.exe /c producer_consumer.exe
    producing 0
    consuming 0
    producing 1
    consuming 1
    producing 2
    consuming 2
    producing 3
    consuming 3
    producing 4
    consuming 4
    Hit any key to close this window...

  • 相关阅读:
    php 上传大文件主要涉及配置upload_max_filesize和post_max_size两个选项
    Linux 文件系统IO性能优化【转】
    MOOC Linux内核之旅小结【转】
    python实战===教你用微信每天给女朋友说晚安【转】
    wxpy: 用 Python 玩微信【转】
    AMBA总线协议AHB、APB、AXI对比分析【转】
    高手进阶,终极内存技术指南——完整/进阶版 II (转)【转】
    ARMCC和GCC编译ARM代码的软浮点和硬浮点问题 【转】
    程序员必知之浮点数运算原理详解【转】
    Hash算法【转】
  • 原文地址:https://www.cnblogs.com/wengzilin/p/3680020.html
Copyright © 2011-2022 走看看