zoukankan      html  css  js  c++  java
  • 单生产者多消费者模型

    #include <unistd.h>
    
    #include <cstdlib>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>
    
    static const int kItemRepositorySize  = 4; // Item buffer size.
    static const int kItemsToProduce  = 10;   // How many items we plan to produce.
    
    struct ItemRepository {
        int item_buffer[kItemRepositorySize];
        size_t read_position;
        size_t write_position;
        size_t item_counter;
        std::mutex mtx;
        std::mutex item_counter_mtx;
        std::condition_variable repo_not_full;
        std::condition_variable repo_not_empty;
    } gItemRepository;
    
    typedef struct ItemRepository ItemRepository;
    
    
    void ProduceItem(ItemRepository *ir, int item)
    {
        std::unique_lock<std::mutex> lock(ir->mtx);
        while(((ir->write_position + 1) % kItemRepositorySize)
            == ir->read_position) { // item buffer is full, just wait here.
            std::cout << "Producer is waiting for an empty slot...\n";
            (ir->repo_not_full).wait(lock);
        }
    
        (ir->item_buffer)[ir->write_position] = item;
        (ir->write_position)++;
    
        if (ir->write_position == kItemRepositorySize)
            ir->write_position = 0;
    
        (ir->repo_not_empty).notify_all();
        lock.unlock();
    }
    
    int ConsumeItem(ItemRepository *ir)
    {
        int data;
        std::unique_lock<std::mutex> lock(ir->mtx);
        // item buffer is empty, just wait here.
        while(ir->write_position == ir->read_position) {
            std::cout << "Consumer is waiting for items...\n";
            (ir->repo_not_empty).wait(lock);
        }
    
        data = (ir->item_buffer)[ir->read_position];
        (ir->read_position)++;
    
        if (ir->read_position >= kItemRepositorySize)
            ir->read_position = 0;
    
        (ir->repo_not_full).notify_all();
        lock.unlock();
    
        return data;
    }
    
    
    void ProducerTask()
    {
        for (int i = 1; i <= kItemsToProduce; ++i) {
            // sleep(1);
            std::cout << "Producer thread " << std::this_thread::get_id()
                << " producing the " << i << "^th item..." << std::endl;
            ProduceItem(&gItemRepository, i);
        }
        std::cout << "Producer thread " << std::this_thread::get_id()
                    << " is exiting..." << std::endl;
    }
    
    void ConsumerTask()
    {
        bool ready_to_exit = false;
        while(1) {
            sleep(1);
            std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);
            if (gItemRepository.item_counter < kItemsToProduce) {
                int item = ConsumeItem(&gItemRepository);
                ++(gItemRepository.item_counter);
                std::cout << "Consumer thread " << std::this_thread::get_id()
                    << " is consuming the " << item << "^th item" << std::endl;
            } else ready_to_exit = true;
            lock.unlock();
            if (ready_to_exit == true) break;
        }
        std::cout << "Consumer thread " << std::this_thread::get_id()
                    << " is exiting..." << std::endl;
    }
    
    void InitItemRepository(ItemRepository *ir)
    {
        ir->write_position = 0;
        ir->read_position = 0;
        ir->item_counter = 0;
    }
    
    int main()
    {
        InitItemRepository(&gItemRepository);
        std::thread producer(ProducerTask);
        std::thread consumer1(ConsumerTask);
        std::thread consumer2(ConsumerTask);
        std::thread consumer3(ConsumerTask);
        std::thread consumer4(ConsumerTask);
    
        producer.join();
        consumer1.join();
        consumer2.join();
        consumer3.join();
        consumer4.join();
    }
  • 相关阅读:
    hive中如何查询除了其中某个字段剩余所有字段
    查找出不同环境下同一张表的数据差异
    pycharm中导入包失败的解决办法
    hive如何获取当前时间
    python-匿名函数
    Tensorflow报错:OMP: Error #15: Initializing libiomp5.dylib, but found libiomp5.dylib already initialized.
    Tensorflow中Tensor对象的常用方法(持续更新)
    Numpy中的广播机制,数组的广播机制(Broadcasting)
    重装conda
    matplotlib作图一例
  • 原文地址:https://www.cnblogs.com/jobs1/p/10769128.html
Copyright © 2011-2022 走看看