zoukankan      html  css  js  c++  java
  • C++11 实现生产者消费者模式

    代码都类似,看懂一个,基本都能理解了。

    共有代码:

    #include <cstdlib>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    static const int kItemRepositorySize = 10; // Item buffer size.
    static const int kItemsToProduce = 1000;   // How many items we plan to produce.
    std::mutex mutex;//多线程标准输出同步锁

    单生产者单消费者模式:

      1 struct ItemRepository
      2     {
      3         int item_buffer[kItemRepositorySize]; // 产品缓冲区, 配合 read_position 和 write_position 模型环形队列.
      4         size_t read_position; // 消费者读取产品位置.
      5         size_t write_position; // 生产者写入产品位置.
      6         std::mutex mtx; // 互斥量,保护产品缓冲区
      7         std::condition_variable repo_not_full; // 条件变量, 指示产品缓冲区不为满.
      8         std::condition_variable repo_not_empty; // 条件变量, 指示产品缓冲区不为空.
      9     } gItemRepository; // 产品库全局变量, 生产者和消费者操作该变量.
     10 
     11     typedef struct ItemRepository ItemRepository;
     12 
     13     void ProduceItem(ItemRepository * ir, int item)
     14     {
     15         std::unique_lock<std::mutex> lock(ir->mtx);
     16         while (((ir->write_position + 1) % kItemRepositorySize)
     17             == ir->read_position)
     18         { // item buffer is full, just wait here.
     19             {
     20                 std::lock_guard<std::mutex> lock(mutex);
     21                 std::cout << "缓冲区满,等待缓冲区不满
    ";
     22             }
     23             (ir->repo_not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.
     24         }
     25 
     26         (ir->item_buffer)[ir->write_position] = item; // 写入产品.
     27         (ir->write_position)++; // 写入位置后移.
     28 
     29         if (ir->write_position == kItemRepositorySize) // 写入位置若是在队列最后则重新设置为初始位置.
     30             ir->write_position = 0;
     31 
     32         (ir->repo_not_empty).notify_all(); // 通知消费者产品库不为空.
     33         lock.unlock(); // 解锁.
     34     }
     35 
     36     int ConsumeItem(ItemRepository *ir)
     37     {
     38         int data;
     39         std::unique_lock<std::mutex> lock(ir->mtx);
     40         // item buffer is empty, just wait here.
     41         while (ir->write_position == ir->read_position)
     42         {
     43             {
     44                 std::lock_guard<std::mutex> lock(mutex);
     45                 std::cout << "缓冲区空,等待生产者生成产品
    ";
     46             }
     47             (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.
     48         }
     49 
     50         data = (ir->item_buffer)[ir->read_position]; // 读取某一产品
     51         (ir->read_position)++; // 读取位置后移
     52 
     53         if (ir->read_position >= kItemRepositorySize) // 读取位置若移到最后,则重新置位.
     54             ir->read_position = 0;
     55 
     56         (ir->repo_not_full).notify_all(); // 通知消费者产品库不为满.
     57         lock.unlock(); // 解锁.
     58 
     59         return data; // 返回产品.
     60     }
     61 
     62 
     63     void ProducerTask() // 生产者任务
     64     {
     65         for (int i = 1; i <= kItemsToProduce; ++i)
     66         {
     67             // sleep(1);
     68             ProduceItem(&gItemRepository, i); // 循环生产 kItemsToProduce 个产品.
     69             {
     70                 std::lock_guard<std::mutex> lock(mutex);
     71                 std::cout << "生产第 " << i << "个产品" << std::endl;
     72             }
     73         }
     74     }
     75 
     76     void ConsumerTask() // 消费者任务
     77     {
     78         static int cnt = 0;
     79         while (1)
     80         {
     81             std::this_thread::sleep_for(std::chrono::seconds(1));
     82             int item = ConsumeItem(&gItemRepository); // 消费一个产品.
     83             {
     84                 std::lock_guard<std::mutex> lock(mutex);
     85                 std::cout << "消费第" << item << "个产品" << std::endl;
     86             }
     87             if (++cnt == kItemsToProduce) break; // 如果产品消费个数为 kItemsToProduce, 则退出.
     88         }
     89     }
     90 
     91     void InitItemRepository(ItemRepository *ir)
     92     {
     93         ir->write_position = 0; // 初始化产品写入位置.
     94         ir->read_position = 0; // 初始化产品读取位置.
     95     }
     96 
     97     void test()
     98     {
     99         InitItemRepository(&gItemRepository);
    100         std::thread producer(ProducerTask); // 创建生产者线程.
    101         std::thread consumer(ConsumerTask); // 创建消费之线程.
    102 
    103         producer.join();
    104         consumer.join();
    105     }
    View Code

    单生产者多消费者模式:

      1 struct ItemRepository
      2     {
      3         int item_buffer[kItemRepositorySize];
      4         size_t read_position;
      5         size_t write_position;
      6         size_t item_counter;
      7         std::mutex mtx;
      8         std::mutex item_counter_mtx;
      9         std::condition_variable repo_not_full;
     10         std::condition_variable repo_not_empty;
     11     } gItemRepository;
     12 
     13     typedef struct ItemRepository ItemRepository;
     14 
     15     void ProduceItem(ItemRepository *ir, int item)
     16     {
     17         std::unique_lock<std::mutex> lock(ir->mtx);
     18         while (((ir->write_position + 1) % kItemRepositorySize)
     19             == ir->read_position)
     20         {
     21             // item buffer is full, just wait here.
     22             {
     23                 std::lock_guard<std::mutex> lock(mutex);
     24                 std::cout << "缓冲区满,等待缓冲区不满
    ";
     25             }
     26             (ir->repo_not_full).wait(lock);
     27         }
     28 
     29         (ir->item_buffer)[ir->write_position] = item;
     30         (ir->write_position)++;
     31 
     32         if (ir->write_position == kItemRepositorySize)
     33             ir->write_position = 0;
     34 
     35         (ir->repo_not_empty).notify_all();
     36         lock.unlock();
     37     }
     38 
     39     int ConsumeItem(ItemRepository *ir)
     40     {
     41         int data;
     42         std::unique_lock<std::mutex> lock(ir->mtx);
     43         // item buffer is empty, just wait here.
     44         while (ir->write_position == ir->read_position)
     45         {
     46             {
     47                 std::lock_guard<std::mutex> lock(mutex);
     48                 std::cout << "缓冲区空,等待生产者生产产品
    ";
     49             }
     50             (ir->repo_not_empty).wait(lock);
     51         }
     52 
     53         data = (ir->item_buffer)[ir->read_position];
     54         (ir->read_position)++;
     55 
     56         if (ir->read_position >= kItemRepositorySize)
     57             ir->read_position = 0;
     58 
     59         (ir->repo_not_full).notify_all();
     60         lock.unlock();
     61 
     62         return data;
     63     }
     64 
     65 
     66     void ProducerTask()
     67     {
     68         for (int i = 1; i <= kItemsToProduce; ++i)
     69         {
     70             std::this_thread::sleep_for(std::chrono::milliseconds(6));
     71             ProduceItem(&gItemRepository, i);
     72             {
     73                 std::lock_guard<std::mutex> lock(mutex);
     74                 std::cout << "生产线程" << std::this_thread::get_id()
     75                     << "生产第" << i << "个产品" << std::endl;
     76             }
     77         }
     78         {
     79             std::lock_guard<std::mutex> lock(mutex);
     80             std::cout << "生产线程" << std::this_thread::get_id()
     81                 << "退出" << std::endl;
     82         }
     83     }
     84 
     85     void ConsumerTask()
     86     {
     87         bool ready_to_exit = false;
     88         while (1)
     89         {
     90         //    std::this_thread::sleep_for(std::chrono::milliseconds(6));
     91             std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
     92             if (gItemRepository.item_counter < kItemsToProduce)
     93             {
     94                 int item = ConsumeItem(&gItemRepository);
     95                 ++(gItemRepository.item_counter);
     96                 {
     97                     std::lock_guard<std::mutex> lock(mutex);
     98                     std::cout << "消费线程" << std::this_thread::get_id()
     99                         << "正在消费第" << item << "个产品" << std::endl;
    100                 }
    101             }
    102             else
    103             {
    104                 ready_to_exit = true;
    105             }
    106             lock.unlock();
    107 
    108             if (ready_to_exit == true)
    109             {
    110                 break;
    111             }
    112         }
    113         {
    114             std::lock_guard<std::mutex> lock(mutex);
    115             std::cout << "消费线程" << std::this_thread::get_id()
    116                 << "退出" << std::endl;
    117         }
    118     }
    119 
    120     void InitItemRepository(ItemRepository *ir)
    121     {
    122         ir->write_position = 0;
    123         ir->read_position = 0;
    124         ir->item_counter = 0;
    125     }
    126 
    127     void test()
    128     {
    129         InitItemRepository(&gItemRepository);
    130         std::thread producer(ProducerTask);
    131         std::thread consumer1(ConsumerTask);
    132         std::thread consumer2(ConsumerTask);
    133         std::thread consumer3(ConsumerTask);
    134         std::thread consumer4(ConsumerTask);
    135 
    136         producer.join();
    137         consumer1.join();
    138         consumer2.join();
    139         consumer3.join();
    140         consumer4.join();
    141     }
    View Code

    多消费者单生产者模式:

      1 struct ItemRepository
      2     {
      3         int item_buffer[kItemRepositorySize];
      4         size_t read_position;
      5         size_t write_position;
      6         size_t item_counter;
      7         std::mutex mtx;
      8         std::mutex item_counter_mtx;
      9         std::condition_variable repo_not_full;
     10         std::condition_variable repo_not_empty;
     11     } gItemRepository;
     12 
     13     typedef struct ItemRepository ItemRepository;
     14 
     15     void ProduceItem(ItemRepository *ir, int item)
     16     {
     17         std::unique_lock<std::mutex> lock(ir->mtx);
     18         while (((ir->write_position + 1) % kItemRepositorySize)
     19             == ir->read_position)
     20         { 
     21             {
     22                 std::lock_guard<std::mutex> lock(mutex);
     23                 std::cout << "缓冲区满,等待缓冲区不满
    ";
     24             }
     25             (ir->repo_not_full).wait(lock);
     26         }
     27 
     28         (ir->item_buffer)[ir->write_position] = item;
     29         (ir->write_position)++;
     30 
     31         if (ir->write_position == kItemRepositorySize)
     32             ir->write_position = 0;
     33 
     34         (ir->repo_not_empty).notify_all();
     35         lock.unlock();
     36     }
     37 
     38     int ConsumeItem(ItemRepository *ir)
     39     {
     40         int data;
     41         std::unique_lock<std::mutex> lock(ir->mtx);
     42         // item buffer is empty, just wait here.
     43         while (ir->write_position == ir->read_position)
     44         {
     45             {
     46                 std::lock_guard<std::mutex> lock(mutex);
     47                 std::cout << "消费者等待产品中。。。
    ";
     48             }
     49             (ir->repo_not_empty).wait(lock);
     50         }
     51 
     52         data = (ir->item_buffer)[ir->read_position];
     53         (ir->read_position)++;
     54 
     55         if (ir->read_position >= kItemRepositorySize)
     56             ir->read_position = 0;
     57 
     58         (ir->repo_not_full).notify_all();
     59         lock.unlock();
     60 
     61         return data;
     62     }
     63 
     64     void ProducerTask()
     65     {
     66         bool ready_to_exit = false;
     67         while (1)
     68         {
     69             std::this_thread::sleep_for(std::chrono::milliseconds(1000));
     70             std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
     71             if (gItemRepository.item_counter < kItemsToProduce) 
     72             {
     73                 ++(gItemRepository.item_counter);
     74                 ProduceItem(&gItemRepository, gItemRepository.item_counter);
     75                 {
     76                     std::lock_guard<std::mutex> lock(mutex);
     77                     std::cout << "消费者" << std::this_thread::get_id()
     78                         << "正在生产第" << gItemRepository.item_counter
     79                         << "个产品" << std::endl;
     80                 }
     81             }
     82             else
     83             {
     84                 ready_to_exit = true;
     85             }
     86             lock.unlock();
     87             if (ready_to_exit == true) break;
     88         }
     89         {
     90             std::lock_guard<std::mutex> lock(mutex);
     91             std::cout << "消费者" << std::this_thread::get_id()
     92                 << "退出" << std::endl;
     93         }
     94     }
     95 
     96     void ConsumerTask()
     97     {
     98         static int item_consumed = 0;
     99         while (1) 
    100         {
    101             std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    102             ++item_consumed;
    103             if (item_consumed <= kItemsToProduce)
    104             {
    105                 int item = ConsumeItem(&gItemRepository);
    106                 {
    107                     std::lock_guard<std::mutex> lock(mutex);
    108                     std::cout << "消费者" << std::this_thread::get_id()
    109                         << "正在消费第" << item << "个产品" << std::endl;
    110                 }
    111             }
    112             else break;
    113         }
    114         {
    115             std::lock_guard<std::mutex> lock(mutex);
    116             std::cout << "消费者" << std::this_thread::get_id()
    117                 << "退出" << std::endl;
    118         }
    119     }
    120 
    121     void InitItemRepository(ItemRepository *ir)
    122     {
    123         ir->write_position = 0;
    124         ir->read_position = 0;
    125         ir->item_counter = 0;
    126     }
    127 
    128     void test()
    129     {
    130         InitItemRepository(&gItemRepository);
    131         std::thread producer1(ProducerTask);
    132         std::thread producer2(ProducerTask);
    133         std::thread producer3(ProducerTask);
    134         std::thread producer4(ProducerTask);
    135         std::thread consumer(ConsumerTask);
    136 
    137         producer1.join();
    138         producer2.join();
    139         producer3.join();
    140         producer4.join();
    141         consumer.join();
    142     }
    View Code

    多消费者多生产者模式:

      1 struct ItemRepository 
      2     {
      3         int item_buffer[kItemRepositorySize];
      4         size_t read_position;
      5         size_t write_position;
      6         size_t produced_item_counter;
      7         size_t consumed_item_counter;
      8         std::mutex mtx;
      9         std::mutex produced_item_counter_mtx;
     10         std::mutex consumed_item_counter_mtx;
     11         std::condition_variable repo_not_full;
     12         std::condition_variable repo_not_empty;
     13     } gItemRepository;
     14 
     15     typedef struct ItemRepository ItemRepository;
     16 
     17     void ProduceItem(ItemRepository *ir, int item)
     18     {
     19         std::unique_lock<std::mutex> lock(ir->mtx);
     20         while (((ir->write_position + 1) % kItemRepositorySize)
     21             == ir->read_position)
     22         { 
     23             // item buffer is full, just wait here.
     24             {
     25                 std::lock_guard<std::mutex> lock(mutex);
     26                 std::cout << "缓冲区满,生产者等待中
    ";
     27             }
     28             (ir->repo_not_full).wait(lock);
     29         }
     30 
     31         (ir->item_buffer)[ir->write_position] = item;
     32         (ir->write_position)++;
     33 
     34         if (ir->write_position == kItemRepositorySize)
     35             ir->write_position = 0;
     36 
     37         (ir->repo_not_empty).notify_all();
     38         lock.unlock();
     39     }
     40 
     41     int ConsumeItem(ItemRepository *ir)
     42     {
     43         int data;
     44         std::unique_lock<std::mutex> lock(ir->mtx);
     45         // item buffer is empty, just wait here.
     46         while (ir->write_position == ir->read_position) 
     47         {
     48             {
     49                 std::lock_guard<std::mutex> lock(mutex);
     50                 std::cout << "缓冲区空,消费者等待中
    ";
     51             }
     52             (ir->repo_not_empty).wait(lock);
     53         }
     54 
     55         data = (ir->item_buffer)[ir->read_position];
     56         (ir->read_position)++;
     57 
     58         if (ir->read_position >= kItemRepositorySize)
     59             ir->read_position = 0;
     60 
     61         (ir->repo_not_full).notify_all();
     62         lock.unlock();
     63 
     64         return data;
     65     }
     66 
     67     void ProducerTask()
     68     {
     69         bool ready_to_exit = false;
     70         while (1) 
     71         {
     72             std::this_thread::sleep_for(std::chrono::milliseconds(1000));
     73             std::unique_lock<std::mutex> lock(gItemRepository.produced_item_counter_mtx);
     74             if (gItemRepository.produced_item_counter < kItemsToProduce) 
     75             {
     76                 ++(gItemRepository.produced_item_counter);
     77                 ProduceItem(&gItemRepository, gItemRepository.produced_item_counter);
     78                 {
     79                     std::lock_guard<std::mutex> lock(mutex); 
     80                     std::cout << "生产者" << std::this_thread::get_id()
     81                         << "正在生产第" << gItemRepository.produced_item_counter
     82                         << "个产品" << std::endl;
     83                 }
     84             }
     85             else
     86             {
     87                 ready_to_exit = true;
     88             }
     89 
     90             lock.unlock();
     91             if (ready_to_exit == true)
     92             {
     93                 break;
     94             }
     95         }
     96         {
     97             std::lock_guard<std::mutex> lock(mutex);
     98             std::cout << "生产者" << std::this_thread::get_id()
     99                 << "退出" << std::endl;
    100         }
    101     }
    102 
    103     void ConsumerTask()
    104     {
    105         bool ready_to_exit = false;
    106         while (1) 
    107         {
    108             std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    109             std::unique_lock<std::mutex> lock(gItemRepository.consumed_item_counter_mtx);
    110             if (gItemRepository.consumed_item_counter < kItemsToProduce) 
    111             {
    112                 int item = ConsumeItem(&gItemRepository);
    113                 ++(gItemRepository.consumed_item_counter);
    114                 {
    115                     std::lock_guard<std::mutex> lock(mutex);
    116                     std::cout << "消费者" << std::this_thread::get_id()
    117                         << "正在消费第" << item << "个产品" << std::endl;
    118                 }
    119             }
    120             else
    121             {
    122                 ready_to_exit = true;
    123             }
    124             lock.unlock();
    125             if (ready_to_exit == true)
    126             {
    127                 break;
    128             }
    129         }
    130         {
    131             std::lock_guard<std::mutex> lock(mutex);
    132             std::cout << "消费者" << std::this_thread::get_id()
    133                 << "退出" << std::endl;
    134         }
    135     }
    136 
    137     void InitItemRepository(ItemRepository *ir)
    138     {
    139         ir->write_position = 0;
    140         ir->read_position = 0;
    141         ir->produced_item_counter = 0;
    142         ir->consumed_item_counter = 0;
    143     }
    144 
    145     void test()
    146     {
    147         InitItemRepository(&gItemRepository);
    148         std::thread producer1(ProducerTask);
    149         std::thread producer2(ProducerTask);
    150         std::thread producer3(ProducerTask);
    151         std::thread producer4(ProducerTask);
    152 
    153         std::thread consumer1(ConsumerTask);
    154         std::thread consumer2(ConsumerTask);
    155         std::thread consumer3(ConsumerTask);
    156         std::thread consumer4(ConsumerTask);
    157 
    158         producer1.join();
    159         producer2.join();
    160         producer3.join();
    161         producer4.join();
    162 
    163         consumer1.join();
    164         consumer2.join();
    165         consumer3.join();
    166         consumer4.join();
    167     }
    View Code

    注:

    1、当缓存容量为n时,其实只能存放n-1个产品,主要原因是,当缓存满和空时,用取余无法区分

    2、当单单模式变成多多模式时,只是针对单变多的某一方多添加一个读写锁

    3、向标准缓冲区输出字符串时,由于是多线程的,所以需要使用读写锁来同步

    完整实例下载:http://files.cnblogs.com/files/swarmbees/Producer_Consumer.zip

    如果您觉得文章不错,不妨给个打赏,写作不易,感谢各位的支持。您的支持是我最大的动力,谢谢!!! 

     

      


    很重要--转载声明

    1. 本站文章无特别说明,皆为原创,版权所有,转载时请用链接的方式,给出原文出处。同时写上原作者:朝十晚八 or Twowords
    2. 如要转载,请原文转载,如在转载时修改本文,请事先告知,谢绝在转载时通过修改本文达到有利于转载者的目的。 

  • 相关阅读:
    操作系统上机实验
    选择排序
    插入排序(c++)
    嵌入式原理实验代码集合
    iOS应用程序生命周期(前后台切换,应用的各种状态)详解
    ios Base64编解码工具类及使用
    iOS:横向使用iPhone默认的翻页效果
    ios学习笔记之内存管理
    ios NavBar+TarBar技巧
    IOS设备滑动事件
  • 原文地址:https://www.cnblogs.com/swarmbees/p/5889297.html
Copyright © 2011-2022 走看看