zoukankan      html  css  js  c++  java
  • Linux组件封装之五:生产者消费者问题

    生产者,消费者问题是有关互斥锁(MutexLock)、条件变量(Condition)、线程(Thread)的经典案例;

    描述的问题可以叙述为 生产者往buffer中投放产品,而消费者则从buffer中消费产品。

    生产着消费者问题的难点在于:

    为了缓冲区数据的安全性,一次只允许一个线程进入缓冲区投放或者消费产品,这个buffer就是所谓的临界资源。
    生产者往缓冲区中投放产品时,如果缓冲区已满,那么该线程需要等待,即进入阻塞状态,一直到消费者取走产品为止。
    相应的,消费者欲取走产品,如果此时缓冲区为空,即没有产品,那么消费者则需要等待,一直到有生产者投放产品为止。

    第一个问题属于互斥问题,我们需要一把互斥锁实现互斥访问(MutexLock), 以确保实现缓冲区的安全访问。
    后两个问题则属于同步问题,两类线程相互协作,我们需要两个条件变量,一个用于通知消费者从缓冲区取走产品,另一个通知生产者往缓冲区投放产品。

    生产者的大概流程为:

    1、加锁;
    2、若缓冲区已满,则进入等待状态;否则执行 33、生产产品;
    4、解锁;
    5、通知消费者取走产品

    消费者的大概流程为:

    1、加锁;
    2、若缓冲区已空,则进入等待状态;否则执行 33、取走产品;
    4、解锁;
    5、通知生产者生产产品

    为此,我们设计出一个缓冲区类,把互斥锁和条件变量作为其成员变量;

     1 #ifndef BUFFER_H_
     2 #define BUFFER_H_
     3 
     4 #include "NonCopyable.h"
     5 #include "MutexLock.h"
     6 #include "Condition.h"
     7 #include <queue> 
     8 
     9 class Buffer:NonCopyable
    10 {
    11     public:
    12         Buffer(size_t size);//attention
    13 
    14         void push(int val);//投放产品
    15         int pop();//取走产品
    16 
    17         bool isEmpty()const;
    18         size_t size()const;
    19     private:
    20          mutable MutexLock mutex_;//注意声明次序,不能改变
    21         Condition full_;
    22         Condition empty_;
    23         
    24         size_t size_;//缓冲区大小
    25         std::queue<int> q_;
    26 };
    27 
    28 #endif

    这里注意, 我们把同步与互斥的操作都放入Buffer中,是得生产者和消费者线程不必考虑其中的细节,这符合软件设计的“高内聚,低耦合”原则;

    还有一点, mutex被声明为mutable类型,意味着mutex的状态在const函数中仍然可以被改变,是符合程序逻辑的,把mutex声明为mutable,是一种标准实现

    Buffer的具体实现代码如下:

     1 #include "Buffer.h"
     2 #include <iostream>
     3 Buffer::Buffer(size_t size)
     4     :size_(size),
     5     full_(mutex_), //用mutex初始化Condition的一个对象
     6     empty_(mutex_)//用mutex初始化Condition的另一个对象
     7 {} 
     8 
     9 void Buffer::push(int val)
    10 {
    11     { //attention 作用域问题
    12     MutexGuard lock(mutex_);
    13     while(q_.size()>= size_)
    14         empty_.wait();
    15     q_.push(val);
    16     }
    17     full_.signal();
    18 }   
    19 
    20 int Buffer::pop()//attention
    21 {
    22     int tmp= 0;
    23     {
    24         MutexGuard lock(mutex_);
    25         while(q_.empty())
    26             full_.wait();
    27         tmp = q_.front();
    28         q_.pop();
    29     }
    30     empty_.signal();
    31     return tmp;
    32 }
    33 
    34 
    35 bool Buffer::isEmpty()const
    36 {
    37 //after    
    38     MutexGuard lock(mutex_);//作用域仅限于花括号内
    39     return q_.empty();
    40 }
    41 
    42 size_t Buffer::size()const
    43 {
    44     MutexGuard lock(mutex_);
    45     return q_.size();
    46 }

    注意:
    1、条件变量的等待必须使用While, 这是一种最佳实践,原因可见Condition的封装 Linux组件封装之二:Condition

    2、可以先notify,也可以先解锁,不过推荐先解锁,原因是如果线程A先notify,唤醒一个线程B,但是A还未解锁,此时如果线程切换至刚唤醒的线程B,B马上尝试lock,但是肯定失败,然后阻塞,这增加了一次线程切换的开销

    这里还有一个问题,就是我们在main函数中,必须一个一个的声明生产者,消费者,一个一个的去start、join,那么为了防止这种麻烦,我们可以怎么做呢?

    我们可以将缓冲区与多个生产者、消费者封装成一个 车间类。代码如下:

     1 #ifndef WORKSHOP_H_
     2 #define WORKSHOP_H_
     3 
     4 #include "NonCopyable.h"
     5 #include "Buffer.h"
     6 #include <vector>
     7 
     8 class ProducerThread;
     9 class ConsumerThread;
    10 class Buffer;
    11 class WorkShop:NonCopyable
    12 {
    13     public:
    14         WorkShop(size_t bufferSize,
    15                  size_t producerSize,
    16                  size_t consumerSize);
    17 
    18         ~WorkShop();
    19         void startWorking();
    20   
    21     private:
    22         size_t bufferSize_;
    23         Buffer buffer_;
    24 
    25         size_t producerSize_;
    26         size_t consumerSize_;
    27         std::vector<ProducerThread*> producers_;
    28         std::vector<ConsumerThread*> consumers_;
    29 };
    30 
    31 #endif

    实现如下(注意之处放在cpp中);

     1 #include "WorkShop.h"
     2 #include "ProducerThread.h"
     3 #include "ConsumerThread.h"
     4 
     5 //version 1
     6 WorkShop::WorkShop(size_t buffersize,
     7                    size_t producerSize,
     8                    size_t consumerSize)
     9     :bufferSize_(buffersize),
    10      buffer_(bufferSize_),
    11      producerSize_(producerSize),
    12      consumerSize_(consumerSize),
    13      producers_(producerSize_, new ProducerThread(buffer_)),  
    14      consumers_(consumerSize_, new ConsumerThread(buffer_)) 
    15 {
    16 
    17 }
    18 
    19 //version 2
    20 /*
    21 WorkShop::WorkShop(size_t buffersize,
    22                    size_t producerSize,
    23                    size_t consumerSize)
    24     :bufferSize_(buffersize),
    25      buffer_(bufferSize_),
    26      producerSize_(producerSize),
    27      consumerSize_(consumerSize),
    28      producers_(producerSize_,NULL),//vector 的初始化   
    29      consumers_(consumerSize_,NULL) 
    30 {
    31     for(std::vector<ProducerThread*>::iterator it = producers_.begin();
    32         it != producers_.end();
    33         ++it)
    34     {
    35         *it = new ProducerThread(buffer_);
    36     }
    37 
    38     for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
    39         it != consumers_.end();
    40         ++it)
    41     {
    42         *it = new ConsumerThread(buffer_);
    43     }
    44 }
    45 */
    46 
    47 WorkShop::~WorkShop()
    48 {
    49     for(std::vector<ProducerThread*>::iterator it = producers_.begin();
    50         it != producers_.end();
    51         ++it)
    52     {
    53          delete *it ;
    54     }
    55 
    56     for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
    57         it != consumers_.end();
    58         ++it)
    59     {
    60         delete *it ;
    61     }
    62 }
    63 
    64 void WorkShop::startWorking()
    65 {
    66     for(std::vector<ProducerThread*>::iterator it = producers_.begin();
    67         it != producers_.end();
    68         ++it)
    69     {
    70         //注意,此循环不能同时调用start,join->发生阻塞(只能产生一个 ProducerThread)
    71         (*it)->start() ;
    72     }
    73     for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
    74         it != consumers_.end();
    75         ++it)
    76     {
    77         (*it)->start() ;
    78     }
    79 
    80     for(std::vector<ProducerThread*>::iterator it = producers_.begin();
    81         it != producers_.end();
    82         ++it)
    83     {
    84          (*it)->join() ;
    85     }
    86     for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();
    87         it != consumers_.end();
    88         ++it)
    89     {
    90         (*it)->join() ;
    91     }
    92 }

    这样我们就可以同时指定 buffer的大小,生产者的数目,消费者的数目

  • 相关阅读:
    Knockout应用开发指南 第十章:更多信息(完结篇)
    Knockout应用开发指南 第九章:高级应用举例
    汤姆大叔博客索引
    HTML5学习笔记简明版(1):HTML5介绍与语法
    HTML5学习笔记简明版 目录索引
    大叔手记(1):使用Visual Studio的查找与替换替代默认的系统搜索
    微软ASP.NET站点部署指南(9):部署数据库更新
    微软ASP.NET站点部署指南(11):部署SQL Server数据库更新
    彻底搞定C指针
    linux svn 使用
  • 原文地址:https://www.cnblogs.com/xfxu/p/4010361.html
Copyright © 2011-2022 走看看