zoukankan      html  css  js  c++  java
  • C++多线程队列实现

    C++多线程队列实现

    介绍

    在项目中,进行多线程队列实现是一个比较麻烦的事, 找到了一个实现比较好的多线程队列实现, 自己做了一点修改更加适应自己的项目, 记录下来, 有需要的自己进行修改使用.

    代码写的并不是很好, 封装起来的实现也是并不是很好用, 个人水平的一个记录, 希望理解

    多线程队列实现

    1. 初始化一定长度的空间存储数据
    2. 每次压入或者弹出操作的时候需要获取锁, 保证同时只有一个操作可以被执行,
    3. 压入或者弹出数据的时候, 如果队列已经满了或者空的, 另外一个线程可能需要等待, 或者返回 false 具体看程序注释,考虑自己情况进行程序修改
    4. 最终清理对象数据, 清空队列,退出线程
    具体实现代码
    #ifndef CQUEUE_H__
    #define CQUEUE_H__
    #pragma once
    #include <atomic>
    #include <condition_variable>
    #include <functional>
    #include <memory>
    #include <mutex>
    #include <queue>
    
    /**
     * @class   Queue CQueue.h CodeincCQueue.h
     *
     * @brief   线程安全队列实现
     * *        因为有std::mutex和std::condition_variable类成员,所以此类不支持复制构造函数也不支持赋值操作符(=)
     *
     * @author  IRIS_Chen
     * @date    2019/10/10
     *
     * @tparam  T   Generic type parameter.
     */
    template <class T>
    
    /**
     * @class   CQueue CQueue.h CodeincCQueue.h
     *
     * @brief   Queue of cs.
     *
     * @author  IRIS_Chen
     * @date    2019/10/17
     */
    
    class CQueue
    {
        protected:
        // Data
        std::queue<T> _queue;   ///< 存储数据的真实队列, 不是线程安全的
        private:
        typename std::queue<T>::size_type _size_max;    ///< 队列的最大长度
        // Thread gubbins
        std::mutex _mutex;  ///<  线程操作 锁
        std::condition_variable _fullQue;   ///< 队列满了的信号 
        std::condition_variable _empty; ///< 队列为空的信号
    
        // Exit
        // 原子操作
        std::atomic_bool _quit; ///< { false };     // 退出信号
        std::atomic_bool _finished; ///< { false }; // 完成信号 // 表示不再继续输入数据
    
        public:
    
        /**
         * @fn  CQueue::CQueue(const size_t size_max)
         *
         * @brief   初始化队列长度,并将退出标志和 满信号标志置空
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param   size_max    队列的最长尺寸
         */
    
        CQueue(const size_t size_max) :_size_max(size_max) {
            _quit = ATOMIC_VAR_INIT(false);
            _finished = ATOMIC_VAR_INIT(false);
        }
    
        /**
         * @fn  CQueue::CQueue(CONST CQueue&) = delete;
         *
         * @brief   不允许拷贝构造函数
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param   parameter1  The first parameter
         */
    
        CQueue(CONST CQueue&) = delete; ///< 不允许拷贝构造函数
        /**
         * @fn  CQueue::~CQueue()
         *
         * @brief   Finalizes an instance of the CQueue class  销毁队列, 退出线程 清除数据 // 存在问题
         *
         * @author  IRIS_Chen
         * @date    2019/11/8
         */
    
        ~CQueue()
        {
            Quit();
            while (_queue.size())
                ;
        }
        /**
         * @fn  bool CQueue::Push(T& data)
         *
         * @brief   队列中加入新的 对象  根据情况决定 满信号之后 新数据丢弃或者等待
         *
         * @author  IRIS_Chen
         * @date    2019/10/10
         *
         * @param [in,out]  data    The data to Push.
         *
         * @return  True if it succeeds, false if it fails.
         */
    
        bool Push(T& data)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            while (!_quit && !_finished)
            {
                if (_queue.size() < _size_max)
                {
                    _queue.push(std::move(data));
                    //_queue.Push(data);
                    _empty.notify_all();
                    return true;
                }
                else
                {
                    // wait的时候自动释放锁,如果wait到了会获取锁
                    // _fullQue.wait(lock);
                    return false;   ///< 如果满了 这里不进行等待 避免出现问题
                }
            }
    
            return false;
        }
    
        /**
         * @fn  bool CQueue::Pop(T &data)
         *
         * @brief   返回队列最前面的元素 并且弹出 // 如果空 如果finish 则直接返回fasle 否则 等待队列加入元素
         *
         * @author  IRIS_Chen
         * @date    2019/10/14
         *
         * @param [in,out]  data    The data to Pop.
         *
         * @return  True if it succeeds, false if it fails.
         */
    
        bool Pop(T &data)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            while (!_quit)
            {
                if (!_queue.empty())                // 队列非空
                {
                    //data = std::move(_queue.front());
                    data = _queue.front();
                    _queue.pop();
    
                    _fullQue.notify_all();       // 通知所有 由于满队无法加入的线程
                    return true;
                }
                else if (_queue.empty() && _finished)   // 队列为空 且不再加入
                {
                    return false;
                }
                else
                {
                    // _empty.wait(lock);          // 等待队列加入元素
                    return false;   ///< 不等待元素加入数据
                }
            }
            return false;
        }
    
        /**
         * @fn  std::shared_ptr<T> CQueue::Pop(void)
         *
         * @brief   弹出一个元素 直接返回  出错无法报错
         *
         * @author  IRIS_Chen
         * @date    2019/10/14
         *
         * @return  The previous top-of-stack object.
         */
    
        std::shared_ptr<T> Pop(void)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            std::shared_ptr<T> res = nullptr;
            while (!_quit)
            {
                if (!_queue.empty())                // 队列非空
                {
                    //data = std::move(_queue.front());
                    res = std::make_shared<T>(_queue.front());
                    _queue.pop();
    
                    _fullQue.notify_all();       // 通知所有 由于满队无法加入的线程
    
                    return res;
                }
                else if (_queue.empty() && _finished)   // 队列为空 且不再加入
                {
                    return res;     // 无数据进入 智能返回一个空指针 (可能出错)
                }
                else
                {
                    _empty.wait(lock);          // 等待队列加入元素
                }
            }
            return false;
        }
    
        /**
         * @fn  void CQueue::Finished()
         *
         * @brief   The queue has Finished accepting input 标识队列完成输入 不再继续输入
         *
         * @author  IRIS_Chen
         * @date    2019/10/14
         */
    
        void Finished()
        {
            _finished = true;
            _empty.notify_all();
        }
    
        /**
         * @fn  void CQueue::Quit()
         *
         * @brief   Quits this CQueue  退出队列, 无法再加入压入或者弹出数据
         *
         * @author  IRIS_Chen
         * @date    2019/10/14
         */
    
        void Quit()
        {
            _quit = true;
            _empty.notify_all();
            _fullQue.notify_all();
        }
    
        /**
         * @fn  int CQueue::Length()
         *
         * @brief   Gets the Length  返回队列目前长度
         *
         * @author  IRIS_Chen
         * @date    2019/10/14
         *
         * @return  An int.
         */
    
        int Length()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return static_cast<int>(_queue.size());
        }
    
        /**
         * @fn  int CQueue::Size()
         *
         * @brief   Gets the Size 返回当前队列长度
         *
         * @author  IRIS_Chen
         * @date    2019/10/14
         *
         * @return  An int.
         */
        int Size()
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return static_cast<int>(_queue.size());
        }
    
        /**
         * @fn  bool CQueue::empty(void)
         *
         * @brief   判断是否为空
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @return  True if it succeeds, false if it fails
         */
    
        bool Empty(void)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            return (0 == _queue.size());
        }
    
        /**
         * @fn  bool CQueue::Clear(void)
         *
         * @brief   清空队列
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @return  True if it succeeds, false if it fails
         */
    
        bool Clear(void)
        {
            std::unique_lock<std::mutex> lock(_mutex);
            while (!_queue.empty ())
            {
                Pop();  // 依次弹出数据
            }
            return true;
        }
    };
    #endif

    更多

    有另外的多线程实现, 来自网上找到的, 可能找不到参考链接了, 贴出来 供参考

    线程安全队列 1

    具体实现代码
    /**
     * @class   ThreadSafeQueue CQueue.h CodeincCQueue.h
     *
     * @brief   线程安全队列实现
     *
     * @author  IRIS_Chen
     * @date    2019/10/10
     *
     * @tparam  T   Generic type parameter.
     */
    template<typename T>
    
    /**
     * @class   ThreadSafeQueue CQueue.h CodeincCQueue.h
     *
     * @brief   Queue of thread safes.
     *
     * @author  IRIS_Chen
     * @date    2019/10/17
     */
    
    class ThreadSafeQueue
    {
        private:
    
        /**
         * @property    mutable std::mutex mut
         *
         * @brief   Gets the mut
         *
         * @return  The mut
         */
    
        mutable std::mutex mut;
        std::queue<T> data_queue;   ///< Queue of data
        std::condition_variable data_cond;  ///< The data condition
        public:
    
        /**
         * @fn  ThreadSafeQueue::ThreadSafeQueue()
         *
         * @brief   Initializes a new instance of the ThreadSafeQueue class
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         */
    
        ThreadSafeQueue() {}
    
        /**
         * @fn  ThreadSafeQueue::ThreadSafeQueue(ThreadSafeQueue const& other)
         *
         * @brief   拷贝构造函数
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param   other   The other
         */
    
        ThreadSafeQueue(ThreadSafeQueue const& other)
        {
            std::lock_guard<std::mutex> lk(other.mut);
            data_queue = other.data_queue;
        }
    
        /**
         * @fn  void ThreadSafeQueue::Push(T& new_value)
         *
         * @brief   Pushes an object onto this stack
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param [in,out]  new_value   The new value to Push
         */
    
        void push(T& new_value)//入队操作
        {
            std::lock_guard<std::mutex> lk(mut);
            data_queue.push(new_value);
            data_cond.notify_one();
        }
    
        /**
         * @fn  void ThreadSafeQueue::wait_and_pop(T& value)
         *
         * @brief   Wait and Pop
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param [in,out]  value   The value
         */
    
        void wait_and_pop(T& value)//直到有元素可以删除为止
        {
            std::unique_lock<std::mutex> lk(mut);
            data_cond.wait(lk, [this]
            {
                return !data_queue.empty();
            });
            value = data_queue.front();
            data_queue.pop();
        }
    
        /**
         * @fn  std::shared_ptr<T> ThreadSafeQueue::wait_and_pop()
         *
         * @brief   Wait and pop
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @return  A std::shared_ptr&lt;T&gt;
         */
    
        std::shared_ptr<T> wait_and_pop()
        {
            std::unique_lock<std::mutex> lk(mut);
            data_cond.wait(lk, [this]
            {
                return !data_queue.empty();
            });
            std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
            data_queue.pop();
            return res;
        }
    
        /**
         * @fn  bool ThreadSafeQueue::try_pop(T& value)
         *
         * @brief   Attempts to pop
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param [in,out]  value   The value
         *
         * @return  True if it succeeds, false if it fails
         */
    
        bool try_pop(T& value)//不管有没有队首元素直接返回
        {
            std::lock_guard<std::mutex> lk(mut);
            if (data_queue.empty())
                return false;
            value = data_queue.front();
            data_queue.pop();
            return true;
        }
    
        /**
         * @fn  std::shared_ptr<T> ThreadSafeQueue::try_pop()
         *
         * @brief   Try pop
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @return  A std::shared_ptr&lt;T&gt;
         */
    
        std::shared_ptr<T> try_pop()
        {
            std::lock_guard<std::mutex> lk(mut);
            if (data_queue.empty())
                return std::shared_ptr<T>();
            std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
            data_queue.pop();
            return res;
        }
    
        /**
         * @fn  bool ThreadSafeQueue::empty() const
         *
         * @brief   Empties this object
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @return  True if it succeeds, false if it fails
         */
    
        bool empty() const
        {
            std::lock_guard<std::mutex> lk(mut);
            return data_queue.empty();
        }
    };
    

    线程安全队列 2

    具体实现代码
    #include <queue>
    #include <mutex>
    #include <condition_variable>
    #include <initializer_list>
    /*
    * 线程安全队列
    * T为队列元素类型
    * 因为有std::mutex和std::condition_variable类成员,所以此类不支持复制构造函数也不支持赋值操作符(=)
    * */
    template<typename T>
    
    /**
     * @class   threadsafe_queue CQueue.h CodeincCQueue.h
     *
     * @brief   Queue of threadsafes.
     *
     * @author  IRIS_Chen
     * @date    2019/10/17
     */
    
    class threadsafe_queue
    {
        private:
    
        /**
         * @property    mutable std::mutex mut
         *
         * @brief   data_queue访问信号量
         *
         * @return  The mut
         */
    
        mutable std::mutex mut;
    
        /**
         * @property    mutable std::condition_variable data_cond
         *
         * @brief   Gets the data condition
         *
         * @return  The data condition
         */
    
        mutable std::condition_variable data_cond;
        using queue_type = std::queue<T>;   ///< Type of the queue
        queue_type data_queue;  ///< Queue of data
        public:
        using value_type = typename queue_type::value_type; ///< Type of the value
        using container_type = typename queue_type::container_type; ///< Type of the container
    
        /**
         * @fn  threadsafe_queue::threadsafe_queue() = default;
         *
         * @brief   Initializes a new instance of the threadsafe_queue class
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         */
    
        threadsafe_queue() = default;
    
        /**
         * @fn  threadsafe_queue::threadsafe_queue(const threadsafe_queue&) = delete;
         *
         * @brief   Initializes a new instance of the threadsafe_queue class
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param   parameter1  The first parameter
         */
    
        threadsafe_queue(const threadsafe_queue&) = delete;
    
        /**
         * @fn  threadsafe_queue& threadsafe_queue::operator=(const threadsafe_queue&) = delete;
         *
         * @brief   Assignment operator
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param   parameter1  The first parameter
         *
         * @return  A shallow copy of this object
         */
    
        threadsafe_queue& operator=(const threadsafe_queue&) = delete;
        /*
        * 使用迭代器为参数的构造函数,适用所有容器对象
        * */
        template<typename _InputIterator>
    
        /**
         * @fn  threadsafe_queue::threadsafe_queue(_InputIterator first, _InputIterator last)
         *
         * @brief   Initializes a new instance of the threadsafe_queue class
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param   first   The first
         * @param   last    The last
         */
    
        threadsafe_queue(_InputIterator first, _InputIterator last) {
            for (auto itor = first; itor != last; ++itor)
            {
                data_queue.push(*itor);
            }
        }
    
        /**
         * @fn  explicit threadsafe_queue::threadsafe_queue(const container_type &c)
         *
         * @brief   Initializes a new instance of the threadsafe_queue class
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param   c   A container_type to process
         */
    
        explicit threadsafe_queue(const container_type &c) :data_queue(c) {}
        /*
        * 使用初始化列表为参数的构造函数
        * */
    
        /**
         * @fn  threadsafe_queue::threadsafe_queue(std::initializer_list<value_type> list)
         *
         * @brief   Initializes a new instance of the threadsafe_queue class
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param   list    The list
         */
    
        threadsafe_queue(std::initializer_list<value_type> list) :threadsafe_queue(list.begin(), list.end()) {
        }
        /*
        * 将元素加入队列
        * */
    
        /**
         * @fn  void threadsafe_queue::push(const value_type &new_value)
         *
         * @brief   Pushes an object onto this stack
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param   new_value   The new value to push
         */
    
        void push(const value_type &new_value) {
            std::lock_guard<std::mutex>lk(mut);
            data_queue.push(std::move(new_value));
            data_cond.notify_one();
        }
        /*
        * 从队列中弹出一个元素,如果队列为空就阻塞
        * */
    
        /**
         * @fn  value_type threadsafe_queue::wait_and_pop()
         *
         * @brief   Wait and pop
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @return  A value_type
         */
    
        value_type wait_and_pop() {
            std::unique_lock<std::mutex>lk(mut);
            data_cond.wait(lk, [this]
            {
                return !this->data_queue.empty();
            });
            auto value = std::move(data_queue.front());
            data_queue.pop();
            return value;
        }
        /*
        * 从队列中弹出一个元素,如果队列为空返回false
        * */
    
        /**
         * @fn  bool threadsafe_queue::try_pop(value_type& value)
         *
         * @brief   Attempts to pop
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @param [in,out]  value   The value
         *
         * @return  True if it succeeds, false if it fails
         */
    
        bool try_pop(value_type& value) {
            std::lock_guard<std::mutex>lk(mut);
            if (data_queue.empty())
                return false;
            value = std::move(data_queue.front());
            data_queue.pop();
            return true;
        }
        /*
        * 返回队列是否为空
        * */
    
        /**
         * @fn  auto threadsafe_queue::empty() const->decltype(data_queue.empty())
         *
         * @brief   Gets the empty
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @return  An auto
         */
    
        auto empty() const->decltype(data_queue.empty()) {
            std::lock_guard<std::mutex>lk(mut);
            return data_queue.empty();
        }
        /*
        * 返回队列中元素数个
        * */
    
        /**
         * @fn  auto threadsafe_queue::Size() const->decltype(data_queue.Size())
         *
         * @brief   Gets the Size
         *
         * @author  IRIS_Chen
         * @date    2019/10/17
         *
         * @return  An auto
         */
    
        auto size() const->decltype(data_queue.size()) {
            std::lock_guard<std::mutex>lk(mut);
            return data_queue.size();
        }
    }; /* threadsafe_queue */
    
    ## 参考链接
    1. C++11:基于std::queue和std::mutex构建一个线程安全的队列
    2. Thread-safe concurrent FIFO queue in C++
    3. Java多线程总结之线程安全队列Queue
    4. C++并发实战12:线程安全的queue
  • 相关阅读:
    windows7 端口查看以及杀死进程释放端口
    字符设备驱动模块与测试代码编写。
    c++项目范例
    较复杂makefile跟lds脚本程序的编写
    S5PV210时钟,看门狗定时器
    S5PV210中断处理
    arm 异常处理结构
    arm指令系统
    arm体系结构
    s5pv210 的启动
  • 原文地址:https://www.cnblogs.com/hugochen1024/p/12570632.html
Copyright © 2011-2022 走看看