zoukankan      html  css  js  c++  java
  • C++消息队列的实现

     
    #ifndef NET_FRAME_CONCURRENT_QUEUE_H  
    #define NET_FRAME_CONCURRENT_QUEUE_H  
    #include <queue>
    
    #include <mutex>  
    
    #include <condition_variable>  
    
    /**
     * Message queue: a FIFO container whose operations are serialized by an
     * internal mutex, with a condition variable used to block consumers until
     * data is available.
     *
     * The queue is neither copyable nor copy-assignable.
     *
     * @tparam Type element type; it must be move-assignable because Pop()
     *              move-assigns the front element into the caller's object.
     */
    template<class Type>
    class ConcurrentQueue {
        ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;

        ConcurrentQueue(const ConcurrentQueue& other) = delete;

    public:
        ConcurrentQueue() : _queue(), _mutex(), _condition() { }

        virtual ~ConcurrentQueue() { }

        /**
         * Appends a record to the back of the queue and wakes one waiting
         * consumer.
         *
         * @param record value to enqueue; taken by value and moved into the
         *               queue, so move-only types are supported.
         */
        void Push(Type record) {
            std::lock_guard<std::mutex> lock(_mutex);
            _queue.push(std::move(record));
            _condition.notify_one();
        }

        /**
         * Removes the front record and stores it in @p record.
         *
         * @param record    receives the dequeued value; left untouched when
         *                  the function returns false.
         * @param isBlocked when true, waits until a record is available;
         *                  when false, returns immediately if the queue is
         *                  empty.
         * @return true if a record was dequeued, false only in non-blocking
         *         mode when the queue was empty.
         */
        bool Pop(Type& record, bool isBlocked = true) {
            // A single lock covers the emptiness check AND the removal, so
            // no other thread can modify the queue between the two steps.
            std::unique_lock<std::mutex> lock(_mutex);
            if (isBlocked) {
                // The predicate form re-checks after every wake-up, which
                // also handles spurious wake-ups.
                _condition.wait(lock, [this] { return !_queue.empty(); });
            } else if (_queue.empty()) {
                return false;
            }
            record = std::move(_queue.front());
            _queue.pop();
            return true;
        }

        /**
         * @return the number of queued records at the moment of the call,
         *         converted to int32_t.
         */
        int32_t Size() const {
            std::lock_guard<std::mutex> lock(_mutex);
            return static_cast<int32_t>(_queue.size());
        }

        /**
         * @return true if the queue held no records at the moment of the call.
         */
        bool Empty() const {
            std::lock_guard<std::mutex> lock(_mutex);
            return _queue.empty();
        }

    private:
        std::queue<Type> _queue;              // underlying FIFO storage
        mutable std::mutex _mutex;            // guards _queue; mutable so const queries can lock
        std::condition_variable _condition;   // signalled by Push() for blocked Pop() callers
    };
    
    #endif //NET_FRAME_CONCURRENT_QUEUE_H  

     


    (2)拥有消息队列的线程池的实现

    .h文件如下

    #ifndef NET_FRAME_THREAD_POOL_H  
    
    #define NET_FRAME_THREAD_POOL_H  
    
    #include "ConcurrentQueue.h"

    #include <atomic>
    #include <condition_variable>
    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <future>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <stdexcept>
    #include <thread>
    #include <utility>
    #include <vector>
    
    // Lower bound on the worker count: the ThreadPool constructor raises any
    // smaller requested thread count to this value.
    #define MIN_THREADS 10  
    
        /**
         * Fixed-size pool of worker threads that consume records from a
         * ConcurrentQueue and pass each one to a user-supplied handler.
         *
         * The pool is neither copyable nor copy-assignable.
         *
         * Shutdown behaviour: the destructor stops the workers and joins them.
         * A handler call that is already running is allowed to finish; records
         * that are still queued when destruction starts are discarded.
         *
         * NOTE: exceptions thrown by the handler are not caught here, so they
         * escape the worker thread's entry function.
         *
         * @tparam Type record type; must be default-constructible (workers
         *              create an empty record to pop into, and shutdown pushes
         *              default-constructed wake-up records).
         */
        template<class Type>
        class ThreadPool {
            ThreadPool& operator=(const ThreadPool&) = delete;

            ThreadPool(const ThreadPool& other) = delete;

        public:
            /**
             * Starts the worker threads.
             *
             * @param threads requested number of workers; values below
             *                MIN_THREADS are raised to MIN_THREADS.
             * @param handler callable invoked on a worker thread for every
             *                submitted record.
             * @throws std::invalid_argument if @p handler is empty.
             */
            ThreadPool(int32_t threads, std::function<void(Type& record)> handler);

            /** Stops all workers and waits for them to exit. */
            virtual ~ThreadPool();

            /**
             * Queues a record for processing by one of the workers.
             *
             * @param record value handed to the handler; moved into the queue.
             */
            void Submit(Type record);

        private:
            /** Body of every worker thread: pop a record, hand it to the handler, repeat. */
            void WorkerLoop();

            /** Sets the stop flag, wakes every started worker and joins it. */
            void Shutdown();

        private:
            std::atomic<bool> _shutdown;                  // written by Shutdown(), read by workers

            int32_t _threads;                             // effective worker count (>= MIN_THREADS)

            std::function<void(Type& record)> _handler;   // per-record callback

            std::vector<std::thread> _workers;            // started worker threads

            ConcurrentQueue<Type> _tasks;                 // pending records
        };

        template<class Type>
        ThreadPool<Type>::ThreadPool(int32_t threads, std::function<void(Type &record)> handler)
                : _shutdown(false),
                  _threads(threads < MIN_THREADS ? MIN_THREADS : threads),
                  _handler(std::move(handler)),
                  _workers(),
                  _tasks() {
            // Reject an empty handler up front instead of failing later on a
            // worker thread when the first record arrives.
            if (!_handler)
                throw std::invalid_argument("ThreadPool: handler must be callable");

            _workers.reserve(static_cast<std::size_t>(_threads));
            try {
                for (int32_t i = 0; i < _threads; ++i)
                    _workers.emplace_back([this] { WorkerLoop(); });
            } catch (...) {
                // Thread creation failed part-way: destroying joinable
                // std::thread objects would terminate the process, so stop and
                // join the workers that did start before propagating.
                Shutdown();
                throw;
            }
        }

        template<class Type>
        ThreadPool<Type>::~ThreadPool() {
            Shutdown();
        }

        template<class Type>
        void ThreadPool<Type>::Submit(Type record) {
            _tasks.Push(std::move(record));
        }

        template<class Type>
        void ThreadPool<Type>::WorkerLoop() {
            while (!_shutdown.load()) {
                Type record;
                _tasks.Pop(record, true);
                // The pop may have been satisfied by a wake-up record pushed
                // during shutdown; never hand anything to the handler once
                // the stop flag is set.
                if (_shutdown.load())
                    break;
                _handler(record);
            }
        }

        template<class Type>
        void ThreadPool<Type>::Shutdown() {
            _shutdown.store(true);
            // Workers may be blocked inside Pop(). The queue offers no other
            // way to wake them, so push one default-constructed record per
            // started worker; each worker sees the stop flag right after its
            // pop and exits without calling the handler.
            for (auto pending = _workers.size(); pending > 0; --pending)
                _tasks.Push(Type());
            for (std::thread &worker: _workers) {
                if (worker.joinable())
                    worker.join();
            }
        }
    
    #endif //NET_FRAME_THREAD_POOL_H  
  • 相关阅读:
    从B树、B+树、B*树谈到R 树
    平衡二叉树、B树、B+树、B*树
    数据库事务和四种隔离级别
    python 安装surprise库解决 c++tools错误问题
    python的sorted函数
    爬虫出现gbk错误
    Windows下Python安装numpy+mkl,Scipy和statsmodels
    Flask--框架及路由
    flask常见面试题
    RE正则表达式
  • 原文地址:https://www.cnblogs.com/liaocheng/p/9114205.html
Copyright © 2011-2022 走看看