1.C++11中引入了lambada表达式,很好的支持异步编程
2.C++11中引入了std::thread,可以很方便的构建线程,更方便的可移植特性
3.C++11中引入了std::mutex,可以很方便的构建线程锁互斥访问,更方便的可移植特性
4.C++11中引入了std::condition_variable,可以不依赖于win32 api实现自己的消费者生产者模型
5.利用改进版本的shared_ptr,可以很好的解决多线程生命周期的棘手问题
1 /************************************************************************/ 2 /* */ 3 /************************************************************************/ 4 5 #ifndef __CARBON_THREAD_POOL_H 6 #define __CARBON_THREAD_POOL_H 7 8 #include <vector> 9 #include <memory> 10 #include <thread> 11 #include <mutex> 12 #include <condition_variable> 13 #include <future> 14 #include <functional> 15 #include <stdexcept> 16 #include <string> 17 #include <sstream> 18 #include <deque> 19 20 namespace CARBON { 21 22 //************************************ 23 // Method: Create 24 // Returns: std::shared_ptr 25 // Qualifier: 用于创建智能指针实例 26 // Parameter: args, 可变参数,接受任意个数的参数,传递给T的构造函数 27 //************************************ 28 template<typename T, typename... ARG> 29 std::shared_ptr<T> Create(ARG&&... args) 30 { 31 struct TEnableShared : public T 32 { 33 TEnableShared(ARG&&... args) 34 : T(std::forward<ARG>(args)...) 35 {} 36 }; 37 38 return std::make_shared<TEnableShared>(std::forward<ARG>(args)...); 39 } 40 41 class ThreadPool : public std::enable_shared_from_this<ThreadPool> 42 { 43 protected: 44 ThreadPool() 45 : _stop(false) 46 {} 47 48 virtual ~ThreadPool() 49 { 50 { 51 std::unique_lock<std::mutex> lock(_lock); 52 _stop = true; 53 } 54 _condition.notify_all(); 55 for (std::thread &worker : _workers) 56 worker.join(); 57 } 58 59 public: 60 // initialize thread pool with number of threads 61 bool InitializePool(size_t threads) 62 { 63 if (!_workers.empty()) return true; 64 65 for (size_t i = 0; i < threads; ++i) 66 { 67 std::weak_ptr<ThreadPool> _wtp = this->shared_from_this(); 68 auto th = [](std::weak_ptr<ThreadPool> wtp) { 69 for (;;) 70 { 71 std::function<void()> task; 72 73 { 74 std::shared_ptr<ThreadPool> stp = wtp.lock(); 75 if (!stp) 76 return; 77 78 std::unique_lock<std::mutex> lock(stp->_lock); 79 auto shipment = [&] ()->bool { return stp->_stop || !stp->_tasks.empty(); }; 80 stp->_condition.wait(lock, shipment); 81 if (stp->_stop) 82 return; 83 if (stp->_tasks.empty()) 84 continue; 85 task = std::move(stp->_tasks.front()).task; 86 stp->_tasks.pop_front(); 87 } 88 89 task(); 90 } 91 }; 92 _workers.emplace_back(th, _wtp); 93 } 94 95 return !_workers.empty(); 96 } 97 98 //************************************ 99 // Method: EnqueueTask 100 // Returns: std::future, 值类型由functor f指定 101 // Qualifier: 可以借由返回的std::future获取结果,但是更建议在functor中做异步通知 102 // Parameter: taskid 用于接受任务的id描述 103 // Parameter: functor f, 函数对象,用于执行任务 104 // Parameter: args, 可变参数,接受任意个数的参数,传递给functor f 105 //************************************ 106 template<class F, class... Args> 107 auto EnqueueTask(std::string& taskid, F&& f, Args&&... args) 108 ->std::future<typename std::result_of<F(Args...)>::type> 109 { 110 if (_workers.empty()) 111 throw std::runtime_error("ThreadPool not initialized yet"); 112 113 using return_type = typename std::result_of<F(Args...)>::type; 114 115 auto task = std::make_shared<std::packaged_task<return_type()>>( 116 std::bind(std::forward<F>(f), std::forward<Args>(args)...) 117 ); 118 119 std::future<return_type> res = task->get_future(); 120 { 121 std::unique_lock<std::mutex> lock(_lock); 122 123 // don't allow enqueueing after stopping the pool 124 if (_stop) 125 throw std::runtime_error("enqueue on stopped ThreadPool"); 126 127 stThreadTask st; 128 std::stringstream ss; 129 ss << (void*)task.get(); 130 ss >> taskid; 131 st.taskid = taskid; 132 st.task = [task]() { (*task)(); }; 133 _tasks.push_back(st); 134 } 135 _condition.notify_one(); 136 return res; 137 } 138 139 //************************************ 140 // Method: GetTasksSize 141 // Returns: size_t 142 // Qualifier: 获取等待任务队列的任务数,正在执行的任务已经弹出队列,所以不参与计算 143 //************************************ 144 size_t GetTasksSize() 145 { 146 std::unique_lock<std::mutex> lock(_lock); 147 return _tasks.size(); 148 } 149 150 //************************************ 151 // Method: RemoveTask 152 // Returns: bool, 找到任务并移除则返回true,否则返回false 153 // Qualifier: 正在执行的任务已经弹出任务队列,应该在其它地方通知任务退出 154 // Qualifier: 执行完成的任务已经弹出任务队列,无法移除不存在的任务 155 // Qualifier: 该接口只能移除处在等待中的任务 156 // Parameter: taskid是任务的唯一标示,由EnqueueTask返回 157 //************************************ 158 bool RemoveTask(const std::string& taskid) 159 { 160 std::unique_lock<std::mutex> lock(_lock); 161 for (auto& t = _tasks.begin(); t != _tasks.end(); ++t) 162 { 163 if (taskid == t->taskid) 164 { 165 _tasks.erase(t); 166 return true; 167 } 168 } 169 170 return false; 171 } 172 173 private: 174 typedef struct stThreadTask 175 { 176 std::function<void()> task; 177 std::string taskid; 178 }stThreadTask; 179 180 // need to keep track of threads so we can join them 181 std::vector< std::thread > _workers; 182 // the task queue 183 std::deque< stThreadTask > _tasks; 184 185 // synchronization 186 std::mutex _lock; 187 std::condition_variable _condition; 188 bool _stop; 189 }; 190 } 191 192 #endif
使用enable_shared_from_this来确保内部线程访问指针时,不会因为指针失效造成的非法访问
weak_ptr很好的保证了ThreadPool的生命周期安全性和实效性
由于使用了share_from_this,将初始化代码整体拿出来放到InitializePool中实现
ThreadPool的构造函数和析构函数声明为protected,用于保证外部不要直接生成ThreadPool实例
应该使用Create函数来生成ThreadPool实例
测试代码如下:
1 namespace { 2 std::condition_variable _exit_cv; 3 } 4 5 void func(int n) 6 { 7 std::cout << "func with n " << n << std::endl; 8 } 9 10 using CARBON::ThreadPool; 11 12 std::string taskid; 13 std::shared_ptr<ThreadPool> stp = CARBON::Create<ThreadPool>(); 14 std::weak_ptr<ThreadPool> _wtp = stp; 15 stp->InitializePool(2); 16 17 18 stp->EnqueueTask(taskid, [](std::function<void(int)> cbf, std::weak_ptr<ThreadPool> wtp) ->int { 19 std::cout << "task1 "; 20 21 for (int i = 0; i < 5; ++i) { 22 std::mutex mtx; 23 std::unique_lock<std::mutex> lck(mtx); 24 if(_exit_cv.wait_for(lck, std::chrono::milliseconds(400)) == std::cv_status::no_timeout) 25 break; 26 27 if (cbf) cbf(i); 28 if (wtp.expired()) 29 break; 30 } 31 32 return 5; 33 }, func, _wtp);
当需要中断线程执行时,应该在外部通知线程中的任务自行退出
例子中可以在主线程中这么做
_exit_cv.notify_all();
_exit_cv用于模拟sleep操作
func用于模拟任务结果的异步通知,这里为了省事使用了函数指针,实际工作中应该使用functor来传递,以保证生命周期的有效性
比如std::bind和shared_ptr一起构造的functor对象