zoukankan      html  css  js  c++  java
  • C++ 如何用百行代码实现线程安全的并发队列 | concurrent queue or blocking queue implemented in cpp

    本文首发于个人博客https://kezunlin.me/post/cabccf5c/,欢迎阅读最新内容!

    concurrent queue or blocking queue implemented in cpp

    Guide

    introduction

    Where produce-consumer pattern is present it is often the case that one is faster that the other:

    • a parsing producer reads records faster than a processing consumer;
    • a disk reading producer is faster than network sending consumer.

    Producer and consumer often communicate by queues: the producer will put items on a queue while the consumer will pop items off a queue. What happens when the queue becomes full, or empty?

    One approach of the producer is to try to put an item on a queue and if it’s full yield the thread and repeat. Similarly the consumer can try to pop an item off a queue and if it’s empty, ditto. This approach of try-fail-yield can unnecessarily burn CPU cycles in tight loops that constantly try to put or pop items off a queue.

    Another approach is to temporarily grow the queue, but that doesn’t scale well. When do we stop growing? And once we stop we have to fall back onto the try-fail-yield method.

    What if we could implement a blocking queue:

    • a queue who’s put operation blocks when the queue if full, and unblocks only when another thread pops an item off the queue
    • Similarly a queue who’s pop operation blocks when the queue is empty, and unblocks only when another thread puts an item on the queue.

    Quote from here

    An example of using such a queue would look like this (notice a fast producer and slow consumer in the code below):

    blocking queue v1

    //std
    #include <queue>
    
    //boost
    #include <boost/thread.hpp>
    #include <boost/bind.hpp>
    #include <boost/asio.hpp>
    
    namespace my { 
    namespace algorithm {
    
    template<typename Data>
    class SHARED_EXPORT blocking_queue
    {
    private:
    	std::queue<Data> the_queue;
    	mutable boost::mutex the_mutex;
    	boost::condition_variable the_condition_variable;
    
    public:
    	void push(Data const& data)
    	{
    		boost::mutex::scoped_lock lock(the_mutex);
    		the_queue.push(data);
    		lock.unlock();
    		the_condition_variable.notify_one();
    	}
    
    	bool empty() const
    	{
    		boost::mutex::scoped_lock lock(the_mutex);
    		return the_queue.empty();
    	}
    
    	size_t size() const
    	{
    		boost::mutex::scoped_lock lock(the_mutex);
    		return the_queue.size();
    	}
    
    	bool try_pop(Data& popped_value)
    	{
    		boost::mutex::scoped_lock lock(the_mutex);
    		if (the_queue.empty())
    		{
    			return false;
    		}
    
    		popped_value = the_queue.front();
    		the_queue.pop();
    		return true;
    	}
    
    	void wait_and_pop(Data& popped_value)
    	{
    		boost::mutex::scoped_lock lock(the_mutex);
    		while (the_queue.empty())
    		{
    			the_condition_variable.wait(lock);
    		}
    
    		popped_value = the_queue.front();
    		the_queue.pop();
    	}
    
    	void signal_exit()
    	{
    		Data data;
    		push(data);
    	}
    
    };
    
    }
    }// end namespace
    

    blocking queue v2

    
    #pragma once
    #include <iostream>
    #include <assert.h>	
    
    #include <queue>
    #include <mutex>
    #include <condition_variable>
    
    #define MAX_CAPACITY 20
    
    namespace my {
    namespace algorithm {
    
    template<typename T>
    class SHARED_EXPORT BlockingQueue
    {
    public:
    	BlockingQueue() 
    	:mtx(), full_(), empty_(), capacity_(MAX_CAPACITY) { }
    
    
    	void Push(const T& data){
    		std::unique_lock<std::mutex> lock(mtx);
    		while(queue_.size() == capacity_){
    			full_.wait(lock );
    		}
    
    		assert(queue_.size() < capacity_);
    		queue_.push(data);
    		empty_.notify_all(); 
    	}
    
    	T Pop(){
    		std::unique_lock<std::mutex> lock(mtx);
    		while(queue_.empty()){
    			empty_.wait(lock );
    		}
    
    		assert(!queue_.empty());
    		T front(queue_.front());
    		queue_.pop();
    		full_.notify_all();
    		return front;
    	}
    
    	T Front(){
    		std::unique_lock<std::mutex> lock(mtx);
    		while(queue_.empty()){
    			empty_.wait(lock );
    		}
    
    		assert(!queue_.empty());
    		T front(queue_.front());
    		return front;
    	}
    
    	T Back(){
    		std::unique_lock<std::mutex> lock(mtx);
    		while(queue_.empty()){
    			empty_.wait(lock );
    		}
    
    		assert(!queue_.empty());
    		T back(queue_.back());
    		return back;
    	}
    
    	size_t Size(){
    		std::lock_guard<std::mutex> lock(mtx);
    		return queue_.size();
    	}
    
    	bool Empty(){
    		std::unique_lock<std::mutex> lock(mtx);
    		return queue_.empty();
    	}
    
    	void SetCapacity(const size_t capacity){
    		capacity_ = (capacity > 0 ? capacity : MAX_CAPACITY);
    	}
    
    private:
    	//DISABLE_COPY_AND_ASSIGN(BlockingQueue);
    	BlockingQueue(const BlockingQueue& rhs);
    	BlockingQueue& operator= (const BlockingQueue& rhs);
    
    private:
    	mutable std::mutex mtx;
    	std::condition_variable full_;
    	std::condition_variable empty_;
    	std::queue<T> queue_;
    	size_t capacity_; 
    };
    
    
    }
    }// end namespace
    

    Reference

    History

    • 20191012: created.

    Copyright

  • 相关阅读:
    VB6SP6极度精简兼容绿色版
    Upnp资料整理
    RevMan简单入门指南
    小程序 --flex
    IV
    2017-10-27错误日志
    170616_2
    170616
    2017-06-07
    111111112222
  • 原文地址:https://www.cnblogs.com/kezunlin/p/12052428.html
Copyright © 2011-2022 走看看