zoukankan      html  css  js  c++  java
  • 使用C++11实现一个半同步半异步线程池

    此处输入图片的描述

    前言

    C++11之前我们使用线程需要系统提供API、posix线程库或者使用boost提供的线程库,C++11后就加入了跨平台的线程类std::thread,线程同步相关类std::mutex、std::lock_guard、std::condition_variable、std::atomic以及异步操作相关类std::async、std::future、std::promise等等,这使得我们编写跨平台的多线程程序变得容易,线程的一个高级应用就是线程池,使用线程池可以充分利用多核CPU的并行计算能力,以及避免了使用单个线程的创建和销毁的开销,所以线程池在实际项目中用的很广泛,很多RPC框架都是用了线程池来处理事务,比如说Thrifteasyrpc等等,接下来我们将使用C++11来实现一个通用的半同步半异步线程池(个人博客也发表了《使用C++11实现一个半同步半异步线程池》)。

    实现

    一个半同步半异步线程池分为三层。

    1. 同步服务层:它处理来自上层的任务请求,上层的请求可能是并发的,这些请求不是马上就会被处理的,而是将这些任务放到一个同步排队层中,等待处理。
    2. 同步排队层: 来自上层的任务请求都会加到排队层中等待处理,排队层实际就是一个std::queue。
    3. 异步服务层: 这一层中会有多个线程同时处理排队层中的任务,异步服务层从同步排队层中取出任务并行的处理。

    这三个层次之间需要使用std::mutex、std::condition_variable来进行事件同步,线程池的实现代码如下。

    #ifndef _THREADPOOL_H
    #define _THREADPOOL_H
    
    #include <vector>
    #include <queue>
    #include <thread>
    #include <mutex>
    #include <memory>
    #include <functional>
    #include <condition_variable>
    #include <atomic>
    #include <type_traits>
    
    static const std::size_t max_task_quque_size = 100000;
    static const std::size_t max_thread_size = 30;
    
    class thread_pool
    {
    public:
        using work_thread_ptr = std::shared_ptr<std::thread>;
        using task_t = std::function<void()>; 
    
        explicit thread_pool() : _is_stop_threadpool(false) {}
    
        ~thread_pool()
        {
            stop();
        }
    
        void init_thread_num(std::size_t num)
        {
            if (num <= 0 || num > max_thread_size)
            {
                std::string str = "Number of threads in the range of 1 to " + std::to_string(max_thread_size);
                throw std::invalid_argument(str);
            }
    
            for (std::size_t i = 0; i < num; ++i)
            {
                work_thread_ptr t = std::make_shared<std::thread>(std::bind(&thread_pool::run_task, this));
                _thread_vec.emplace_back(t);
            }
        }
    
        // 支持普通全局函数、静态函数、以及lambda表达式
        template<typename Function, typename... Args>
        void add_task(const Function& func, Args... args)
        {
            if (!_is_stop_threadpool)
            {
                // 用lambda表达式来保存函数地址和参数
                task_t task = [&func, args...]{ return func(args...); };
                add_task_impl(task);
            }
        }
    
        // 支持函数对象(仿函数)
        template<typename Function, typename... Args>
        typename std::enable_if<std::is_class<Function>::value>::type add_task(Function& func, Args... args)
        {
            if (!_is_stop_threadpool)
            {
                task_t task = [&func, args...]{ return func(args...); };
                add_task_impl(task);
            }
        }
    
        // 支持类成员函数
        template<typename Function, typename Self, typename... Args>
        void add_task(const Function& func, Self* self, Args... args)
        {
            if (!_is_stop_threadpool)
            {
                task_t task = [&func, &self, args...]{ return (*self.*func)(args...); };
                add_task_impl(task);
            }
        }
    
        void stop()
        {
            // 保证terminate_all函数只被调用一次
            std::call_once(_call_flag, [this]{ terminate_all(); });
        }
    
    private:
        void add_task_impl(const task_t& task)
        {
            {
                // 任务队列满了将等待线程池消费任务队列
                std::unique_lock<std::mutex> locker(_task_queue_mutex);
                while (_task_queue.size() == max_task_quque_size && !_is_stop_threadpool)
                {
                    _task_put.wait(locker);
                }
    
                _task_queue.emplace(std::move(task));
            }
    
           // 向任务队列插入了一个任务并提示线程池可以来取任务了
            _task_get.notify_one();
        }
    
        void terminate_all()
        {
            _is_stop_threadpool = true;
            _task_get.notify_all();
    
            for (auto& iter : _thread_vec)
            {
                if (iter != nullptr)
                {
                    if (iter->joinable())
                    {
                        iter->join();
                    }
                }
            }
            _thread_vec.clear();
    
            clean_task_queue();
        }
    
        void run_task()
        {
            // 线程池循环取任务
            while (true)
            {
                task_t task = nullptr;
                {
                    // 任务队列为空将等待
                    std::unique_lock<std::mutex> locker(_task_queue_mutex);
                    while (_task_queue.empty() && !_is_stop_threadpool)
                    {
                        _task_get.wait(locker);
                    }
    
                    if (_is_stop_threadpool)
                    {
                        break;
                    }
    
                    if (!_task_queue.empty())
                    {
                        task = std::move(_task_queue.front());
                        _task_queue.pop();
                    }
                }
    
                if (task != nullptr)
                {
                    // 执行任务,并通知同步服务层可以向队列放任务了
                    task();
                    _task_put.notify_one();
                }
            }
        }
    
        void clean_task_queue()
        {
            std::lock_guard<std::mutex> locker(_task_queue_mutex);
            while (!_task_queue.empty())
            {
                _task_queue.pop();
            }
        }
    
    private:
        std::vector<work_thread_ptr> _thread_vec;
        std::condition_variable _task_put;
        std::condition_variable _task_get;
        std::mutex _task_queue_mutex;
        std::queue<task_t> _task_queue;
        std::atomic<bool> _is_stop_threadpool;
        std::once_flag _call_flag;
    };
    
    #endif
    
    

    测试代码

    #include <iostream>
    #include <string>
    #include <chrono>
    #include "thread_pool.hpp"
    
    void test_task(const std::string& str)
    {
        std::cout << "Current thread id: " << std::this_thread::get_id() << ", str: " << str << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
    
    class Test
    {
    public:
        void print(const std::string& str, int i)
        {
            std::cout << "Test: " << str << ", i: " << i << std::endl;
        }
    };
    
    class Test2
    {
    public:
        void operator()(const std::string& str, int i)
        {
            std::cout << "Test2: " << str << ", i: " << i << std::endl;
        }
    };
    
    int main()
    {
        Test t;
        Test2 t2;
        thread_pool pool;
        // 启动10个线程
        pool.init_thread_num(10);
    
        std::string str = "Hello world";
        
        for (int i = 0; i < 1000; ++i)
        {
            // 支持lambda表达式
            pool.add_task([]{ std::cout << "Hello ThreadPool" << std::endl; });
            // 支持全局函数
            pool.add_task(test_task, str);
            // 支持函数对象
            pool.add_task(t2, str, i);
            // 支持类成员函数
            pool.add_task(&Test::print, &t, str, i);
        }
    
        std::cin.get();
        std::cout << "##############END###################" << std::endl;
        return 0;
    }
    
    

    测试程序启动了十个线程并调用add_task函数加入了4000个任务,add_task支持普通全局函数、静态函数、类成员函数、函数对象(仿函数)以及lambda表达式,并且支持函数传入,该线程池的实现以及测试代码我已经放到了github上。

    参考资料

    《深入应用C++11--代码优化与工程级应用》

  • 相关阅读:
    斐波那契数列 详解
    ASP.NET 系列:RBAC权限设计
    架构系列:ASP.NET 项目结构搭建
    EntityFramework系列:Repository模式与单元测试
    PHP 系列:PHP Web 开发基础
    Java Web系列:Spring Boot 基础
    ddddddd
    ddd
    asdfsf
    sdfsdfsdf
  • 原文地址:https://www.cnblogs.com/highway-9/p/5988991.html
Copyright © 2011-2022 走看看