zoukankan      html  css  js  c++  java
  • 多生产者单消费者模型

    #include <unistd.h>
    
    #include <cstdlib>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>
    
    // Number of slots in the repository's backing array.
    static constexpr int kItemRepositorySize = 4;
    // Upper bound on the shared item counter, i.e. how many items get produced in total.
    static constexpr int kItemsToProduce = 10;
    
    // Fixed-size circular buffer shared by the producer threads and the
    // consumer, bundled with the synchronization objects that protect it.
    //
    // Every data member has a default initializer, so an instance is in a
    // valid "empty" state as soon as it is constructed, whether it is a
    // global, a local or heap-allocated.
    struct ItemRepository {
        int item_buffer[kItemRepositorySize] = {};  // Slot storage for queued items.
        size_t read_position = 0;                   // Index of the next slot to consume.
        size_t write_position = 0;                  // Index of the next slot to fill.
        size_t item_counter = 0;                    // Number of items handed out to producers so far.
        std::mutex mtx;                             // Guards item_buffer and both positions.
        std::mutex item_counter_mtx;                // Guards item_counter.
        std::condition_variable repo_not_full;      // Notified after an item is removed.
        std::condition_variable repo_not_empty;     // Notified after an item is added.
    } gItemRepository;  // The single repository instance used by all threads.
    
    
    // Appends `item` to the circular buffer of `ir`, blocking while the buffer
    // is full.
    //
    // ir:   repository to write into; its `mtx` is held for the whole call.
    // item: value stored in the slot at the current write position.
    void ProduceItem(ItemRepository *ir, int item)
    {
        std::unique_lock<std::mutex> guard(ir->mtx);
        // "Full" means the write index sits directly behind the read index,
        // so one slot always stays unused.
        while (ir->read_position == (ir->write_position + 1) % kItemRepositorySize) {
            std::cout << "Producer is waiting for an empty slot...\n";
            ir->repo_not_full.wait(guard);
        }
        ir->item_buffer[ir->write_position] = item;
        // Advance the write index, wrapping to slot 0 past the last slot.
        const size_t next_write = ir->write_position + 1;
        ir->write_position = (next_write == kItemRepositorySize) ? 0 : next_write;
        // Wake waiters before `guard` releases the mutex at scope exit.
        ir->repo_not_empty.notify_all();
    }
    
    // Removes and returns the oldest item in the circular buffer of `ir`,
    // blocking while the buffer is empty.
    //
    // ir: repository to read from; its `mtx` is held for the whole call.
    // Returns the value stored at the current read position.
    int ConsumeItem(ItemRepository *ir)
    {
        std::unique_lock<std::mutex> guard(ir->mtx);
        // Equal read and write indices mean there is nothing to take yet.
        while (ir->read_position == ir->write_position) {
            std::cout << "Consumer is waiting for items...\n";
            ir->repo_not_empty.wait(guard);
        }
        const int item = ir->item_buffer[ir->read_position];
        // Advance the read index, wrapping to slot 0 past the last slot.
        const size_t next_read = ir->read_position + 1;
        ir->read_position = (next_read >= kItemRepositorySize) ? 0 : next_read;
        // Wake waiters before `guard` releases the mutex at scope exit.
        ir->repo_not_full.notify_all();
        return item;
    }
    
    // Producer thread body: once per second, claims the next item number from
    // the shared counter and pushes it into gItemRepository, until
    // kItemsToProduce numbers have been handed out.
    void ProducerTask()
    {
        for (;;) {
            sleep(1);
            // Held for the rest of the iteration, including the possibly
            // blocking ProduceItem call, so only one producer at a time
            // claims a number and inserts it.
            std::unique_lock<std::mutex> counter_guard(gItemRepository.item_counter_mtx);
            if (gItemRepository.item_counter >= kItemsToProduce) {
                break;  // Quota reached; counter_guard unlocks on scope exit.
            }
            ++gItemRepository.item_counter;
            ProduceItem(&gItemRepository, gItemRepository.item_counter);
            std::cout << "Producer thread " << std::this_thread::get_id()
                << " is producing the " << gItemRepository.item_counter
                << "^th item" << std::endl;
        }
        std::cout << "Producer thread " << std::this_thread::get_id()
            << " is exiting..." << std::endl;
    }
    
    // Consumer thread body: takes exactly kItemsToProduce items out of
    // gItemRepository, one per second, logging each, then returns.
    void ConsumerTask()
    {
        // The progress counter is a plain local, not a function-local static:
        // it belongs to this invocation only, so the task can be run again
        // and no state is shared between threads without synchronization.
        for (int item_consumed = 0; item_consumed < kItemsToProduce; ++item_consumed) {
            sleep(1);
            const int item = ConsumeItem(&gItemRepository);
            std::cout << "Consumer thread " << std::this_thread::get_id()
                << " is consuming the " << item << "^th item" << std::endl;
        }
        std::cout << "Consumer thread " << std::this_thread::get_id()
            << " is exiting..." << std::endl;
    }
    
    // Resets `ir` to the empty state: both buffer indices and the produced-item
    // counter go back to zero. The buffer contents are left untouched.
    void InitItemRepository(ItemRepository *ir)
    {
        ItemRepository &repo = *ir;
        repo.item_counter = 0;
        repo.read_position = 0;
        repo.write_position = 0;
    }
    
    int main()
    {
        InitItemRepository(&gItemRepository);
        std::thread producer1(ProducerTask);
        std::thread producer2(ProducerTask);
        std::thread producer3(ProducerTask);
        std::thread producer4(ProducerTask);
        std::thread consumer(ConsumerTask);
    
        producer1.join();
        producer2.join();
        producer3.join();
        producer4.join();
        consumer.join();
    }
  • 相关阅读:
    list和set的区别
    tcp与udp协议的区别
    c3p0的数据库连接池
    mysql的连接
    HAVING 的使用 及笛卡尔积
    break-跳出内循环
    求素数(范围自改)
    1-100累加
    1-100累乘
    类实例:飞机大战
  • 原文地址:https://www.cnblogs.com/jobs1/p/10769113.html
Copyright © 2011-2022 走看看