Linux 多线程编程之 线程池 的原理和一个简单的C实现,提高对多线程编
程的认知,同步处理等操作,以及如何在实际项目中高效的利用多线程开
发。
1. 线程池介绍
为什么需要线程池???
目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务
器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,
但处理时间却相对较短。
传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创
建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就
是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时
间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执
行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态,
这笔开销将是不可忽略的。
线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对
多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,
因为在请求到达时线程已经存在,所以无意中也消除了线程创建所带来的
延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过
适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,
就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从
而可以防止资源不足。
2. 线程池结构
2.1 线程池任务结点结构
线程池任务结点用来保存用户投递过来的的任务,并放入线程池中的线程来执行,任务结构
1 1 // 线程池任务结点 2 2 struct worker_t { 3 3 void * (* process)(void * arg); /*回调函数*/ 4 4 int paratype; /*函数类型(预留)*/ 5 5 void * arg; /*回调函数参数*/ 6 6 struct worker_t * next; /*链接下一个任务节点*/ 7 7 }; 8 9 2.2 线程池控制器 10 11 线程池控制器用来对线程池进行控制管理,描述当前线程池的最基本信息,包括任务的投递,线 12 13 程池状态的更新与查询,线程池的销毁等,其结构如下: 14 15 /*线程控制器*/ 16 struct CThread_pool_t { 17 pthread_mutex_t queue_lock; /*互斥锁*/ 18 pthread_cond_t queue_ready; /*条件变量*/ 19 20 worker_t * queue_head; /*任务节点链表 保存所有投递的任务*/ 21 int shutdown; /*线程池销毁标志 1-销毁*/ 22 pthread_t * threadid; /*线程ID*/ 23 24 int max_thread_num; /*线程池可容纳最大线程数*/ 25 int current_pthread_num; /*当前线程池存放的线程*/ 26 int current_pthread_task_num; /*当前已经执行任务和已分配任务的线程数目和*/ 27 int current_wait_queue_num; /*当前等待队列的的任务数目*/ 28 int free_pthread_num; /*线程池允许最大的空闲线程数/*/ 29 30 /** 31 * function: ThreadPoolAddWorkUnlimit 32 * description: 向线程池投递任务 33 * input param: pthis 线程池指针 34 * process 回调函数 35 * arg 回调函数参数 36 * return Valr: 0 成功 37 * -1 失败 38 */ 39 int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg); 40 41 /** 42 * function: ThreadPoolAddWorkLimit 43 * description: 向线程池投递任务,无空闲线程则阻塞 44 * input param: pthis 线程池指针 45 * process 回调函数 46 * arg 回调函数参数 47 * return Val: 0 成功 48 * -1 失败 49 */ 50 int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg); 51 52 /** 53 * function: ThreadPoolGetThreadMaxNum 54 * description: 获取线程池可容纳的最大线程数 55 * input param: pthis 线程池指针 56 */ 57 int (* GetThreadMaxNum)(void * pthis); 58 59 /** 60 * function: ThreadPoolGetCurrentThreadNum 61 * description: 获取线程池存放的线程数 62 * input param: pthis 线程池指针 63 * return Val: 线程池存放的线程数 64 */ 65 int (* GetCurrentThreadNum)(void * pthis); 66 67 /** 68 * function: ThreadPoolGetCurrentTaskThreadNum 69 * description: 获取当前正在执行任务和已经分配任务的线程数目和 70 * input param: pthis 线程池指针 71 * return Val: 当前正在执行任务和已经分配任务的线程数目和 72 */ 73 int (* GetCurrentTaskThreadNum)(void * pthis); 74 75 /** 76 * function: ThreadPoolGetCurrentWaitTaskNum 77 * description: 获取线程池等待队列任务数 78 * input param: pthis 线程池指针 79 * return Val: 等待队列任务数 80 */ 81 int (* GetCurrentWaitTaskNum)(void * pthis); 82 83 /** 84 * function: ThreadPoolDestroy 85 * description: 销毁线程池 86 * input param: pthis 线程池指针 87 * return Val: 0 成功 88 * -1 失败 89 */ 90 int (* Destroy)(void * pthis); 91 };
2.3 线程池运行结构
解释:
1) 图中的线程池中的"空闲"和"执行"分别表示空闲线程和执行线程,空闲线程指在正在等待任务的线程,
同样执行线程指正在执行任务的线程, 两者是相互转换的。当用户投递任务过来则用空闲线程来执行
该任务,且空闲线程状态转换为执行线程;当任务执行完后,执行线程状态转变为空闲线程。
2) 创建线程池时,正常情况会创建一定数量的线程, 所有线程初始化为空闲线程,线程阻塞等待用户
投递任务。
3) 用户投递的任务首先放入等待队列queue_head 链表中, 如果线程池中有空闲线程则放入空闲线程中
执行,否则根据条件选择继续等待空闲线程或者新建一个线程来执行,新建的线程将放入线程池中。
4) 执行的任务会从等待队列中脱离,并在任务执行完后释放任务结点worker_t
3. 线程池控制 / 部分函数解释
3.1 线程池创建
创建 max_num 个线程 ThreadPoolRoutine,即空闲线程
1 /** 2 * function: ThreadPoolConstruct 3 * description: 构建线程池 4 * input param: max_num 线程池可容纳的最大线程数 5 * free_num 线程池允许存在的最大空闲线程,超过则将线程释放回操作系统 6 * return Val: 线程池指针 7 */ 8 CThread_pool_t * 9 ThreadPoolConstruct(int max_num, int free_num) 10 { 11 int i = 0; 12 13 CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t)); 14 if(NULL == pool) 15 return NULL; 16 17 memset(pool, 0, sizeof(CThread_pool_t)); 18 19 /*初始化互斥锁*/ 20 pthread_mutex_init(&(pool->queue_lock), NULL); 21 /*初始化条件变量*/ 22 pthread_cond_init(&(pool->queue_ready), NULL); 23 24 pool->queue_head = NULL; 25 pool->max_thread_num = max_num; // 线程池可容纳的最大线程数 26 pool->current_wait_queue_num = 0; 27 pool->current_pthread_task_num = 0; 28 pool->shutdown = 0; 29 pool->current_pthread_num = 0; 30 pool->free_pthread_num = free_num; // 线程池允许存在最大空闲线程 31 pool->threadid = NULL; 32 pool->threadid = (pthread_t *)malloc(max_num*sizeof(pthread_t)); 33 /*该函数指针赋值*/ 34 pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; 35 pool->AddWorkLimit = ThreadPoolAddWorkLimit; 36 pool->Destroy = ThreadPoolDestroy; 37 pool->GetThreadMaxNum = ThreadPoolGetThreadMaxNum; 38 pool->GetCurrentThreadNum = ThreadPoolGetCurrentThreadNum; 39 pool->GetCurrentTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; 40 pool->GetCurrentWaitTaskNum = ThreadPoolGetCurrentWaitTaskNum; 41 42 for(i=0; i<max_num; i++) { 43 pool->current_pthread_num++; // 当前池中的线程数 44 /*创建线程*/ 45 pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool); 46 usleep(1000); 47 } 48 49 return pool; 50 }
3.2 投递任务
1 /** 2 * function: ThreadPoolAddWorkLimit 3 * description: 向线程池投递任务,无空闲线程则阻塞 4 * input param: pthis 线程池指针 5 * process 回调函数 6 * arg 回调函数参数 7 * return Val: 0 成功 8 * -1 失败 9 */ 10 int 11 ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg) 12 { 13 // int FreeThreadNum = 0; 14 // int CurrentPthreadNum = 0; 15 16 CThread_pool_t * pool = (CThread_pool_t *)pthis; 17 18 /*为添加的任务队列节点分配内存*/ 19 worker_t * newworker = (worker_t *)malloc(sizeof(worker_t)); 20 if(NULL == newworker) 21 return -1; 22 23 newworker->process = process; // 回调函数,在线程ThreadPoolRoutine()中执行 24 newworker->arg = arg; // 回调函数参数 25 newworker->next = NULL; 26 27 pthread_mutex_lock(&(pool->queue_lock)); 28 29 /*插入新任务队列节点*/ 30 worker_t * member = pool->queue_head; // 指向任务队列链表整体 31 if(member != NULL) { 32 while(member->next != NULL) // 队列中有节点 33 member = member->next; // member指针往后移动 34 35 member->next = newworker; // 插入到队列链表尾部 36 } else 37 pool->queue_head = newworker; // 插入到队列链表头 38 39 assert(pool->queue_head != NULL); 40 pool->current_wait_queue_num++; // 等待队列加1 41 42 /*空闲的线程= 当前线程池存放的线程 - 当前已经执行任务和已分配任务的线程数目和*/ 43 int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; 44 /*如果没有空闲线程且池中当前线程数不超过可容纳最大线程*/ 45 if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) { //-> 条件为真进行新线程创建 46 int CurrentPthreadNum = pool->current_pthread_num; 47 48 /*新增线程*/ 49 pool->threadid = (pthread_t *)realloc(pool->threadid, 50 (CurrentPthreadNum+1) * sizeof(pthread_t)); 51 52 pthread_create(&(pool->threadid[CurrentPthreadNum]), 53 NULL, ThreadPoolRoutine, (void *)pool); 54 /*当前线程池中线程总数加1*/ 55 pool->current_pthread_num++; 56 57 /*分配任务线程数加1*/ 58 pool->current_pthread_task_num++; 59 pthread_mutex_unlock(&(pool->queue_lock)); 60 61 /*发送信号给一个处与条件阻塞等待状态的线程*/ 62 pthread_cond_signal(&(pool->queue_ready)); 63 return 0; 64 } 65 66 pool->current_pthread_task_num++; 67 pthread_mutex_unlock(&(pool->queue_lock)); 68 69 /*发送信号给一个处与条件阻塞等待状态的线程*/ 70 pthread_cond_signal(&(pool->queue_ready)); 71 // usleep(10); //看情况 72 return 0; 73 }
投递任务时先创建一个任务结点保存回调函数和函数参数,并将任务结点放入等待队列中,在代码中
注释"//->条件为真创建新线程",realloc() 会在保存原始内存中的数据不变的基础上新增1个sizeof(pthread_t)
大小内存。之后更新current_pthread_num,和current_pthread_task_num;并发送信号
pthread_cond_signal(&(pool->queue_read)),给一个处于条件阻塞等待状态的线程,即线程ThreadPoolRoutin()
中的pthread_cond_wait(&(pool->queue_read), &(pool->queue_lock))阻塞等待接收信号,重点讲互
斥锁和添加变量:
pthread_mutex_t queue_lock; /**< 互斥锁*/
pthread_cond_t queue_ready; /**< 条件变量*/
这两个变量时线程池实现中很重要的点,这里简要介绍代码中会用到的相关函数功能;
3.3 执行线程
1 /** 2 * function: ThreadPoolRoutine 3 * description: 线程池中执行的线程 4 * input param: arg 线程池指针 5 */ 6 void * 7 ThreadPoolRoutine(void * arg) 8 { 9 CThread_pool_t * pool = (CThread_pool_t *)arg; 10 11 while(1) { 12 /*上锁,pthread_cond_wait()调用会解锁*/ 13 pthread_mutex_lock(&(pool->queue_lock)); 14 15 /*队列没有等待任务*/ 16 while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) { 17 /*条件锁阻塞等待条件信号*/ 18 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); 19 } 20 21 if(pool->shutdown) { 22 pthread_mutex_unlock(&(pool->queue_lock)); 23 pthread_exit(NULL); // 释放线程 24 } 25 26 assert(pool->current_wait_queue_num != 0); 27 assert(pool->queue_head != NULL); 28 29 pool->current_wait_queue_num--; // 等待任务减1,准备执行任务 30 worker_t * worker = pool->queue_head; // 去等待队列任务节点头 31 pool->queue_head = worker->next; // 链表后移 32 pthread_mutex_unlock(&(pool->queue_lock)); 33 34 (* (worker->process))(worker->arg); // 执行回调函数 35 36 pthread_mutex_lock(&(pool->queue_lock)); 37 pool->current_pthread_task_num--; // 函数执行结束 38 free(worker); // 释放任务结点 39 worker = NULL; 40 41 if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) { 42 pthread_mutex_unlock(&(pool->queue_lock)); 43 break; // 当线程池中空闲线程超过 free_pthread_num 则将线程释放回操作系统 44 } 45 pthread_mutex_unlock(&(pool->queue_lock)); 46 } 47 48 pool->current_pthread_num--; // 当前线程数减1 49 pthread_exit(NULL); // 释放线程 50 51 return (void *)NULL; 52 }
这个就是用来执行任务的线程,在初始化创建线程时所有线程都全部阻塞在pthread_cond_wait()处
此时的线程就为空闲线程,也就是线程被挂起,当收到信号并取得互斥锁时, 表明任务投递过来
则获取等待队列里的任务结点并执行回调函数; 函数执行结束后回去判断当前等待队列是否还有任
务,有则接下去执行,否则重新阻塞回到空闲线程状态。
4. 完整代码实现
4.1 CThreadPool.h 文件
1 /** 2 * 线程池头文件 3 * 4 **/ 5 6 #ifndef _CTHREADPOOL_H_ 7 #define _CTHREADPOOL_H_ 8 9 #include <pthread.h> 10 11 /*线程池可容纳最大线程数*/ 12 #define DEFAULT_MAX_THREAD_NUM 100 13 14 /*线程池允许最大的空闲线程,超过则将线程释放回操作系统*/ 15 #define DEFAULT_FREE_THREAD_NUM 10 16 17 typedef struct worker_t worker_t; 18 typedef struct CThread_pool_t CThread_pool_t; 19 20 /*线程池任务节点*/ 21 struct worker_t { 22 void * (* process)(void * arg); /*回调函数*/ 23 int paratype; /*函数类型(预留)*/ 24 void * arg; /*回调函数参数*/ 25 struct worker_t * next; /*链接下一个任务节点*/ 26 }; 27 28 /*线程控制器*/ 29 struct CThread_pool_t { 30 pthread_mutex_t queue_lock; /*互斥锁*/ 31 pthread_cond_t queue_ready; /*条件变量*/ 32 33 worker_t * queue_head; /*任务节点链表 保存所有投递的任务*/ 34 int shutdown; /*线程池销毁标志 1-销毁*/ 35 pthread_t * threadid; /*线程ID*/ 36 37 int max_thread_num; /*线程池可容纳最大线程数*/ 38 int current_pthread_num; /*当前线程池存放的线程*/ 39 int current_pthread_task_num; /*当前已经执行任务和已分配任务的线程数目和*/ 40 int current_wait_queue_num; /*当前等待队列的的任务数目*/ 41 int free_pthread_num; /*线程池允许最大的空闲线程数/*/ 42 43 /** 44 * function: ThreadPoolAddWorkUnlimit 45 * description: 向线程池投递任务 46 * input param: pthis 线程池指针 47 * process 回调函数 48 * arg 回调函数参数 49 * return Valr: 0 成功 50 * -1 失败 51 */ 52 int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg); 53 54 /** 55 * function: ThreadPoolAddWorkLimit 56 * description: 向线程池投递任务,无空闲线程则阻塞 57 * input param: pthis 线程池指针 58 * process 回调函数 59 * arg 回调函数参数 60 * return Val: 0 成功 61 * -1 失败 62 */ 63 int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg); 64 65 /** 66 * function: ThreadPoolGetThreadMaxNum 67 * description: 获取线程池可容纳的最大线程数 68 * input param: pthis 线程池指针 69 */ 70 int (* GetThreadMaxNum)(void * pthis); 71 72 /** 73 * function: ThreadPoolGetCurrentThreadNum 74 * description: 获取线程池存放的线程数 75 * input param: pthis 线程池指针 76 * return Val: 线程池存放的线程数 77 */ 78 int (* GetCurrentThreadNum)(void * pthis); 79 80 /** 81 * function: ThreadPoolGetCurrentTaskThreadNum 82 * description: 获取当前正在执行任务和已经分配任务的线程数目和 83 * input param: pthis 线程池指针 84 * return Val: 当前正在执行任务和已经分配任务的线程数目和 85 */ 86 int (* GetCurrentTaskThreadNum)(void * pthis); 87 88 /** 89 * function: ThreadPoolGetCurrentWaitTaskNum 90 * description: 获取线程池等待队列任务数 91 * input param: pthis 线程池指针 92 * return Val: 等待队列任务数 93 */ 94 int (* GetCurrentWaitTaskNum)(void * pthis); 95 96 /** 97 * function: ThreadPoolDestroy 98 * description: 销毁线程池 99 * input param: pthis 线程池指针 100 * return Val: 0 成功 101 * -1 失败 102 */ 103 int (* Destroy)(void * pthis); 104 }; 105 106 /** 107 * function: ThreadPoolConstruct 108 * description: 构建线程池 109 * input param: max_num 线程池可容纳的最大线程数 110 * free_num 线程池允许存在的最大空闲线程,超过则将线程释放回操作系统 111 * return Val: 线程池指针 112 */ 113 CThread_pool_t * ThreadPoolConstruct(int max_num, int free_num); 114 115 /** 116 * function: ThreadPoolConstructDefault 117 * description: 创建线程池,以默认的方式初始化,未创建线程 118 * 119 * return Val: 线程池指针 120 */ 121 CThread_pool_t * ThreadPoolConstructDefault(void); 122 123 #endif // _CTHREADPOOL_H_
4.2 CThreadPool.c 文件
1 /** 2 * 线程池实现 3 * 4 **/ 5 6 #include <stdio.h> 7 #include <stdlib.h> 8 #include <string.h> 9 #include <unistd.h> 10 #include <sys/types.h> 11 #include <pthread.h> 12 #include <assert.h> 13 14 #include "CThreadPool.h" 15 16 void * ThreadPoolRoutine(void * arg); 17 18 /** 19 * function: ThreadPoolAddWorkLimit 20 * description: 向线程池投递任务,无空闲线程则阻塞 21 * input param: pthis 线程池指针 22 * process 回调函数 23 * arg 回调函数参数 24 * return Val: 0 成功 25 * -1 失败 26 */ 27 int 28 ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg) 29 { 30 // int FreeThreadNum = 0; 31 // int CurrentPthreadNum = 0; 32 33 CThread_pool_t * pool = (CThread_pool_t *)pthis; 34 35 /*为添加的任务队列节点分配内存*/ 36 worker_t * newworker = (worker_t *)malloc(sizeof(worker_t)); 37 if(NULL == newworker) 38 return -1; 39 40 newworker->process = process; // 回调函数,在线程ThreadPoolRoutine()中执行 41 newworker->arg = arg; // 回调函数参数 42 newworker->next = NULL; 43 44 pthread_mutex_lock(&(pool->queue_lock)); 45 46 /*插入新任务队列节点*/ 47 worker_t * member = pool->queue_head; // 指向任务队列链表整体 48 if(member != NULL) { 49 while(member->next != NULL) // 队列中有节点 50 member = member->next; // member指针往后移动 51 52 member->next = newworker; // 插入到队列链表尾部 53 } else 54 pool->queue_head = newworker; // 插入到队列链表头 55 56 assert(pool->queue_head != NULL); 57 pool->current_wait_queue_num++; // 等待队列加1 58 59 /*空闲的线程= 当前线程池存放的线程 - 当前已经执行任务和已分配任务的线程数目和*/ 60 int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; 61 /*如果没有空闲线程且池中当前线程数不超过可容纳最大线程*/ 62 if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) { 63 int CurrentPthreadNum = pool->current_pthread_num; 64 65 /*新增线程*/ 66 pool->threadid = (pthread_t *)realloc(pool->threadid, 67 (CurrentPthreadNum+1) * sizeof(pthread_t)); 68 69 pthread_create(&(pool->threadid[CurrentPthreadNum]), 70 NULL, ThreadPoolRoutine, (void *)pool); 71 /*当前线程池中线程总数加1*/ 72 pool->current_pthread_num++; 73 74 /*分配任务线程数加1*/ 75 pool->current_pthread_task_num++; 76 pthread_mutex_unlock(&(pool->queue_lock)); 77 78 /*发送信号给一个处与条件阻塞等待状态的线程*/ 79 pthread_cond_signal(&(pool->queue_ready)); 80 return 0; 81 } 82 83 pool->current_pthread_task_num++; 84 pthread_mutex_unlock(&(pool->queue_lock)); 85 86 /*发送信号给一个处与条件阻塞等待状态的线程*/ 87 pthread_cond_signal(&(pool->queue_ready)); 88 // usleep(10); //看情况 89 return 0; 90 } 91 92 /** 93 * function: ThreadPoolAddWorkUnlimit 94 * description: 向线程池投递任务 95 * input param: pthis 线程池指针 96 * process 回调函数 97 * arg 回调函数参数 98 * return Valr: 0 成功 99 * -1 失败 100 */ 101 int 102 ThreadPoolAddWorkUnlimit(void * pthis, void * (* process)(void * arg), void * arg) 103 { 104 // int FreeThreadNum = 0; 105 // int CurrentPthreadNum = 0; 106 107 CThread_pool_t * pool = (CThread_pool_t *)pthis; 108 109 /*给新任务队列节点分配内存*/ 110 worker_t * newworker = (worker_t *)malloc(sizeof(worker_t)); 111 if(NULL == newworker) 112 return -1; 113 114 newworker->process = process; // 回调函数 115 newworker->arg = arg; // 回调函数参数 116 newworker->next = NULL; 117 118 pthread_mutex_lock(&(pool->queue_lock)); 119 120 /*新节点插入任务队列链表操作*/ 121 worker_t * member = pool->queue_head; 122 if(member != NULL) { 123 while(member->next != NULL) 124 member = member->next; 125 126 member->next = newworker; // 插入队列链表尾部 127 } else 128 pool->queue_head = newworker; // 插入到头(也就是第一个节点,之前链表没有节点) 129 130 assert(pool->queue_head != NULL); 131 pool->current_wait_queue_num++; // 当前等待队列的的任务数目+1 132 133 int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; 134 /*只判断是否没有空闲线程*/ 135 if(0 == FreeThreadNum) { 136 int CurrentPthreadNum = pool->current_pthread_num; 137 pool->threadid = (pthread_t *)realloc(pool->threadid, 138 (CurrentPthreadNum+1)*sizeof(pthread_t)); 139 pthread_create(&(pool->threadid[CurrentPthreadNum]),NULL, 140 ThreadPoolRoutine, (void *)pool); 141 pool->current_pthread_num++; 142 if(pool->current_pthread_num > pool->max_thread_num) 143 pool->max_thread_num = pool->current_pthread_num; 144 145 pool->current_pthread_task_num++; 146 pthread_mutex_unlock(&(pool->queue_lock)); 147 pthread_cond_signal(&(pool->queue_ready)); 148 return 0; 149 } 150 151 pool->current_pthread_task_num++; 152 pthread_mutex_unlock(&(pool->queue_lock)); 153 pthread_cond_signal(&(pool->queue_ready)); 154 // usleep(10); 155 return 0; 156 } 157 158 /** 159 * function: ThreadPoolGetThreadMaxNum 160 * description: 获取线程池可容纳的最大线程数 161 * input param: pthis 线程池指针 162 * return val: 线程池可容纳的最大线程数 163 */ 164 int 165 ThreadPoolGetThreadMaxNum(void * pthis) 166 { 167 int num = 0; 168 CThread_pool_t * pool = (CThread_pool_t *)pthis; 169 170 pthread_mutex_lock(&(pool->queue_lock)); 171 num = pool->max_thread_num; 172 pthread_mutex_unlock(&(pool->queue_lock)); 173 174 return num; 175 } 176 177 /** 178 * function: ThreadPoolGetCurrentThreadNum 179 * description: 获取线程池存放的线程数 180 * input param: pthis 线程池指针 181 * return Val: 线程池存放的线程数 182 */ 183 int 184 ThreadPoolGetCurrentThreadNum(void * pthis) 185 { 186 int num = 0; 187 CThread_pool_t * pool = (CThread_pool_t *)pthis; 188 189 pthread_mutex_lock(&(pool->queue_lock)); 190 num = pool->current_pthread_num; 191 pthread_mutex_unlock(&(pool->queue_lock)); 192 193 return num; 194 } 195 196 /** 197 * function: ThreadPoolGetCurrentTaskThreadNum 198 * description: 获取当前正在执行任务和已经分配任务的线程数目和 199 * input param: pthis 线程池指针 200 * return Val: 当前正在执行任务和已经分配任务的线程数目和 201 */ 202 int 203 ThreadPoolGetCurrentTaskThreadNum(void * pthis) 204 { 205 int num = 0; 206 CThread_pool_t * pool = (CThread_pool_t *)pthis; 207 208 pthread_mutex_lock(&(pool->queue_lock)); 209 num = pool->current_pthread_task_num; 210 pthread_mutex_unlock(&(pool->queue_lock)); 211 212 return num; 213 } 214 215 /** 216 * function: ThreadPoolGetCurrentWaitTaskNum 217 * description: 获取线程池等待队列任务数 218 * input param: pthis 线程池指针 219 * return Val: 等待队列任务数 220 */ 221 int 222 ThreadPoolGetCurrentWaitTaskNum(void * pthis) 223 { 224 int num = 0; 225 CThread_pool_t * pool = (CThread_pool_t *)pthis; 226 227 pthread_mutex_lock(&(pool->queue_lock)); 228 num = pool->current_wait_queue_num; 229 pthread_mutex_unlock(&(pool->queue_lock)); 230 231 return num; 232 } 233 234 /** 235 * function: ThreadPoolDestroy 236 * description: 销毁线程池 237 * input param: pthis 线程池指针 238 * return Val: 0 成功 239 * -1 失败 240 */ 241 int 242 ThreadPoolDestroy(void * pthis) 243 { 244 int i; 245 CThread_pool_t * pool = (CThread_pool_t *)pthis; 246 247 if(pool->shutdown) // 已销毁 248 return -1; 249 250 pool->shutdown = 1; // 销毁标志置位 251 252 /*唤醒所有pthread_cond_wait()等待线程*/ 253 pthread_cond_broadcast(&(pool->queue_ready)); 254 for(i=0; i<pool->current_pthread_num; i++) 255 pthread_join(pool->threadid[i], NULL); // 等待所有线程执行结束 256 257 free(pool->threadid); // 释放 258 259 /*销毁任务队列链表*/ 260 worker_t * head = NULL; 261 while(pool->queue_head != NULL) { 262 head = pool->queue_head; 263 pool->queue_head = pool->queue_head->next; 264 free(head); 265 } 266 267 /*销毁锁*/ 268 pthread_mutex_destroy(&(pool->queue_lock)); 269 pthread_cond_destroy(&(pool->queue_ready)); 270 271 free(pool); 272 pool = NULL; 273 274 return 0; 275 } 276 277 /** 278 * function: ThreadPoolRoutine 279 * description: 线程池中运行的线程 280 * input param: arg 线程池指针 281 */ 282 void * 283 ThreadPoolRoutine(void * arg) 284 { 285 CThread_pool_t * pool = (CThread_pool_t *)arg; 286 287 while(1) { 288 /*上锁,pthread_cond_wait()调用会解锁*/ 289 pthread_mutex_lock(&(pool->queue_lock)); 290 291 /*队列没有等待任务*/ 292 while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) { 293 /*条件锁阻塞等待条件信号*/ 294 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); 295 } 296 297 if(pool->shutdown) { 298 pthread_mutex_unlock(&(pool->queue_lock)); 299 pthread_exit(NULL); // 释放线程 300 } 301 302 assert(pool->current_wait_queue_num != 0); 303 assert(pool->queue_head != NULL); 304 305 pool->current_wait_queue_num--; // 等待任务减1,准备执行任务 306 worker_t * worker = pool->queue_head; // 去等待队列任务节点头 307 pool->queue_head = worker->next; // 链表后移 308 pthread_mutex_unlock(&(pool->queue_lock)); 309 310 (* (worker->process))(worker->arg); // 执行回调函数 311 312 pthread_mutex_lock(&(pool->queue_lock)); 313 pool->current_pthread_task_num--; // 函数执行结束 314 free(worker); // 释放任务结点 315 worker = NULL; 316 317 if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) { 318 pthread_mutex_unlock(&(pool->queue_lock)); 319 break; // 当线程池中空闲线程超过 free_pthread_num 则将线程释放回操作系统 320 } 321 pthread_mutex_unlock(&(pool->queue_lock)); 322 } 323 324 pool->current_pthread_num--; // 当前线程数减1 325 pthread_exit(NULL); // 释放线程 326 327 return (void *)NULL; 328 } 329 330 /** 331 * function: ThreadPoolConstruct 332 * description: 构建线程池 333 * input param: max_num 线程池可容纳的最大线程数 334 * free_num 线程池允许存在的最大空闲线程,超过则将线程释放回操作系统 335 * return Val: 线程池指针 336 */ 337 CThread_pool_t * 338 ThreadPoolConstruct(int max_num, int free_num) 339 { 340 int i = 0; 341 342 CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t)); 343 if(NULL == pool) 344 return NULL; 345 346 memset(pool, 0, sizeof(CThread_pool_t)); 347 348 /*初始化互斥锁*/ 349 pthread_mutex_init(&(pool->queue_lock), NULL); 350 /*初始化条件变量*/ 351 pthread_cond_init(&(pool->queue_ready), NULL); 352 353 pool->queue_head = NULL; 354 pool->max_thread_num = max_num; // 线程池可容纳的最大线程数 355 pool->current_wait_queue_num = 0; 356 pool->current_pthread_task_num = 0; 357 pool->shutdown = 0; 358 pool->current_pthread_num = 0; 359 pool->free_pthread_num = free_num; // 线程池允许存在最大空闲线程 360 pool->threadid = NULL; 361 pool->threadid = (pthread_t *)malloc(max_num*sizeof(pthread_t)); 362 /*该函数指针赋值*/ 363 pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; 364 pool->AddWorkLimit = ThreadPoolAddWorkLimit; 365 pool->Destroy = ThreadPoolDestroy; 366 pool->GetThreadMaxNum = ThreadPoolGetThreadMaxNum; 367 pool->GetCurrentThreadNum = ThreadPoolGetCurrentThreadNum; 368 pool->GetCurrentTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; 369 pool->GetCurrentWaitTaskNum = ThreadPoolGetCurrentWaitTaskNum; 370 371 for(i=0; i<max_num; i++) { 372 pool->current_pthread_num++; // 当前池中的线程数 373 /*创建线程*/ 374 pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool); 375 usleep(1000); 376 } 377 378 return pool; 379 } 380 381 /** 382 * function: ThreadPoolConstructDefault 383 * description: 创建线程池,以默认的方式初始化,未创建线程 384 * 385 * return Val: 线程池指针 386 */ 387 CThread_pool_t * 388 ThreadPoolConstructDefault(void) 389 { 390 CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t)); 391 if(NULL == pool) 392 return NULL; 393 394 memset(pool, 0, sizeof(CThread_pool_t)); 395 396 pthread_mutex_init(&(pool->queue_lock), NULL); 397 pthread_cond_init(&(pool->queue_ready), NULL); 398 399 pool->queue_head = NULL; 400 pool->max_thread_num = DEFAULT_MAX_THREAD_NUM; // 默认值 401 pool->current_wait_queue_num = 0; 402 pool->current_pthread_task_num = 0; 403 pool->shutdown = 0; 404 pool->current_pthread_num = 0; 405 pool->free_pthread_num = DEFAULT_FREE_THREAD_NUM; // 默认值 406 pool->threadid = NULL; 407 /*该函数指针赋值*/ 408 pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; 409 pool->AddWorkLimit = ThreadPoolAddWorkLimit; 410 pool->Destroy = ThreadPoolDestroy; 411 pool->GetThreadMaxNum = ThreadPoolGetThreadMaxNum; 412 pool->GetCurrentThreadNum = ThreadPoolGetCurrentThreadNum; 413 pool->GetCurrentTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; 414 pool->GetCurrentWaitTaskNum = ThreadPoolGetCurrentWaitTaskNum; 415 416 return pool; 417 }
4.3 测试 main.c 文件
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <sys/types.h> 5 #include <pthread.h> 6 #include <assert.h> 7 #include <string.h> 8 9 #include "CThreadPool.h" 10 11 12 void * thread_1(void * arg); 13 void * thread_2(void * arg); 14 void * thread_3(void * arg); 15 void DisplayPoolStatus(CThread_pool_t * pPool); 16 17 int nKillThread = 0; 18 19 int main() 20 { 21 CThread_pool_t * pThreadPool = NULL; 22 23 pThreadPool = ThreadPoolConstruct(5, 1); 24 int nNumInput = 5; 25 char LogInput[] = "OK!"; 26 27 DisplayPoolStatus(pThreadPool); 28 /*可用AddWorkLimit()替换看执行的效果*/ 29 pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_1, (void *)NULL); 30 /* 31 * 没加延迟发现连续投递任务时pthread_cond_wait()会收不到信号pthread_cond_signal() !! 32 * 因为AddWorkUnlimit()进去后调用pthread_mutex_lock()把互斥锁锁上,导致pthread_cond_wait() 33 * 收不到信号!!也可在AddWorkUnlimit()里面加个延迟,一般情况可能也遇不到这个问题 34 */ 35 usleep(10); 36 pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_2, (void *)nNumInput); 37 usleep(10); 38 pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_3, (void *)LogInput); 39 usleep(10); 40 DisplayPoolStatus(pThreadPool); 41 42 nKillThread = 1; 43 usleep(100); /**< 先让线程退出 */ 44 DisplayPoolStatus(pThreadPool); 45 nKillThread = 2; 46 usleep(100); 47 DisplayPoolStatus(pThreadPool); 48 nKillThread = 3; 49 usleep(100); 50 DisplayPoolStatus(pThreadPool); 51 52 pThreadPool->Destroy((void*)pThreadPool); 53 return 0; 54 } 55 56 void * 57 thread_1(void * arg) 58 { 59 printf("Thread 1 is running ! "); 60 while(nKillThread != 1) 61 usleep(10); 62 return NULL; 63 } 64 65 void * 66 thread_2(void * arg) 67 { 68 int nNum = (int)arg; 69 70 printf("Thread 2 is running ! "); 71 printf("Get Number %d ", nNum); 72 while(nKillThread != 2) 73 usleep(10); 74 return NULL; 75 } 76 77 void * 78 thread_3(void * arg) 79 { 80 char * pLog = (char *)arg; 81 82 printf("Thread 3 is running ! "); 83 printf("Get String %s ", pLog); 84 while(nKillThread != 3) 85 usleep(10); 86 return NULL; 87 } 88 89 void 90 DisplayPoolStatus(CThread_pool_t * pPool) 91 { 92 static int nCount = 1; 93 94 printf("**************************** "); 95 printf("nCount = %d ", nCount++); 96 printf("max_thread_num = %d ", pPool->GetThreadMaxNum((void *)pPool)); 97 printf("current_pthread_num = %d ", pPool->GetCurrentThreadNum((void *)pPool)); 98 printf("current_pthread_task_num = %d ", pPool->GetCurrentTaskThreadNum((void *)pPool)); 99 printf("current_wait_queue_num = %d ", pPool->GetCurrentWaitTaskNum((void *)pPool)); 100 printf("**************************** "); 101 }
4.4 Makefile
简单写一个makefile
1 CC = gcc 2 CFLAGS = -g -Wall -o2 3 LIB = -lpthread 4 5 RUNE = $(CC) $(CFLAGS) $(object) -o $(exe) $(LIB) 6 RUNO = $(CC) $(CFLAGS) -c $< -o $@ $(LIB) 7 8 .RHONY:clean 9 10 11 object = main.o CThreadPool.o 12 exe = CThreadpool 13 14 $(exe):$(object) 15 $(RUNE) 16 17 %.o:%.c CThreadPool.h 18 $(RUNO) 19 %.o:%.c 20 $(RUNO) 21 22 23 clean: 24 -rm -rf *.o CThreadpool *~
注意:使用模式规则,能引入用户自定义变量,为多个文件建立相同的规则,规则中的相关
文件前必须用“%”表明。关于Makefile的一些规则解释见另一篇
转:https://www.cnblogs.com/zhaoosheLBJ/p/9337291.html