zoukankan      html  css  js  c++  java
  • c++11 实现半同步半异步线程池

    c++11 实现半同步半异步线程池


    感受:
    随着深入学习,现代c++给我带来越来越多的惊喜…
    c++真的变强大了。

    半同步半异步线程池:
    其实很好理解,分为三层
    同步层:通过IO复用或者其他多线程多进程等不断的将待处理事件添加到队列中,这个过程是同步进行的。
    队列层:所有待处理事件都会放到这里。上一层事件放到这里,下一层从这里获取事件
    异步层:事先创建好线程,让线程不断的去处理队列层的任务,上层不关心这些,它只负责把任务放到队列里,所以对上层来说这里是异步的。

    补充下思路:
    主要是后两层

    队列层:c++11 通过std::function可以将函数封装为对象,那么我们一个函数也就是一个任务,通过vector或list等容器来存储这些”任务”来供后面存取。因为会出现竞争资源的问题,所以我们要加锁,并且通过条件变量的条件来唤醒其他阻塞在锁上的线程,当然你想避免线程阻塞浪费资源可以用带时间的锁std::time_mutex。

    异步层:c++11 将线程也封装为了对象,那么我们创建一个容器保存线程对象,让他们去队列层取任务并执行,执行完并不结束该线程而是归还给容器(线程池)。

    看张图:

    如果你不熟悉c++11的内容 
    以下文章仅供参考 
    c++11 多线程

    c++11 智能指针

    c++11 对象移动

    c++11 lambda,bind,function

    代码

    同步队列:

    #include <list>
    #include <mutex>
    #include <thread>
    #include <condition_variable>
    #include <iostream>
    
    template<typename T>
    class SynQueue
    {
        public:
            SynQueue(int maxsize):
                m_maxSize(maxsize), m_needStop(false) { }
    
            //添加事件,左值拷贝和右值移动
            void Put(const T&x)
            {
                //调用private内部接口Add
                Add(x);
            }
            void Put(T &&x)
            {
                Add(x);
            }
    
            //从队列中取事件,取所有事件
            void Take(std::list<T> &list)
            {
                //有wait方法必须用unique_lock
                //unique_lock有定时等待等功能,lock_guard就仅仅是RAII手法的互斥锁
                //但unique_lock的性能稍低于lock_guard
                std::unique_lock<std::mutex> locker(m_mutex);
                //满足条件则唤醒,不满足阻塞
                m_notEmpty.wait(locker, [this]
                        { return m_needStop || NotEmpty(); });
                if(m_needStop)
                    return;
                list = std::move(m_queue);
                //唤醒其他阻塞在互斥锁的线程
                m_notFull.notify_one();
            }
    
            //取一个事件
            void Take(T &t)
            {
                std::unique_lock<std::mutex> locker(m_mutex);
                m_notEmpty.wait(locker, [this]
                        { return m_needStop || NotEmpty(); });
                if(m_needStop)
                    return;
                t = m_queue.front();
                m_queue.pop_front();
                m_notFull.notify_one();
                t();
            }
    
            //停止所有线程在同步队列中的读取
            void Stop()
            {
                {
                    std::lock_guard<std::mutex> locker(m_mutex);
                    m_needStop = true;
                }
                m_notFull.notify_all();
                m_notEmpty.notify_all();
            }
    
            //队列为空
            bool Empty()
            {
                std::lock_guard<std::mutex> locker(m_mutex);
                return m_queue.empty();
            }
    
            //队列为满
            bool Full()
            {
                std::lock_guard<std::mutex> locker(m_mutex);
                return m_queue.size() == m_maxSize;
            } 
    
            //队列大小
            size_t Size()
            {
                std::lock_guard<std::mutex> locker(m_mutex);
                return m_queue.size();
            }
    
    
        private: 
            //往队列里添加事件,事件是范型的,c++11我们可以把函数通过std::function封装为对象。
            template<typename F>
            void Add(F &&x)
            {
                std::unique_lock<std::mutex> locker(m_mutex);
                m_notFull.wait(locker, [this] { 
                        return m_needStop || NotFull() ; });
                if(m_needStop)
                    return;
                m_queue.push_back(std::forward<F>(x));
                m_notEmpty.notify_one();
            }
    
            //队列未满
            bool NotFull() const
            {
                bool full = m_queue.size() >= m_maxSize;
                if(full)
                    std::cout << "缓冲区满了...请等待" << std::endl;
                return !full;
            }
    
            //队列不为空
            bool NotEmpty() const
            {
                bool empty = m_queue.empty();
                if(empty)
                {
                    std::cout << "缓冲区空了...请等待" << std::endl;
                    std::cout << "线程ID:" << std::this_thread::get_id() << std::endl;
                }
                return !empty;
            }
    
        private:
            std::mutex m_mutex;      //互斥锁
            std::list<T> m_queue;    //队列,存放任务
            std::condition_variable m_notEmpty;  //队列不为空的条件变量
            std::condition_variable m_notFull;   //队列不为满的条件变量
            int m_maxSize;           //任务队列最大长度 
            bool m_needStop;         //终止标识
    
    };

    线程池:

    #include <stdio.h>
    #include <stdlib.h>
    #include <assert.h>
    #include <unistd.h>
    #include "SynQueue.h"
    #include <functional>
    #include <thread>
    #include <memory>
    #include <atomic>
    
    const int MaxTaskCount = 100;
    
    class ThreadPool
    {
        public:
            //规定任务类型为void(),我们可以通过c++11 不定参数模板来实现一个可接受任何函数的范型函数模板,这样就是一个可以接受任何任务的任务队列了。
            using Task = std::function<void()>;
            //hardware_concurrency检测硬件性能,给出默认线程数
            ThreadPool(int numThreads = std::thread::hardware_concurrency()):
                m_queue(MaxTaskCount) 
            {
                //初始化线程,并通过shared_ptr来管理
                Start(numThreads);
            }
    
            //销毁线程池
            ~ThreadPool(void)
            {
                Stop();
            }
    
            //终止所有线程,call_once保证函数只调用一次
            void Stop()
            {
                std::call_once(m_flag, [this] { StopThreadGroup(); });
            }
    
            //添加任务,普通版本和右值引用版本
            void AddTask(const Task& task)
            {
                m_queue.Put(task);
            }
            void AddTask(Task && task)
            {
                m_queue.Put(std::forward<Task>(task));
            }
    
        private:
    
            //停止线程池
            void StopThreadGroup()
            {
                m_queue.Stop();
                m_running = false;
                for(auto thread : m_threadgroup)
                {
                    if(thread)
                        thread->join();
                }
                m_threadgroup.clear();
            }
    
    
            void Start(int numThreads)
            {
                m_running = true;
                for(int i = 0; i < numThreads; ++i)
                {
                    //智能指针管理,并给出构建线程的参数,线程调用函数和参数
                    std::cout << "Init create thread pool" << std::endl;
                    m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
                }
            }
    
            //一次取出队列中全部事件
            void RunInThread_list()
            {
                while(m_running)
                {
                    std::list<Task> list;
                    std::cout << "take " << std::endl;
                    m_queue.Take(list);
                    for(auto &task : list)
                    {
                        if(!m_running)
                            return;
                        task();
                    }
                }
    
            }
    
            //一次只取一个事件
            void RunInThread()
            {
                std::cout << m_queue.Size() << std::endl;
                while(m_running)
                {
                    Task task;
                    if(!m_running)
                        return;
                    m_queue.Take(task);
                }
            }
    
        private:
            //线程池
            std::list<std::shared_ptr<std::thread>> m_threadgroup;
            //任务队列
            SynQueue<Task>m_queue;
            //原子布尔值
            std::atomic_bool m_running;
            //辅助变量->call_once
            std::once_flag m_flag;
    };
    
    int main(int argc, char *argv[])
    {
        ThreadPool pool(2);
    
        //创建线程向任务队列添加任务
        std::thread thd1([&pool]{
                for(int i = 0; i < 10; i++)
                {
                    auto thdId = std::this_thread::get_id();
                    pool.AddTask([thdId](){
                        std::cout << thdId << " thread execute task" << std::endl;
                        });
                }
            });
    
    
        std::this_thread::sleep_for(std::chrono::seconds(2));
        pool.Stop();
        thd1.join();
    
        return EXIT_SUCCESS;
    }

     参考书籍: 
    深入应用c++11

  • 相关阅读:
    各国语言缩写列表,各国语言缩写-各国语言简称,世界各国域名缩写
    How to see log files in MySQL?
    git 设置和取消代理
    使用本地下载和管理的免费 Windows 10 虚拟机测试 IE11 和旧版 Microsoft Edge
    在Microsoft SQL SERVER Management Studio下如何完整输出NVARCHAR(MAX)字段或变量的内容
    windows 10 x64系统下在vmware workstation pro 15安装macOS 10.15 Catelina, 并设置分辨率为3840x2160
    在Windows 10系统下将Git项目签出到磁盘分区根目录的方法
    群晖NAS(Synology NAS)环境下安装GitLab, 并在Windows 10环境下使用Git
    使用V-2ray和V-2rayN搭建本地代理服务器供局域网用户连接
    windows 10 专业版安装VMware虚拟机碰到的坑
  • 原文地址:https://www.cnblogs.com/leijiangtao/p/12057422.html
Copyright © 2011-2022 走看看