这里给个线程池的实现代码,里面带有个应用小例子,方便学习使用,代码 GCC 编译可用。参照代码看下面介绍的线程池原理跟容易接受,百度云下载链接:
http://pan.baidu.com/s/1i3zMHDV
一.线程池简介
为什么使用线程池?
目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。 传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即 时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态,这笔开销将是不可忽略的。
线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,因为在请求到达时线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。
二.线程池的结构
2.1. 线程池任务结点
线程池任务结点用来保存用户投递过来的任务,并放入线程池中的线程执行。其结构名为worker_t,定义如下:
1 /*线程池任务结点*/ 2 struct worker_t 3 { 4 void *(*process) (void *arg); /**< 回调函数 */ 5 int paratype; /**< 函数类型(预留) */ 6 void *arg; /**< 回调函数参数 */ 7 struct worker_t *next; /**< 连接下一个任务结点 */ 8 };
2.2. 线程池控制器
线程池控制器用来对线程池进行控制管理,包括任务的投递、线程池状态的更新与查询、线程池的销毁等。其结构名为CThread_pool_t,定义如下:
1 /*线程池控制器*/ 2 struct CThread_pool_t 3 { 4 pthread_mutex_t queue_lock; /**< 互斥锁 */ 5 pthread_cond_t queue_ready; /**< 条件变量 */ 6 7 worker_t *queue_head; /**< 任务结点链表,保存所有投递的任务 */ 8 int shutdown; /**< 线程池销毁标志,1 -> 销毁 */ 9 pthread_t *threadid; /**< 线程ID */ 10 int max_thread_num; /**< 线程池可容纳的最大线程数 */ 11 int current_pthread_num; /**< 当前线程池存放的线程数 */ 12 int current_pthread_task_num; /**< 当前正在执行任务和已分配任务的线程数目和 */ 13 int cur_queue_size; /**< 当前等待队列的任务数目 */ 14 int free_pthread_num; /**< 线程池内允许存在的最大空闲线程数 */ 15 16 /*向线程池投递任务*/ 17 int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg); 18 /*向线程池投递任务,无空闲线程则阻塞*/ 19 int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg); 20 /*获取线程池可容纳的最大线程数*/ 21 int (*GetMaxThreadNum) (void *pthis); 22 /*获取线程池存放的线程数*/ 23 int (*GetCurThreadNum) (void *pthis); 24 /*获取当前正在执行任务和已分配任务的线程数目和*/ 25 int (*GetCurTaskThreadNum) (void *pthis); 26 /*获取线程池等待队列任务数*/ 27 int (*GetCurTaskNum) (void *pthis); 28 /*销毁线程池*/ 29 int (*Destruct) (void *pthis); 30 };
2.3. 线程池结构
线程池的运行结构图如下:
根据上图罗列几个注意的地方:
(1)图中的线程池中的“空闲”和“执行”分别表示空闲线程和执行线程,空闲线程指在正在等待任务的线程,同样执行线程指正在执行任务的线程,两者是相互转换的。当用户投递任务过来则用空闲线程来执行该任务,且空闲线程状态转变为执行线程;当任务执行完后,执行线程状态转变为空闲线程;
(2)创建线程池时,正常情况会先创建一定数量的线程,所有线程初始为空闲线程,线程阻塞等待用户投递任务;
(3)用户投递的任务首先放入等待队列 queue_head 链表中,如果线程池中有空闲线程则放入空闲线程中执行,否则根据条件选择继续等待空闲线程或者新建一个线程来执行,新建的线程将放入线程池中;
(4)执行的任务会从等待队列中脱离,并在任务执行完后释放任务结点worker_t;
三.线程池控制
这里就线程池实现代码中的部分代码进行解释,请参照我上面给的线程池代码,里面的注释我标注的还是比较详细的。
3.1. 线程池创建
1 CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num) 2 { 3 ...... //略 4 for (i = 0; i < max_num; i++) 5 { 6 pool->current_pthread_num++; /**< 当前池中的线程数 */ 7 pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool); /**< 创建线程 */ 8 usleep(1000); 9 } 10 }
这里创建了 max_num 个线程 ThreadPoolRoutine( ),即上面提到的空闲线程。
3.2. 投递任务
1 static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg) 2 { 3 ...... //略 4 worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); 5 newworker->process = process; /**< 回调函数,在线程ThreadPoolRoutine()中执行 */ 6 newworker->arg = arg; /**< 回调函数参数 */ 7 newworker->next = NULL; 8 9 pthread_mutex_lock(&(pool->queue_lock)); 10 ...... //将任务结点放入等待队列,略 11 pool->cur_queue_size++; /**< 等待队列加1 */ 12 13 int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; 14 if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) 15 {/**< 如果没有空闲线程且池中当前线程数不超过可容纳最大线程 */ 16 int current_pthread_num = pool->current_pthread_num; 17 pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t)); /**< 新增线程 */ 18 pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine, (void*)pool); 19 pool->current_pthread_num++; /**< 当前池中线程总数加1 */ 20 21 pool->current_pthread_task_num++; /**< 分配任务的线程数加1 */ 22 pthread_mutex_unlock (&(pool->queue_lock)); 23 pthread_cond_signal (&(pool->queue_ready)); /**< 发送信号给1个处于条件阻塞等待状态的线程 */ 24 return 0; 25 } 26 27 pool->current_pthread_task_num++; 28 pthread_mutex_unlock(&(pool->queue_lock)); 29 pthread_cond_signal(&(pool->queue_ready)); 30 return 0; 31 }
投递任务时先创建一个任务结点保存回调函数和函数参数,并将任务结点放入等待队列中。代码第14行判断条件为真时,则进行新线程创建,realloc( )会在保存原始内存中的数据不变的基础上新增1个sizeof (pthread_t)大小的内存。之后更新 current_pthread_num 和 current_pthread_task_num ,并发送信号 pthread_cond_signal(&(pool->queue_ready)) 给一个处于条件阻塞等待状态的线程,即线程 ThreadPoolRoutine( )中的
pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)) 阻塞等待接收信号。重点讲互斥锁和条件变量:
pthread_mutex_t queue_lock; /**< 互斥锁 */
pthread_cond_t queue_ready; /**< 条件变量 */
这两个变量是线程池实现里很重要的点,建议网上搜索资料学习,这里简要介绍先代码中会用到的相关函数功能:
放张图吧,这样直观点,线程互斥锁的一些调用函数一般都有接触过,这里就不作介绍了。
3.3. 执行线程
1 static void * ThreadPoolRoutine (void *arg) 2 { 3 CThread_pool_t *pool = (CThread_pool_t *)arg; 4 while (1) 5 { 6 pthread_mutex_lock (&(pool->queue_lock)); /**< 上锁, pthread_cond_wait()调用会解锁*/ 7 8 while ((pool->cur_queue_size == 0) && (!pool->shutdown)) /**< 队列没有等待任务*/ 9 { 10 pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); /**< 条件锁阻塞等待条件信号*/ 11 } 12 if (pool->shutdown) 13 { 14 pthread_mutex_unlock (&(pool->queue_lock)); 15 pthread_exit (NULL); /**< 释放线程 */ 16 } 17 18 ..... //略 19 worker_t *worker = pool->queue_head; /**< 取等待队列任务结点头*/ 20 pool->queue_head = worker->next; /**< 链表后移 */ 21 22 pthread_mutex_unlock (&(pool->queue_lock)); 23 (*(worker->process)) (worker->arg); /**< 执行回调函数 */ 24 pthread_mutex_lock (&(pool->queue_lock)); 25 26 pool->current_pthread_task_num--; /**< 函数执行结束 */ 27 free (worker); /**< 释放任务结点 */ 28 worker = NULL; 29 30 if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) 31 { 32 pthread_mutex_unlock (&(pool->queue_lock)); 33 break; /**< 当池中空闲线程超过 free_pthread_num则将线程释放回操作系统 */ 34 } 35 pthread_mutex_unlock (&(pool->queue_lock)); 36 } 37 38 pool->current_pthread_num--; /**< 当前池中线程数减1 */ 39 pthread_exit (NULL); /**< 释放线程*/ 40 return (void*)NULL; 41 }
这个就是用来执行投递任务的线程,在初始创建线程时所有线程都全部阻塞在pthread_cond_wait( )处,此时的线程状态就为空闲线程,也就是线程被挂起;
当收到信号并取得互斥锁时,表明有任务投递过来,则获取等待队列里的任务结点并开始执行回调函数;
函数执行结束后回去判断当前等待队列是否还有任务,有则接下去执行,否则重新阻塞回到空闲线程状态;
3.4. 线程销毁
线程销毁主要做的就是销毁线程和释放动态内存,自己看代码就懂了。
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <pthread.h> #include <assert.h> #include <string.h> #include "lib_thread_pool.h" /*---------------constants/macro definition---------------------*/ /*---------------global variables definition-----------------------*/ /*---------------functions declaration--------------------------*/ static void * ThreadPoolRoutine (void *arg); /*---------------functions definition---------------------------*/ /**************************************************************** * function name : ThreadPoolAddWorkLimit * functional description : 向线程池投递任务,无空闲线程则阻塞 * input parameter : pthis 线程池指针 process 回调函数 arg 回调函数的参数 * output parameter : * return value : 0 - 成功;-1 - 失败 * history : *****************************************************************/ static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg) { CThread_pool_t *pool = (CThread_pool_t *)pthis; worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); if (NULL == newworker) { return -1; } newworker->process = process; /**< 回调函数,在线程ThreadPoolRoutine()中执行 */ newworker->arg = arg; /**< 回调函数参数 */ newworker->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); worker_t *member = pool->queue_head; /**< 等待队列任务链表 */ if (member != NULL) { while (member->next != NULL) { member = member->next; } member->next = newworker; /**< 放入链表尾 */ } else { pool->queue_head = newworker; /**< 放入链表头 */ } assert (pool->queue_head != NULL); pool->cur_queue_size++; /**< 等待队列加1 */ int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) {/**< 如果没有空闲线程且池中当前线程数不超过可容纳最大线程 */ int current_pthread_num = pool->current_pthread_num; pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t)); /**< 新增线程 */ pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine, (void*)pool); pool->current_pthread_num++; /**< 当前池中线程总数加1 */ pool->current_pthread_task_num++; /**< 分配任务的线程数加1 */ pthread_mutex_unlock (&(pool->queue_lock)); pthread_cond_signal (&(pool->queue_ready)); /**< 发送信号给1个处于条件阻塞等待状态的线程 */ return 0; } pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); pthread_cond_signal(&(pool->queue_ready)); // usleep(10); //看情况加 return 0; } /**************************************************************** * function name : ThreadPoolAddWorkUnlimit * functional description : 向线程池投递任务 * input parameter : pthis 线程池指针 process 回调函数 arg 回调函数的参数 * output parameter : * return value : 0 - 成功;-1 - 失败 * history : *****************************************************************/ static int ThreadPoolAddWorkUnlimit(void* pthis,void *(*process) (void *arg), void *arg) { CThread_pool_t *pool = (CThread_pool_t *)pthis; worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); if (NULL == newworker) { return -1; } newworker->process = process; newworker->arg = arg; newworker->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); worker_t *member = pool->queue_head; if (member != NULL) { while (member->next != NULL) { member = member->next; } member->next = newworker; } else { pool->queue_head = newworker; } assert (pool->queue_head != NULL); pool->cur_queue_size++; int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num; if(0 == FreeThreadNum) /**< 只判断是否没有空闲线程 */ { int current_pthread_num = pool->current_pthread_num; pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t)); pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine, (void*)pool); pool->current_pthread_num++; if (pool->current_pthread_num > pool->max_thread_num) { pool->max_thread_num = pool->current_pthread_num; } pool->current_pthread_task_num++; pthread_mutex_unlock (&(pool->queue_lock)); pthread_cond_signal (&(pool->queue_ready)); return 0; } pool->current_pthread_task_num++; pthread_mutex_unlock(&(pool->queue_lock)); pthread_cond_signal(&(pool->queue_ready)); // usleep(10); //看情况加 return 0; } /**************************************************************** * function name : ThreadPoolGetThreadMaxNum * functional description : 获取线程池可容纳的最大线程数 * input parameter : pthis 线程池指针 * output parameter : * return value : 线程池可容纳的最大线程数 * history : *****************************************************************/ static int ThreadPoolGetThreadMaxNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->max_thread_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /**************************************************************** * function name : ThreadPoolGetCurrentThreadNum * functional description : 获取线程池存放的线程数 * input parameter : pthis 线程池指针 * output parameter : * return value : 线程池存放的线程数 * history : *****************************************************************/ static int ThreadPoolGetCurrentThreadNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->current_pthread_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /**************************************************************** * function name : ThreadPoolGetCurrentTaskThreadNum * functional description : 获取当前正在执行任务和已分配任务的线程数目和 * input parameter : pthis 线程池指针 * output parameter : * return value : 当前正在执行任务和已分配任务的线程数目和 * history : *****************************************************************/ static int ThreadPoolGetCurrentTaskThreadNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->current_pthread_task_num; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /**************************************************************** * function name : ThreadPoolGetCurrentTaskNum * functional description : 获取线程池等待队列任务数 * input parameter : pthis 线程池指针 * output parameter : * return value : 等待队列任务数 * history : *****************************************************************/ static int ThreadPoolGetCurrentTaskNum(void* pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; pthread_mutex_lock(&(pool->queue_lock)); int num = pool->cur_queue_size; pthread_mutex_unlock(&(pool->queue_lock)); return num; } /**************************************************************** * function name : ThreadPoolDestroy * functional description : 销毁线程池 * input parameter : pthis 线程池指针 * output parameter : * return value : 0 - 成功;-1 - 失败 * history : *****************************************************************/ static int ThreadPoolDestroy (void *pthis) { CThread_pool_t *pool = (CThread_pool_t *)pthis; if (pool->shutdown) /**< 已销毁 */ { return -1; } pool->shutdown = 1; /**< 销毁标志置位 */ pthread_cond_broadcast (&(pool->queue_ready)); /**< 唤醒所有pthread_cond_wait()等待线程 */ int i; for (i = 0; i < pool->current_pthread_num; i++) { pthread_join (pool->threadid[i], NULL); /**< 等待所有线程执行结束 */ } free (pool->threadid); /**< 释放 */ worker_t *head = NULL; while (pool->queue_head != NULL) { head = pool->queue_head; pool->queue_head = pool->queue_head->next; free (head); /**< 释放 */ } pthread_mutex_destroy(&(pool->queue_lock)); /**< 销毁 */ pthread_cond_destroy(&(pool->queue_ready)); /**< 销毁 */ free (pool); /**< 释放 */ pool=NULL; return 0; } /**************************************************************** * function name : ThreadPoolRoutine * functional description : 线程池中运行的线程 * input parameter : arg 线程池指针 * output parameter : * return value : NULL * history : *****************************************************************/ static void * ThreadPoolRoutine (void *arg) { CThread_pool_t *pool = (CThread_pool_t *)arg; while (1) { pthread_mutex_lock (&(pool->queue_lock)); /**< 上锁, pthread_cond_wait()调用会解锁*/ while ((pool->cur_queue_size == 0) && (!pool->shutdown)) /**< 队列没有等待任务*/ { pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); /**< 条件锁阻塞等待条件信号*/ } if (pool->shutdown) { pthread_mutex_unlock (&(pool->queue_lock)); pthread_exit (NULL); /**< 释放线程 */ } assert (pool->cur_queue_size != 0); assert (pool->queue_head != NULL); pool->cur_queue_size--; /**< 等待任务减1,准备执行任务*/ worker_t *worker = pool->queue_head; /**< 取等待队列任务结点头*/ pool->queue_head = worker->next; /**< 链表后移 */ pthread_mutex_unlock (&(pool->queue_lock)); (*(worker->process)) (worker->arg); /**< 执行回调函数 */ pthread_mutex_lock (&(pool->queue_lock)); pool->current_pthread_task_num--; /**< 函数执行结束 */ free (worker); /**< 释放任务结点 */ worker = NULL; if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) { pthread_mutex_unlock (&(pool->queue_lock)); break; /**< 当池中空闲线程超过 free_pthread_num则将线程释放回操作系统 */ } pthread_mutex_unlock (&(pool->queue_lock)); } pool->current_pthread_num--; /**< 当前池中线程数减1 */ pthread_exit (NULL); /**< 释放线程*/ return (void*)NULL; } /**************************************************************** * function name : ThreadPoolConstruct * functional description : 创建线程池 * input parameter : max_num 线程池可容纳的最大线程数 free_num 线程池允许存在的最大空闲线程,超过则将线程释放回操作系统 * output parameter : * return value : 线程池指针 * history : *****************************************************************/ CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num) { CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); if (NULL == pool) { return NULL; } memset(pool, 0, sizeof(CThread_pool_t)); pthread_mutex_init (&(pool->queue_lock), NULL); /**< 初始化互斥锁 */ pthread_cond_init (&(pool->queue_ready), NULL); /**< 初始化条件变量 */ pool->queue_head = NULL; pool->max_thread_num = max_num; /**< 线程池可容纳的最大线程数 */ pool->cur_queue_size = 0; pool->current_pthread_task_num = 0; pool->shutdown = 0; pool->current_pthread_num = 0; pool->free_pthread_num = free_num; /**< 线程池允许存在的最大空闲线程 */ pool->threadid = NULL; pool->threadid = (pthread_t *) malloc (max_num * sizeof (pthread_t)); pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; /**< 给函数指针赋值 */ pool->AddWorkLimit = ThreadPoolAddWorkLimit; pool->Destruct = ThreadPoolDestroy; pool->GetMaxThreadNum = ThreadPoolGetThreadMaxNum; pool->GetCurThreadNum = ThreadPoolGetCurrentThreadNum; pool->GetCurTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; pool->GetCurTaskNum = ThreadPoolGetCurrentTaskNum; int i = 0; for (i = 0; i < max_num; i++) { pool->current_pthread_num++; /**< 当前池中的线程数 */ pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool); /**< 创建线程 */ usleep(1000); } return pool; } /**************************************************************** * function name : ThreadPoolConstructDefault * functional description : 创建线程池,以默认的方式初始化,未创建线程 * input parameter : * output parameter : * return value : 线程池指针 * history : *****************************************************************/ CThread_pool_t* ThreadPoolConstructDefault(void) { CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); if (NULL == pool) { return NULL; } memset(pool, 0, sizeof(CThread_pool_t)); pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); pool->queue_head = NULL; pool->max_thread_num = DEFAULT_MAX_THREAD_NUM; /**< 默认值 */ pool->cur_queue_size = 0; pool->current_pthread_task_num = 0; pool->shutdown = 0; pool->current_pthread_num = 0; pool->free_pthread_num = DEFAULT_FREE_THREAD_NUM; /**< 默认值 */ pool->threadid = NULL; pool->AddWorkUnlimit = ThreadPoolAddWorkUnlimit; pool->AddWorkLimit = ThreadPoolAddWorkLimit; pool->Destruct = ThreadPoolDestroy; pool->GetMaxThreadNum = ThreadPoolGetThreadMaxNum; pool->GetCurThreadNum = ThreadPoolGetCurrentThreadNum; pool->GetCurTaskThreadNum = ThreadPoolGetCurrentTaskThreadNum; pool->GetCurTaskNum = ThreadPoolGetCurrentTaskNum; return pool; }
xxx.h
/************************************************************************ * module : 线程池头文件 * file name : lib_thread_pool.h * Author : * version : V1.0 * DATE : * directory : * description : * related document: * ************************************************************************/ /*-----------------------includes-------------------------------*/ #ifndef __PTHREAD_POOL_H__ #define __PTHREAD_POOL_H__ #include <pthread.h> /*---------------constants/macro definition---------------------*/ #define DEFAULT_MAX_THREAD_NUM 100 #define DEFAULT_FREE_THREAD_NUM 10 typedef struct worker_t worker_t; typedef struct CThread_pool_t CThread_pool_t; /*---------------global variables definition-----------------------*/ /*线程池任务结点*/ struct worker_t { void *(*process) (void *arg); /**< 回调函数 */ int paratype; /**< 函数类型(预留) */ void *arg; /**< 回调函数参数 */ struct worker_t *next; /**< 连接下一个任务结点 */ }; /*线程池控制器*/ struct CThread_pool_t { pthread_mutex_t queue_lock; /**< 互斥锁 */ pthread_cond_t queue_ready; /**< 条件变量 */ worker_t *queue_head; /**< 任务结点链表,保存所有投递的任务 */ int shutdown; /**< 线程池销毁标志,1 - 销毁 */ pthread_t *threadid; /**< 线程ID */ int max_thread_num; /**< 线程池可容纳的最大线程数 */ int current_pthread_num; /**< 当前线程池存放的线程数 */ int current_pthread_task_num; /**< 当前正在执行任务和已分配任务的线程数目和 */ int cur_queue_size; /**< 当前等待队列的任务数目 */ int free_pthread_num; /**< 线程池内允许存在的最大空闲线程数 */ /**************************************************************** * function name : ThreadPoolAddWorkLimit * functional description : 向线程池投递任务 * input parameter : pthis 线程池指针 process 回调函数 arg 回调函数的参数 * output parameter : * return value : 0 - 成功;-1 - 失败 * history : *****************************************************************/ int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg); /**************************************************************** * function name : ThreadPoolAddWorkUnlimit * functional description : 向线程池投递任务,无空闲线程则阻塞 * input parameter : pthis 线程池指针 process 回调函数 arg 回调函数的参数 * output parameter : * return value : 0 - 成功;-1 - 失败 * history : *****************************************************************/ int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg); /**************************************************************** * function name : ThreadPoolGetThreadMaxNum * functional description : 获取线程池可容纳的最大线程数 * input parameter : pthis 线程池指针 * output parameter : * return value : 线程池可容纳的最大线程数 * history : *****************************************************************/ int (*GetMaxThreadNum) (void *pthis); /**************************************************************** * function name : ThreadPoolGetCurrentThreadNum * functional description : 获取线程池存放的线程数 * input parameter : pthis 线程池指针 * output parameter : * return value : 线程池存放的线程数 * history : *****************************************************************/ int (*GetCurThreadNum) (void *pthis); /**************************************************************** * function name : ThreadPoolGetCurrentTaskThreadNum * functional description : 获取当前正在执行任务和已分配任务的线程数目和 * input parameter : pthis 线程池指针 * output parameter : * return value : 当前正在执行任务和已分配任务的线程数目和 * history : *****************************************************************/ int (*GetCurTaskThreadNum) (void *pthis); /**************************************************************** * function name : ThreadPoolGetCurrentTaskNum * functional description : 获取线程池等待队列任务数 * input parameter : pthis 线程池指针 * output parameter : * return value : 等待队列任务数 * history : *****************************************************************/ int (*GetCurTaskNum) (void *pthis); /**************************************************************** * function name : ThreadPoolDestroy * functional description : 销毁线程池 * input parameter : pthis 线程池指针 * output parameter : * return value : 0 - 成功;-1 - 失败 * history : *****************************************************************/ int (*Destruct) (void *pthis); }; /*---------------functions declaration--------------------------*/ /**************************************************************** * function name : ThreadPoolConstruct * functional description : 创建线程池 * input parameter : max_num 线程池可容纳的最大线程数 free_num 线程池允许存在的最大空闲线程,超过则将线程释放回操作系统 * output parameter : * return value : 线程池指针 * history : *****************************************************************/ CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num); /**************************************************************** * function name : ThreadPoolConstructDefault * functional description : 创建线程池,以默认的方式初始化,未创建线程 * input parameter : * output parameter : * return value : 线程池指针 * history : *****************************************************************/ CThread_pool_t* ThreadPoolConstructDefault(void); #endif
main.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <pthread.h> #include <assert.h> #include <string.h> #include "lib_thread_pool.h" static void* thread_1(void* arg); static void* thread_2(void* arg); static void* thread_3(void* arg); static void DisplayPoolStatus(CThread_pool_t* pPool); int nKillThread = 0; int main() { CThread_pool_t* pThreadPool = NULL; pThreadPool = ThreadPoolConstruct(2, 1); int nNumInput = 5; char LogInput[] = "OK!"; DisplayPoolStatus(pThreadPool); /*可用AddWorkLimit()替换看执行的效果*/ pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_1, (void*)NULL); /* * 没加延迟发现连续投递任务时pthread_cond_wait()会收不到信号pthread_cond_signal() !! * 因为AddWorkUnlimit()进去后调用pthread_mutex_lock()把互斥锁锁上,导致pthread_cond_wait() * 收不到信号!!也可在AddWorkUnlimit()里面加个延迟,一般情况可能也遇不到这个问题 */ usleep(10); pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_2, (void*)nNumInput); usleep(10); pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_3, (void*)LogInput); usleep(10); DisplayPoolStatus(pThreadPool); nKillThread = 1; usleep(100); /**< 先让线程退出 */ DisplayPoolStatus(pThreadPool); nKillThread = 2; usleep(100); DisplayPoolStatus(pThreadPool); nKillThread = 3; usleep(100); DisplayPoolStatus(pThreadPool); pThreadPool->Destruct((void*)pThreadPool); return 0; } static void* thread_1(void* arg) { printf("Thread 1 is running ! "); while(nKillThread != 1) usleep(10); return NULL; } static void* thread_2(void* arg) { int nNum = (int)arg; printf("Thread 2 is running ! "); printf("Get Number %d ", nNum); while(nKillThread != 2) usleep(10); return NULL; } static void* thread_3(void* arg) { char *pLog = (char*)arg; printf("Thread 3 is running ! "); printf("Get String %s ", pLog); while(nKillThread != 3) usleep(10); return NULL; } static void DisplayPoolStatus(CThread_pool_t* pPool) { static int nCount = 1; printf("****************** "); printf("nCount = %d ", nCount++); printf("max_thread_num = %d ", pPool->GetMaxThreadNum((void*)pPool)); printf("current_pthread_num = %d ", pPool->GetCurThreadNum((void*)pPool)); printf("current_pthread_task_num = %d ", pPool->GetCurTaskThreadNum((void*)pPool)); printf("cur_queue_size = %d ", pPool->GetCurTaskNum((void*)pPool)); printf("****************** "); }