0 前言
线程池的组件网上很多,之前我自己也尝试写个一个demo,但这些组件一般都比较简单,没有完整的实现后台线程池组件应用的功能。因此,这里我们实现一个可以用在线上环境的线程池组件,该线程池组件具备线程池应用的特性,如下所示:
1. 伸缩性:即线程池中线程的个数应该是动态变化的。繁忙的时候可以申请更多的线程;空闲的时候则注销一部分线程。
2. 线程状态:线程池中对线程的管理引入睡眠、唤醒机制。当线程没有任务在运行时,使线程处于睡眠状态。
3. 线程管理:对线程池中线程的申请和注销,不是通过创建一个单独线程来管理,而是线程池自动管理。
最终,我们实现的线程池局部一下几个特点:
1. 线程池中线程的个数介于min和max之间;
2. 线程池中线程具有睡眠和唤醒机制;
3. 当线程池中的线程睡眠时间超过1秒钟,则结束一个线程;当线程池中持续1秒没有空闲线程时,则创建一个新线程。
1 关键数据结构
主要的数据结构包括两个CThreadPool和CThreadWorker。
这里,我们先给出数据结构的定义,然后简要描述他们的作用。
相关数据结构定义如下:
1 class CThreadWorker 2 { 3 CThreadWorker *next_; // 线程池中的线程是单链表结构保存 4 5 int wake_up_; // 唤醒标志 6 pthread_cond_t wake_; 7 pthread_mutex_t mutex_; 8 9 pthread_t tid_; 10 void (*func)(void *arg); // 函数执行体 11 void *arg; // 函数执行体参数 12 time_t sleep_when_; // 线程睡眠时间 13 }; 14 15 class CThreadPool 16 { 17 CThreadWorker *head_; 18 19 unsigned int min_; // 最少线程数 20 unsigned int cur_; // 当前线程数 21 unsigned int max_; // 最大线程数 22 23 time_t last_empty_; // 最后一次线程池中没有线程的时间 24 pthread_mutex_t mutex_; 25 };
其中,CThreadPool用来管理线程池,记录线程池当前线程个数、最小线程个数、最大线程个数等。
CThreadWorker是具体保存线程记录的实体,它里面保存了线程的执行函数体,函数参数、睡眠时间等等。还有一个信号量,用来让线程睡眠或者唤醒。
当我们创建一个线程池时,即默认创建了min个CThreadWorker,每个线程实体默认都是睡眠的,阻塞在pthread_cond_wait处,然后我们具体执行用户函数时,从线程池中获取线程,更改该线程的func,arg等参数,然后唤醒该线程。
中间,线程池会对线程做一些管理,保证线程池的伸缩性。
2 源码
2.1 头文件
1 #ifndef _THREAD_POOL_H_ 2 #define _THREAD_POOL_H_ 3 4 #include <pthread.h> 5 #include <time.h> 6 7 struct CThreadWorker 8 { 9 CThreadWorker *next_; // 线程池中的线程是单链表结构保存 10 11 int wake_up_; // 唤醒标志 12 pthread_cond_t wake_; 13 pthread_mutex_t mutex_; 14 15 pthread_t tid_; 16 void (*func)(void *arg); // 函数执行体 17 void *arg; // 函数执行体参数 18 time_t sleep_when_; // 线程睡眠时间 19 }; 20 21 struct CThreadPool 22 { 23 CThreadWorker *head_; 24 25 unsigned int min_; // 最少线程数 26 unsigned int cur_; // 当前线程数 27 unsigned int max_; // 最大线程数 28 29 time_t last_empty_; // 最后一次线程池中没有线程的时间 30 pthread_mutex_t mutex_; 31 }; 32 33 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max); 34 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg); 35 #endif
2.2 实现文件
实现文件主要完成.h中的CreateThreadPool和StartWork。我们先给个流程图大概描述一下。
2.2.1 CreateThreadPool流程描述
如下图所示为创建线程池的流程,其中最关键的步骤是DoProcess,当执行到DoProcess时,我们默认创建了min个线程,所有线程都因为信号而睡眠。StartWork里面会获取一个线程,然后修改该线程的func,arg参数,最后唤醒线程执行我们的任务。当一个线程执行完的时候,我们需要判断线程池当前的状态,是需要新创建一个线程,还是把该线程重新加入到线程池,还是注销该线程。具体的这些逻辑图示不好描述,我们在下面代码里给出注释。
2.2.2 StartWork流程描述
如下图是启动一个任务的流程,2.2.1我们已经描述到,启动任务时,我们会从线程池中拿出一个线程,修改该线程的func/arg属性,然后唤醒该线程。当线程执行完以后,是需要重新加入线程池,还是注销,则是在2.2.1的DoProcess中处理。
2.3 实现文件
1 #include <stdlib.h> 2 #include "pthread_pool.h" 3 4 // 线程池持续1秒没有空闲线程 5 #define WaitWorkerTimeout(pool) ((time(NULL) - pool->last_empty_) > 1) 6 // 线程池中没有线程,所有的线程已经pop出去执行具体的任务去了 7 #define NoThreadInPool(pool) (pool->head_ == NULL) 8 #define CanCreateThread(pool) (pool->cur_ < pool->max_) 9 10 11 static int CreateOneThread(CThreadPool *pool); 12 static void *DoProcess(void *arg); 13 static void PushWork(CThreadPool *pool, CThreadWorker *worker); 14 static void PopWork(CThreadPool *pool, CThreadWorker *worker); 15 static void InitWorker(CThreadWorker *worker); 16 static int WorkerIdleTimeout(CThreadPool *pool); 17 static CThreadWorker *GetWorker(CThreadPool *pool, void (*func)(void *arg), void *arg); 18 static void WakeupWorkerThread(CThreadWorker *worker); 19 20 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg); 21 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max); 22 23 CThreadPool *CreateThreadPool(unsigned int min, unsigned int max) 24 { 25 CThreadPool *poo; 26 27 pool = (CThreadPool *)malloc(sizeof(CThreadPool)); 28 29 if (pool == NULL) { 30 return NULL; 31 } 32 33 pool->head_ = NULL; 34 pool->min_ = min; 35 pool->cur_ = 0; 36 pool->max_ = max; 37 pool->last_empty_ = time(NULL); 38 pthread_mutex_init(&pool->mutex_, NULL); 39 40 int ret = 0; 41 while (min--) { 42 ret = CreateOneThread(pool); 43 if (ret != 0) { 44 exit(0); 45 } 46 } 47 return pool; 48 } 49 50 static int CreateOneThread(CThreadPool *pool) 51 { 52 pthread_t tid; 53 return pthread_create(&tid, NULL, DoProcess, pool); 54 } 55 56 static void *DoProcess(void *arg) 57 { 58 CThreadPool *pool = (CThreadPool *)arg; 59 60 CThreadWorker worker; 61 62 InitWorker(&worker); 63 64 pthread_mutex_lock(&pool->mutex_); 65 pool->cur_ += 1; 66 67 for (;;) { 68 PushWork(pool, &worker); 69 worker.sleep_when_ = time(NULL); 70 pthread_mutex_unlock(&pool->mutex_); 71 72 pthread_mutex_lock(&worker.mutex_); 73 while (worker.wake_up_ != 1) { 74 pthread_cond_wait(&worker.wake_, &worker.mutex_); 75 } 76 // worker线程已被唤醒,准备开始执行任务,修改wake_up_标志。 77 worker.wake_up_ = 0; 78 pthread_mutex_unlock(&worker.mutex_); 79 80 // 执行我们的任务,执行完毕之后,修改worker.func为NULL。 81 worker.func(arg); 82 worker.func = NULL; 83 84 // 任务执行完以后,线程池需要根据当前的线程池状态来判断是要把该线程重新加入线程池,还是要创建一个新的线程。 85 pthread_mutex_lock(&pool->mutex_); 86 if (WaitWorkerTimeout(pool) && NoThreadInPool(pool) && CanCreateThread(pool)) { 87 // 在我们执行这个任务的时候,其他任务等待空闲线程的时间超过了1秒,而且线程池中没有线程,且线程池当前线程数没有超过最大允许创建线程数 88 CreateOneThread(pool); 89 } 90 91 // 线程池中没有线程了,重新把该线程加入线程池 92 if (NoThreadInPool(pool)) { 93 continue; 94 } 95 96 // 线程池中线程数低于最低阈值,重新把该线程加入线程池 97 if (pool->curr <= pool->min) { 98 continue; 99 } 100 101 // 线程中睡眠的线程时间超过了1秒,说明线程池不是很繁忙,不需要把该线程重新加回线程池 102 if (WorkerIdleTimeout(pool)) { 103 break; 104 } 105 } 106 107 pool->cur -= 1; 108 pthread_mutex_unlock(&pool->mutex); 109 110 pthread_cond_destroy(&worker.wake_); 111 pthread_mutex_destroy(&worker.mutex_); 112 113 pthread_exit(NULL); 114 } 115 116 static void InitWorker(CThreadWorker *worker) 117 { 118 worker->next_ = NULL; 119 worker->wake_up_ = 0; 120 pthread_cond_init(&worker->wake_, NULL); 121 pthread_mutex_init(&worker->mutex_, NULL); 122 worker->tid_ = pthread_self(); 123 worker->func = NULL; 124 worker->arg = NULL; 125 worker->sleep_when_ = 0; 126 } 127 128 static void PushWork(CThreadPool *pool, CThreadWorker *worker) 129 { 130 worker->next_ = pool->head_; 131 pool->next_ = worker; 132 } 133 134 static int WorkerIdleTimeout(CThreadPool *pool) 135 { 136 CThreadWorker *worker; 137 138 if (NoThreadInPool(pool)) { 139 return 0; 140 } 141 worker = pool->head_; 142 return (time(NULL) > worker->sleep_when_ + 1)? 1 : 0; 143 } 144 145 int StartWork(CThreadPool *pool, void (*func)(void *arg), void *arg) 146 { 147 if (func == NULL) { 148 return -1; 149 } 150 151 CThreadWorker *worker; 152 pthread_mutex_lock(&pool->mutex_); 153 worker = GetWorker(pool, func, arg); 154 pthread_mutex_unlock(&pool->mutex_); 155 156 if (worker == NULL) { 157 return -2; 158 } 159 160 WakeupWorkerThread(worker); 161 return 0; 162 } 163 164 static CThreadWorker *GetWorker(CThreadPool *pool, void (*func)(void *arg), void *arg) 165 { 166 CThreadWorker *worker; 167 168 if (NoThreadInPool(pool)) { 169 return NULL; 170 } 171 172 worker = pool->head_; 173 PopWork(pool, worker); 174 175 if (NoThreadInPool(pool)) { 176 pool->last_empty_ = time(NULL); 177 } 178 179 worker->func = func; 180 worker->arg = arg; 181 182 return worker; 183 } 184 185 static void PopWork(CThreadPool *pool, CThreadWorker *worker) 186 { 187 pool->head_ = worker->next_; 188 worker->next_ = NULL; 189 } 190 191 static void WakeupWorkerThread(CThreadWorker *worker) 192 { 193 pthread_mutex_lock(&worker->mutex_); 194 worker->wake_up_ = 1; 195 pthread_mutex_unlock(&worker->mutex_); 196 197 pthread_cond_signal(&worker->wake_); 198 }
3 总结
该线程池模型实现了我们线上环境实际的一些应用场景,没有考虑的问题有这么几点:
1. 没有考虑任务类的概念,写一个任务基类,然后具体的任务类继承这个基类,实现自己的功能,具体执行时候,只需要add_task,把任务加进线程池即可,这样做的目的是想着可以给每个任务发信号,是否要终止任务的执行。当前的线程池做不到这点,也做不到判断任务执行时间,是否超时。
2. 每个线程只执行一个任务,而不是搞一个任务队列,去执行任务队列里的任务。关于这点,我也不清楚,搞任务队列的优势是什么。
个人能想到的就是这些,不知道诸位平时工作中,具体应用线程池的时候需要考虑一些什么场景,我这里想到的就是1、可以灵活的终止任务;2、可以判断任务是否超时;3、可以对需要执行的任务做到负载均衡(觉得这一点,在这里的线程池里不是问题,极端的情况是来了一堆任务,线程池中线程不够了,那确实会有这个问题)。
关于这几个问题,第3个我个人能解决,可以把上面的线程池稍微修改一下,每个线程结果ThreadWorker里面再加个任务队列,当没有任务时候线程阻塞,否则就取出任务来执行。
关于第一点,怎么修改这个框架,达到可以给任务发信号呢。
还有第二点,判断任务是否超时呢?
欢迎大家一起讨论!