0 前言
最近在写MySQL冷备server的一个模块,稍微接触到一点线程池的东西,自己也就想尝试写一个简单的线程池练练手。
这个线程池在创建时,即按照最大的线程数生成线程。
然后作业任务通过add_task接口往线程池中加入需要运行的任务,再调用线程池的run函数开始运行所有任务,每个线程从任务队列中读取任务,处理完一个任务后再读取新的任务,直到最终任务队列为空。
补充:再回头看这个设计,其实算不了线程池,最多算是多线程执行任务。
把所有的任务描述信息先加入到CThreadPool的任务队列里面,然后调用CThreadPool的run函数去创建指定数量的线程,每个线程互斥的从任务队列里面取出任务去执行。
1 线程池设计
简单描述如下(假设任务类名为CTasklet):
1、CThreadPool<CTasklet> thread_pool(MAX_THREAD_NUM);
2、创建任务,并把任务加入到线程池
CTasklet *pTask1 = new CTasklet();
CTasklet *pTask2 = new CTasklet();
...
thread_pool.add_task(pTask1);
thread_pool.add_task(pTask2);
...
3、调用线程池的run方法开始执行任务
thread_pool.run();
4、等待任务执行完成
thread_pool.join_thread();
2 源码
下面给出完整的线程池代码
1 /* 2 * file: thread_pool.h 3 * desc: 简单的线程池,一次性初始化任务队列和线程池。 4 * 5 */ 6 7 #ifndef _THREAD_POOL_H_ 8 #define _THREAD_POOL_H_ 9 10 #include <pthread.h> 11 #include <vector> 12 13 using namespace std; 14 15 template<typename workType> 16 class CThreadPool 17 { 18 public: 19 typedef void * (thread_func)(void *); 20 21 CThreadPool(int thread_num, size_t stack_size = 10485760); 22 ~CThreadPool(); 23 24 // 向任务队列中添加任务 25 int add_task(workType *pTask); 26 27 // 创建新线程并执行 28 int run(); 29 30 // 等待所有的线程执行结束 31 int join_thread(); 32 33 private: 34 int init_thread_attr(); 35 int destroy_thread_attr(); 36 37 int set_thread_stacksize(size_t stack_size); 38 int set_thread_joinable(); 39 40 protected: 41 // 线程池执行函数,必须为static 42 static void start_routine(void *para); 43 44 private: 45 pthread_attr_t attr_; 46 static pthread_mutex_t mutex_lock_; 47 static list<workType *> list_task_; 48 49 int thread_num_; // 最大线程数 50 vector<pthread_t> thread_id_vec_; 51 }; 52 #endif
1 #include "pthread_pool.h" 2 3 template<typename workType> 4 pthread_mutex_t CThreadPool<workType>::mutex_lock_; 5 6 template<typename workType> 7 list<workType*> CThreadPool<workType*>::list_task_; 8 9 template<typename workType> 10 CThreadPool<workType>::CThreadPool(int thread_num, size_t stack_size) 11 { 12 thread_num_ = thread_num; 13 pthread_mutex_init(&mutex_lock_, NULL); 14 15 init_thread_attr(); 16 set_thread_stacksize(stack_size); 17 set_thread_joinable(); 18 } 19 20 template<typename workType> 21 CThreadPool<workType>::~CthreadPool() 22 { 23 destroy_thread_attr(); 24 } 25 26 template <typename workType> 27 int init_thread_attr() 28 { 29 return pthread_attr_init(&m_attr); 30 } 31 32 template <typename workType> 33 int CThreadPool<workType>::destroy_thread_attr() 34 { 35 return pthread_attr_destroy(&attr_); 36 } 37 38 template <typename workType> 39 int CThreadPool<workType>::set_thread_stacksize(size_t stack_size) 40 { 41 return pthread_attr_setstacksize(&attr_, stack_size); 42 } 43 44 template <typename workType> 45 int CThreadPool<workType>::set_thread_joinable() 46 { 47 return pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_JOINABLE); 48 } 49 50 template <typename workType> 51 void CThreadPool<workType>::start_routine(void *para) 52 { 53 workType *pWorkType = NULL; 54 55 while (1) { 56 pthread_mutex_lock(&mutex_lock_); 57 58 if (list_task_.empty()) { 59 pthread_mutex_unlock(mutex_lock_); 60 return; 61 } 62 63 pWorkType = *(list_task_.begin()); 64 list_task_.pop_front(); 65 pthread_mutex_unlock(&mutex_lock_); 66 67 pWorkType->run(); 68 delete pWorkType; 69 pWorkType = NULL; 70 } 71 } 72 73 template <typename workType> 74 int CThreadPool<workType>::add_task(workType *pTask) 75 { 76 pthread_mutex_lock(&mutex_lock_); 77 list_task_.push_back(pTask); 78 pthread_mutex_unlock(&mutex_lock_); 79 return 0; 80 } 81 82 template <typename workType> 83 int CThreadPool<workType>::run() 84 { 85 int rc; 86 pthread_t tid; 87 for (int i = 0; i < thread_num_; ++i) { 88 rc = pthread_create(&tid, &attr_, (thread_func)start_routine, NULL); 89 thread_id_vec_.push_back(tid); 90 } 91 return rc; 92 } 93 94 template <typename workType> 95 int CThreadPool<workType>::join_thread() 96 { 97 int rc = 0; 98 vector<pthread_t>::iterator iter; 99 for (iter = thread_id_vec_.begin(); iter != thread_id_vec_.end(); ++iter) { 100 rc = pthread_join((*iter), NULL); 101 } 102 thread_id_vec_.clear(); 103 return rc; 104 }
测试代码如下
1 #include <unistd.h> 2 3 #include <iostream> 4 #include <list> 5 6 using namespace std; 7 8 class CTasklet 9 { 10 public: 11 CTasklet(int num) { 12 num_ = num; 13 cout << "CTasklet ctor create num: " << num_ << endl; 14 } 15 16 ~CTasklet() { 17 cout << "CTasklet dtor delete num: " << num_ << endl; 18 } 19 20 int run() { 21 cout << "CTasklet sleep begin: " << num_ << endl; 22 sleep(num_); 23 cout << "CTasklet sleep end: " << num_ << endl; 24 } 25 26 private: 27 int num_; 28 }; 29 30 #define MAX_THREAD_NUM 3 31 int main(int argc, char **argv) 32 { 33 // Step1. 创建线程池 34 CThreadPool<CTasklet> thread_pool(MAX_THREAD_NUM); 35 36 // Step2. 创建任务,并加入到线程池中 37 for (int i = 0; i < 6; ++i) { 38 CTasklet *pTask = new CTasklet(i); 39 thread_pool.add_task(pTask); 40 } 41 // Step3. 开始执行任务 42 thread_pool.run(); 43 // Step4. 等待任务结束 44 thread_pool.join_thread(); 45 46 return 0; 47 48 }
3 总结
上面的线程池属于最简单的一类线程池,即相当于程序运行时候就开启n个线程来执行任务。真正的线程池需要考虑的方面比较多,比如1、线程池中的线程数应该能动态变化;2、线程池能动态调度线程来运行任务,以达到均衡;3、线程池还应该能记录任务的运行时间,防止超时等等。
不过,起码我们已经开了个头,实现了一个简单的线程池,接下来,让我们在这个基础上一步步调整、完善。
PS:对于线程池的考虑,我能想到的有动态增减线程数、超时机制、负载均衡。不知道大家理解线程池还需要考虑什么场景。