zoukankan      html  css  js  c++  java
  • 【转】C++ 并发编程(五):生产者

    生产者 - 消费者(Producer-Consumer),也叫有限缓冲(Bounded-Buffer),是多线程同步的经典问题之一

    头文件

    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <vector>
    

    有限缓冲类

    class BoundedBuffer {
    public:
      BoundedBuffer(const BoundedBuffer& rhs) = delete;
      BoundedBuffer& operator=(const BoundedBuffer& rhs) = delete;
    
      BoundedBuffer(std::size_t size)
          : begin_(0), end_(0), buffered_(0), circular_buffer_(size) {
      }
    
      void Produce(int n) {
        {
          std::unique_lock<std::mutex> lock(mutex_);
          // 等待缓冲不为满。
          not_full_cv_.wait(lock, [=] { return buffered_ < circular_buffer_.size(); });
    
          // 插入新的元素,更新下标。
          circular_buffer_[end_] = n;
          end_ = (end_ + 1) % circular_buffer_.size();
    
          ++buffered_;
        }  // 通知前,自动解锁。
    
        // 通知消费者。
        not_empty_cv_.notify_one();
      }
    
      int Consume() {
        std::unique_lock<std::mutex> lock(mutex_);
        // 等待缓冲不为空。
        not_empty_cv_.wait(lock, [=] { return buffered_ > 0; });
    
        // 移除一个元素。
        int n = circular_buffer_[begin_];
        begin_ = (begin_ + 1) % circular_buffer_.size();
    
        --buffered_;
    
        // 通知前,手动解锁。
        lock.unlock();
        // 通知生产者。
        not_full_cv_.notify_one();
        return n;
      }
    
    private:
      std::size_t begin_;
      std::size_t end_;
      std::size_t buffered_;
      std::vector<int> circular_buffer_;
      std::condition_variable not_full_cv_;
      std::condition_variable not_empty_cv_;
      std::mutex mutex_;
    };
    

    生产者与消费者线程共享的缓冲。g_io_mutex 是用来同步输出的。

    BoundedBuffer g_buffer(2);
    boost::mutex g_io_mutex;
    

    生产者

    生产 100000 个元素,每 10000 个打印一次。

    void Producer() {
      int n = 0;
      while (n < 100000) {
        g_buffer.Produce(n);
        if ((n % 10000) == 0) {
          std::unique_lock<std::mutex> lock(g_io_mutex);
          std::cout << "Produce: " << n << std::endl;
        }
        ++n;
      }
    
      g_buffer.Produce(-1);
    }
    

    消费者

    每消费到 10000 的倍数,打印一次。

    void Consumer() {
      std::thread::id thread_id = std::this_thread::get_id();
    
      int n = 0;
      do {
        n = g_buffer.Consume();
        if ((n % 10000) == 0) {
          std::unique_lock<std::mutex> lock(g_io_mutex);
          std::cout << "Consume: " << n << " (" << thread_id << ")" << std::endl;
        }
      } while (n != -1);  // -1 表示缓冲已达末尾。
    
      // 往缓冲里再放一个 -1,这样其他消费者才能结束。
      g_buffer.Produce(-1);
    }
    

    主线程

    int main() {
      std::vector<std::thread> threads;
    
      threads.push_back(std::thread(&Producer));
      threads.push_back(std::thread(&Consumer));
      threads.push_back(std::thread(&Consumer));
      threads.push_back(std::thread(&Consumer));
    
      for (auto& t : threads) {
        t.join();
      }
    
      return 0;
    }
    
  • 相关阅读:
    二叉排序树
    C# 大端与小端
    【转】C#socket通信
    【转】Github 搜索技巧,快速找到好资源
    web api 跨域请求,ajax跨域调用webapi
    【转】Linux简介及最常用命令
    【转】带你吃透RTMP
    09-vuex基本应用之计数demo
    08-配置vue路由的步骤
    02-原型与原型链
  • 原文地址:https://www.cnblogs.com/wsl-hitsz/p/14645376.html
Copyright © 2011-2022 走看看