简单生产者消费者模型
#include <iostream> #include <queue> #include <thread> #include <mutex> #include<atomic> #include <windows.h> #include <condition_variable> using namespace std; atomic<int> x = 0; mutex mtx,mx; // 条件变量是一种同步机制,要和mutex以及lock一起使用 condition_variable buffer_not_full, buffer_not_empty; //队列缓存区 queue<int> buffer; int maxSize = 20; void consumer(int id) { while (1) { //间隔1s生产一个 Sleep(1); //加锁 unique_lock<mutex> lck(mtx); //队列为空的时候挂起消费者线程 while (buffer.size() == 0) { { lock_guard<mutex> lock(mx); cout << "buffer is empty" << endl; } buffer_not_empty.wait(lck); } cout << "consumer " << id << ": " << buffer.front() << endl; buffer.pop(); cout << "size=" << buffer.size() << ' '; buffer_not_full.notify_all(); lck.unlock(); } } void producer(int id) { while (1) { //控制生产速度 Sleep(5); unique_lock<mutex> lck(mtx); //队列满,停止生产 while (buffer.size() == maxSize) { { lock_guard<mutex> lock(mx); cout << "buffer is full" << endl; } buffer_not_full.wait(lck); } // this_thread::get_id() 返回一个线程 标识 cout << "producer " << id << ": " << x << endl; buffer.push(x++); cout << "size=" << buffer.size() << ' '; // notify(wake up) consumer when buffer.size() != 0 is true buffer_not_empty.notify_all(); lck.unlock(); } } int main() { //声明 生产者和消费者个2个线程 thread p1[2], p2[2]; //创建线程 for (int i = 0; i < 2; ++i) { p1[i] = thread(consumer, i + 1); p2[i] = thread(producer, i + 1); //thread:第一个参数是task任务,第二个参数是task函数的参数 } //启动线程 for (int i = 0; i < 2; ++i) { p2[i].join(); p1[i].join(); } system("pause"); return 0; }
使用环形缓存队列的生产者-消费者模型
#include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread> static const int kItemRepositorySize = 10; // Item buffer size. static const int kItemsToProduce = 1000; // How many items we plan to produce. std::mutex mutex; //单生产者--单消费者 namespace SS { struct ItemRepository { // 产品缓冲区, 配合 read_position 和 write_position 模型环形队列. int item_buffer[kItemRepositorySize]; // 消费者读取产品位置. size_t read_position; // 生产者写入产品位置. size_t write_position; // 互斥量,保护产品缓冲区(同一时间只能被生产者或消费者使用) std::mutex mtx; // 条件变量, 指示产品缓冲区不为满. std::condition_variable repo_not_full; // 条件变量, 指示产品缓冲区不为空. std::condition_variable repo_not_empty; } gItemRepository; // 产品库全局变量, 生产者和消费者操作该变量. typedef struct ItemRepository ItemRepository; //生产者 void ProduceItem(ItemRepository * ir, int item) { std::unique_lock<std::mutex> lock(ir->mtx); //判断缓存区是否已满 while (((ir->write_position + 1) % kItemRepositorySize) == ir->read_position) { //缓存区满的时候,生产者需要等待缓存区不为满的时候才能继续生成,挂起等待 { std::lock_guard<std::mutex> lock(mutex); std::cout << "缓冲区满,等待缓冲区不满 "; } (ir->repo_not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生. } (ir->item_buffer)[ir->write_position] = item; // 写入产品. (ir->write_position)++; // 写入位置后移. // 写入位置若是在队列最后则重新设置为初始位置. if (ir->write_position == kItemRepositorySize) ir->write_position = 0; (ir->repo_not_empty).notify_all(); // 通知消费者产品库不为空. } //消费者 int ConsumeItem(ItemRepository *ir) { int data; std::unique_lock<std::mutex> lock(ir->mtx); // item buffer is empty, just wait here. while (ir->write_position == ir->read_position) { { std::lock_guard<std::mutex> lock(mutex); std::cout << "缓冲区空,等待生产者生成产品 "; } (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生. } data = (ir->item_buffer)[ir->read_position]; // 读取某一产品 (ir->read_position)++; // 读取位置后移 if (ir->read_position >= kItemRepositorySize) // 读取位置若移到最后,则重新置位. ir->read_position = 0; (ir->repo_not_full).notify_all(); // 通知消费者产品库不为满. return data; // 返回产品. } // 生产者任务 void ProducerTask() { for (int i = 1; i <= kItemsToProduce; ++i) { // sleep(1); ProduceItem(&gItemRepository, i); // 循环生产 kItemsToProduce 个产品. { std::lock_guard<std::mutex> lock(mutex); std::cout << "生产第 " << i << "个产品" << std::endl; } } } // 消费者任务 void ConsumerTask() { static int cnt = 0; while (1) { std::this_thread::sleep_for(std::chrono::seconds(1)); int item = ConsumeItem(&gItemRepository); // 消费一个产品. { std::lock_guard<std::mutex> lock(mutex); std::cout << "消费第" << item << "个产品" << std::endl; } // 如果产品消费个数为 kItemsToProduce, 则退出. if (++cnt == kItemsToProduce) break; } } //初始化开始条件 void InitItemRepository(ItemRepository *ir) { ir->write_position = 0; // 初始化产品写入位置. ir->read_position = 0; // 初始化产品读取位置. } void test() { InitItemRepository(&gItemRepository); //新建多个生产者或消费者线程实现多产/多消 std::thread producer(ProducerTask); // 创建生产者线程. std::thread consumer(ConsumerTask); // 创建消费之线程. producer.join(); consumer.join(); //互斥变量的使用,实现生产者和消费者的循环使用 } } //单生产者-多消费者 int main() { SS::test(); return 0; }