zoukankan      html  css  js  c++  java
  • C++ 动态伸缩线程池

    简述

    之前阅读过一份 C++11 写的线程池源码,写了一篇随笔 C++11的简单线程池代码阅读 https://www.cnblogs.com/oloroso/p/5881863.html
    这是一个固定线程数量的线程池,绝大部分情况下已经适用了。有一些特殊场景,我们需要一个按需创建线程的线程池,于是我这里改写了一个动态创建线程的简单线程池代码。

    代码

    线程池的线程根据线程池内未完成的任务数去动态创建,如果剩余任务超过 10 个,且没有达到线程池的线程数上线,则创建新线程去处理。
    当运行的工作线程,从工作队列中无法取出新任务(即任务队列已空),则进行等待,最多等待 180 秒,就会去重新判断任务池是否能取出新任务,如果连续 3 次等待都没能有新任务,则当前工作线程退出(也可以不用等待3次,只等待一次就直接退出,这里是为了防止刚好180秒等待结束的时候,入队了新任务,然后这个线程退出了,又需要创建新的线程,虽然这个是极低的概率)。

    #ifndef DYNAMICTHREADPOOL_HPP
    #define DYNAMICTHREADPOOL_HPP
    
    
    #include <condition_variable>
    #include <functional>
    #include <future>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <stdexcept>
    #include <thread>
    
    // 线程池类
    class DynamicThreadPool
    {
    public:
        // 构造函数,传入线程数
        DynamicThreadPool(size_t threads);
        // 析构
        ~DynamicThreadPool();
    
        // 入队任务(传入函数和函数的参数)
        template<class F, class... Args>
        auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>;
        // 一个最简单的函数包装模板可以这样写(C++11)适用于任何函数(变参、成员都可以)
        // template<class F, class... Args>
        // auto enqueue(F&& f, Args&&... args) -> decltype(declval<F>()(declval<Args>()...))
        // {    return f(args...); }
        // C++14更简单
        // template<class F, class... Args>
        // auto enqueue(F&& f, Args&&... args)
        // {    return f(args...); }
    
        // 停止线程池
        void stopAll();
    
    
    private:
        void newThread();
    
    private:
        std::atomic_int run_workers;   // 当前运行的工作线程数
        int             max_workers;   // 最大工作线程数
        // 任务队列
        std::queue<std::function<void()>> tasks;
    
        // synchronization 异步
        std::mutex              queue_mutex;   // 队列互斥锁
        std::condition_variable condition;     // 条件变量
        bool                    stop;          // 停止标志
    };
    
    // 构造函数仅启动一些工作线程
    inline DynamicThreadPool::DynamicThreadPool(size_t threads)
        : run_workers(0)
        , max_workers(threads)
        , stop(false)
    {
        if (max_workers == 0 || max_workers > (int)std::thread::hardware_concurrency()) {
            max_workers = std::thread::hardware_concurrency();
        }
    }
    
    // 添加一个新的工作任务到线程池
    template<class F, class... Args>
    auto DynamicThreadPool::enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>
    {
        using return_type = typename std::result_of<F(Args...)>::type;
    
        // 将任务函数和其参数绑定,构建一个packaged_task
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        // 获取任务的future
        std::future<return_type> res = task->get_future();
    
        size_t tasks_size = 0;
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
    
            // don't allow enqueueing after stopping the pool
            // 不允许入队到已经停止的线程池
            if (stop) { throw std::runtime_error("enqueue on stopped ThreadPool"); }
            // 将任务添加到任务队列
            tasks.emplace([task]() { (*task)(); });
            tasks_size = tasks.size();
        }
        // 判断当前的任务积累数,如果任务积累太多,就再创建一个线程
        if (run_workers == 0 || (tasks_size > 10 && run_workers < max_workers)) {
            newThread();   // 创建新线程
        }
        // 发送通知,唤醒某一个工作线程取执行任务
        condition.notify_one();
        return res;
    }
    
    inline DynamicThreadPool::~DynamicThreadPool()
    {
        stopAll();
    }
    
    inline void DynamicThreadPool::stopAll()
    {
        {
            // 拿锁
            std::unique_lock<std::mutex> lock(queue_mutex);
            // 停止标志置true
            stop = true;
        }
        // 通知所有工作线程,唤醒后因为stop为true了,所以都会结束
        condition.notify_all();
        // 等待所有线程结束
        while (run_workers > 0) {
            // 如果工作线程是意外结束的,没有将 run_workers 减一,那么这里会陷入死循环
            // 所以这里也可以修改为循环一定次数后就退出,不等了
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    }
    
    inline void DynamicThreadPool::newThread()
    {
        ++run_workers;   // 运行线程数加一
    
        std::thread thr([this]() {
            int no_task = 0;   // 用于标记任务循环
            while (true) {
                if (stop) {
                    break;   // 退出线程循环
                }
                do {
                    std::function<void()> task;   // 任务对象
                    {
                        std::lock_guard<std::mutex> lock(this->queue_mutex);
                        // 从任务队列取出一个任务
                        if (this->tasks.empty()) {
                            ++no_task;
                            break;   // 没有任务,退出任务执行循环
                        }
                        // 取得任务队首任务(注意此处的std::move)
                        task = std::move(this->tasks.front());
                        // 从队列移除
                        this->tasks.pop();
                    }
                    // 执行任务
                    task();
    
                } while (true);
                // 如果超过3次都是空循环,那么就退出线程
                if (no_task > 3) { break; }
    
                // 没有任务的时候,等待条件触发
                // 拿锁(独占所有权式),队列锁也充当一下条件锁
                std::unique_lock<std::mutex> lock(this->queue_mutex);
                // 等待条件成立(只等待180秒,条件不成立就退出)
                this->condition.wait_for(lock, std::chrono::seconds(180), [this] {
                    return this->stop || !this->tasks.empty();
                });
            }   // end while
            // 线程退出的时候,从工作线程组中移除
            --run_workers;   // 运行线程数减一
        });
        thr.detach();   // 分离执行
    }
    
    #endif   // DYNAMICTHREADPOOL_HPP
    

    简单的测试代码

    #include "DynamicThreadPool.hpp"
    #include <iostream>
    
    int main()
    {
        cout << "开始测试!" << endl;
        DynamicThreadPool tp(4);
    
        auto func = [](int i) {
            auto id = std::this_thread::get_id();
            std::cout << id << "  " << i << endl;
            //模拟耗时较长任务
            std::this_thread::sleep_for(std::chrono::seconds(1));
        };
    
        // 先提交9个
        int i = 1;
        for (; i < 10; ++i) {
            tp.enqueue(func, i);
        }
        // 休眠60秒
        std::this_thread::sleep_for(std::chrono::seconds(20));
        // 提交50个
        for (; i < 60; ++i) {
            tp.enqueue(func, i);
        }
        // 休眠等待结束
        std::this_thread::sleep_for(std::chrono::seconds(40));
        // 提交50个
        for (; i < 120; ++i) {
            tp.enqueue(func, i);
        }
    
        std::cout<<"测试结束"<<std::endl;
        return 0;
    }
    
  • 相关阅读:
    硬盘的结构和介绍,硬盘MBR详细介绍(超详细彩图)
    websocket协议学习
    Qt4可以使用trUtf8函数,其内容可以是中文,也可以是F硬编码
    QString转换为LPTSTR(使用了reinterpret_cast,真是叹为观止,但是也开阔了思路),三篇文章合起来的各种转换方法
    系统高可用
    Visual Studio
    管道是如何建立起来的?
    CLR和.Net对象
    任务调度
    路由与控制器
  • 原文地址:https://www.cnblogs.com/oloroso/p/15716114.html
Copyright © 2011-2022 走看看