zoukankan      html  css  js  c++  java
  • 基于C++11实现的线程池

    1.C++11中引入了lambada表达式,很好的支持异步编程

    2.C++11中引入了std::thread,可以很方便的构建线程,更方便的可移植特性

    3.C++11中引入了std::mutex,可以很方便的构建线程锁互斥访问,更方便的可移植特性

    4.C++11中引入了std::condition_variable,可以不依赖于win32 api实现自己的消费者生产者模型

    5.利用改进版本的shared_ptr,可以很好的解决多线程生命周期的棘手问题

      1 /************************************************************************/
      2 /*                                                                      */
      3 /************************************************************************/
      4 
      5 #ifndef __CARBON_THREAD_POOL_H
      6 #define __CARBON_THREAD_POOL_H
      7 
      8 #include <vector>
      9 #include <memory>
     10 #include <thread>
     11 #include <mutex>
     12 #include <condition_variable>
     13 #include <future>
     14 #include <functional>
     15 #include <stdexcept>
     16 #include <string>
     17 #include <sstream>
     18 #include <deque>
     19 
     20 namespace CARBON {
     21 
     22     //************************************
     23     // Method:    Create
     24     // Returns:   std::shared_ptr
     25     // Qualifier: 用于创建智能指针实例
     26     // Parameter: args, 可变参数,接受任意个数的参数,传递给T的构造函数
     27     //************************************
     28     template<typename T, typename... ARG>
     29     std::shared_ptr<T> Create(ARG&&... args)
     30     {
     31         struct TEnableShared : public T
     32         {
     33             TEnableShared(ARG&&... args)
     34                 : T(std::forward<ARG>(args)...)
     35             {}
     36         };
     37 
     38         return std::make_shared<TEnableShared>(std::forward<ARG>(args)...);
     39     }
     40 
     41     class ThreadPool : public std::enable_shared_from_this<ThreadPool>
     42     {
     43     protected:
     44         ThreadPool()
     45             : _stop(false)
     46         {}
     47 
     48         virtual ~ThreadPool()
     49         {
     50             {
     51                 std::unique_lock<std::mutex> lock(_lock);
     52                 _stop = true;
     53             }
     54             _condition.notify_all();
     55             for (std::thread &worker : _workers)
     56                 worker.join();
     57         }
     58 
     59     public:
     60         // initialize thread pool with number of threads
     61         bool InitializePool(size_t threads)
     62         {
     63             if (!_workers.empty()) return true;
     64 
     65             for (size_t i = 0; i < threads; ++i)
     66             {
     67                 std::weak_ptr<ThreadPool> _wtp = this->shared_from_this();
     68                 auto th = [](std::weak_ptr<ThreadPool> wtp) {
     69                     for (;;)
     70                     {
     71                         std::function<void()> task;
     72 
     73                         {
     74                             std::shared_ptr<ThreadPool> stp = wtp.lock();
     75                             if (!stp)
     76                                 return;
     77 
     78                             std::unique_lock<std::mutex> lock(stp->_lock);
     79                             auto shipment = [&] ()->bool { return stp->_stop || !stp->_tasks.empty(); };
     80                             stp->_condition.wait(lock, shipment);
     81                             if (stp->_stop)
     82                                 return;
     83                             if (stp->_tasks.empty())
     84                                 continue;
     85                             task = std::move(stp->_tasks.front()).task;
     86                             stp->_tasks.pop_front();
     87                         }
     88 
     89                         task();
     90                     }
     91                 };
     92                 _workers.emplace_back(th, _wtp);
     93             }
     94 
     95             return !_workers.empty();
     96         }
     97 
     98         //************************************
     99         // Method:    EnqueueTask
    100         // Returns:   std::future, 值类型由functor f指定
    101         // Qualifier: 可以借由返回的std::future获取结果,但是更建议在functor中做异步通知
    102         // Parameter: taskid 用于接受任务的id描述
    103         // Parameter: functor f, 函数对象,用于执行任务
    104         // Parameter: args, 可变参数,接受任意个数的参数,传递给functor f
    105         //************************************
    106         template<class F, class... Args>
    107         auto EnqueueTask(std::string& taskid, F&& f, Args&&... args)
    108             ->std::future<typename std::result_of<F(Args...)>::type>
    109         {
    110             if (_workers.empty())
    111                 throw std::runtime_error("ThreadPool not initialized yet");
    112 
    113             using return_type = typename std::result_of<F(Args...)>::type;
    114 
    115             auto task = std::make_shared<std::packaged_task<return_type()>>(
    116                 std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    117                 );
    118 
    119             std::future<return_type> res = task->get_future();
    120             {
    121                 std::unique_lock<std::mutex> lock(_lock);
    122 
    123                 // don't allow enqueueing after stopping the pool
    124                 if (_stop)
    125                     throw std::runtime_error("enqueue on stopped ThreadPool");
    126 
    127                 stThreadTask st;
    128                 std::stringstream ss;
    129                 ss << (void*)task.get();
    130                 ss >> taskid;
    131                 st.taskid = taskid;
    132                 st.task = [task]() { (*task)(); };
    133                 _tasks.push_back(st);
    134             }
    135             _condition.notify_one();
    136             return res;
    137         }
    138 
    139         //************************************
    140         // Method:    GetTasksSize
    141         // Returns:   size_t
    142         // Qualifier: 获取等待任务队列的任务数,正在执行的任务已经弹出队列,所以不参与计算
    143         //************************************
    144         size_t GetTasksSize()
    145         {
    146             std::unique_lock<std::mutex> lock(_lock);
    147             return _tasks.size();
    148         }
    149 
    150         //************************************
    151         // Method:    RemoveTask
    152         // Returns:   bool, 找到任务并移除则返回true,否则返回false
    153         // Qualifier: 正在执行的任务已经弹出任务队列,应该在其它地方通知任务退出
    154         // Qualifier: 执行完成的任务已经弹出任务队列,无法移除不存在的任务
    155         // Qualifier: 该接口只能移除处在等待中的任务
    156         // Parameter: taskid是任务的唯一标示,由EnqueueTask返回
    157         //************************************
    158         bool RemoveTask(const std::string& taskid)
    159         {
    160             std::unique_lock<std::mutex> lock(_lock);
    161             for (auto& t = _tasks.begin(); t != _tasks.end(); ++t)
    162             {
    163                 if (taskid == t->taskid)
    164                 {
    165                     _tasks.erase(t);
    166                     return true;
    167                 }
    168             }
    169 
    170             return false;
    171         }
    172 
    173     private:
    174         typedef struct stThreadTask
    175         {
    176             std::function<void()> task;
    177             std::string taskid;
    178         }stThreadTask;
    179 
    180         // need to keep track of threads so we can join them
    181         std::vector< std::thread > _workers;
    182         // the task queue
    183         std::deque< stThreadTask > _tasks;
    184 
    185         // synchronization
    186         std::mutex _lock;
    187         std::condition_variable _condition;
    188         bool _stop;
    189     };
    190 }
    191 
    192 #endif

    使用enable_shared_from_this来确保内部线程访问指针时,不会因为指针失效造成的非法访问

    weak_ptr很好的保证了ThreadPool的生命周期安全性和实效性

    由于使用了share_from_this,将初始化代码整体拿出来放到InitializePool中实现

    ThreadPool的构造函数和析构函数声明为protected,用于保证外部不要直接生成ThreadPool实例

    应该使用Create函数来生成ThreadPool实例

    测试代码如下:

     1 namespace {
     2     std::condition_variable _exit_cv;
     3 }
     4 
     5 void func(int n)
     6 {
     7     std::cout << "func with n " << n << std::endl;
     8 }
     9 
    10 using CARBON::ThreadPool;
    11 
    12 std::string taskid;
    13 std::shared_ptr<ThreadPool> stp = CARBON::Create<ThreadPool>();
    14 std::weak_ptr<ThreadPool> _wtp = stp;
    15 stp->InitializePool(2);
    16 
    17 
    18 stp->EnqueueTask(taskid, [](std::function<void(int)> cbf, std::weak_ptr<ThreadPool> wtp) ->int {
    19         std::cout << "task1
    ";
    20 
    21         for (int i = 0; i < 5; ++i) {
    22             std::mutex mtx;
    23             std::unique_lock<std::mutex> lck(mtx);
    24             if(_exit_cv.wait_for(lck, std::chrono::milliseconds(400)) == std::cv_status::no_timeout)
    25                 break;
    26 
    27             if (cbf) cbf(i);
    28             if (wtp.expired())
    29                 break;
    30         }
    31 
    32         return 5;
    33     }, func, _wtp);

    当需要中断线程执行时,应该在外部通知线程中的任务自行退出

    例子中可以在主线程中这么做

        _exit_cv.notify_all();
    _exit_cv用于模拟sleep操作
    func用于模拟任务结果的异步通知,这里为了省事使用了函数指针,实际工作中应该使用functor来传递,以保证生命周期的有效性
    比如std::bind和shared_ptr一起构造的functor对象

  • 相关阅读:
    JavaScript 命名空间
    雅虎网站页面性能优化的34条黄金守则
    利用模板引擎配合ajax进行数据的导入
    canvas 实现小人的行走和上下左右的变换
    canvas 做一个小鸟运动的小游戏 (第二步) 使小鸟飞起来
    canvas 做一个小鸟运动的小游戏 (第一步)
    canvas 画一个小时钟
    更改博客的通知
    pat advanced 1139. First Contact (30)
    10分钟上手python pandas
  • 原文地址:https://www.cnblogs.com/jojodru/p/6675200.html
Copyright © 2011-2022 走看看