zoukankan      html  css  js  c++  java
  • 线程池理解

    Linux 多线程编程之 线程池 的原理和一个简单的C实现,提高对多线程编

    程的认知,同步处理等操作,以及如何在实际项目中高效的利用多线程开

    发。

    1.  线程池介绍

    为什么需要线程池???

    目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务
    器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,
    但处理时间却相对较短。

    传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创
    建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就
    是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时
    间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执
    行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态,
    这笔开销将是不可忽略的。

    线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对
    多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,
    因为在请求到达时线程已经存在,所以无意中也消除了线程创建所带来的
    延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过
    适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,
    就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从
    而可以防止资源不足。

    2. 线程池结构

    2.1 线程池任务结点结构

    线程池任务结点用来保存用户投递过来的的任务,并放入线程池中的线程来执行,任务结构

     1 1 // 线程池任务结点
     2 2 struct worker_t {
     3 3     void * (* process)(void * arg); /*回调函数*/
     4 4     int    paratype;                /*函数类型(预留)*/
     5 5     void * arg;                     /*回调函数参数*/
     6 6     struct worker_t * next;         /*链接下一个任务节点*/
     7 7 };
     8 
     9 2.2 线程池控制器
    10 
    11 线程池控制器用来对线程池进行控制管理,描述当前线程池的最基本信息,包括任务的投递,线
    12 
    13 程池状态的更新与查询,线程池的销毁等,其结构如下:
    14 
    15 /*线程控制器*/
    16 struct CThread_pool_t {
    17     pthread_mutex_t queue_lock;     /*互斥锁*/
    18     pthread_cond_t  queue_ready;    /*条件变量*/
    19     
    20     worker_t * queue_head;          /*任务节点链表 保存所有投递的任务*/
    21     int shutdown;                   /*线程池销毁标志 1-销毁*/
    22     pthread_t * threadid;           /*线程ID*/
    23     
    24     int max_thread_num;             /*线程池可容纳最大线程数*/
    25     int current_pthread_num;        /*当前线程池存放的线程*/
    26     int current_pthread_task_num;   /*当前已经执行任务和已分配任务的线程数目和*/
    27     int current_wait_queue_num;     /*当前等待队列的的任务数目*/
    28     int free_pthread_num;           /*线程池允许最大的空闲线程数/*/
    29     
    30     /**
    31      *  function:       ThreadPoolAddWorkUnlimit
    32      *  description:    向线程池投递任务
    33      *  input param:    pthis   线程池指针
    34      *                  process 回调函数
    35      *                  arg     回调函数参数
    36      *  return Valr:    0       成功
    37      *                  -1      失败
    38      */     
    39     int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg);
    40     
    41     /**
    42      *  function:       ThreadPoolAddWorkLimit
    43      *  description:    向线程池投递任务,无空闲线程则阻塞
    44      *  input param:    pthis   线程池指针
    45      *                  process 回调函数
    46      *                  arg     回调函数参数
    47      *  return Val:     0       成功
    48      *                  -1      失败
    49      */     
    50     int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg);
    51     
    52     /**
    53      *  function:       ThreadPoolGetThreadMaxNum
    54      *  description:    获取线程池可容纳的最大线程数
    55      *  input param:    pthis   线程池指针
    56      */     
    57     int (* GetThreadMaxNum)(void * pthis);
    58     
    59     /**
    60      *  function:       ThreadPoolGetCurrentThreadNum
    61      *  description:    获取线程池存放的线程数
    62      *  input param:    pthis   线程池指针
    63      *  return Val:     线程池存放的线程数
    64      */     
    65     int (* GetCurrentThreadNum)(void * pthis);
    66     
    67     /**
    68      *  function:       ThreadPoolGetCurrentTaskThreadNum
    69      *  description:    获取当前正在执行任务和已经分配任务的线程数目和
    70      *  input param:    pthis   线程池指针
    71      *  return Val:     当前正在执行任务和已经分配任务的线程数目和
    72      */     
    73     int (* GetCurrentTaskThreadNum)(void * pthis);
    74     
    75     /**
    76      *  function:       ThreadPoolGetCurrentWaitTaskNum
    77      *  description:    获取线程池等待队列任务数
    78      *  input param:    pthis   线程池指针
    79      *  return Val:     等待队列任务数
    80      */     
    81     int (* GetCurrentWaitTaskNum)(void * pthis);
    82     
    83     /**
    84      *  function:       ThreadPoolDestroy
    85      *  description:    销毁线程池
    86      *  input param:    pthis   线程池指针
    87      *  return Val:     0       成功
    88      *                  -1      失败
    89      */     
    90     int (* Destroy)(void * pthis);    
    91 };

    2.3 线程池运行结构

    解释:

    1) 图中的线程池中的"空闲"和"执行"分别表示空闲线程和执行线程,空闲线程指在正在等待任务的线程,

     同样执行线程指正在执行任务的线程,  两者是相互转换的。当用户投递任务过来则用空闲线程来执行

     该任务,且空闲线程状态转换为执行线程;当任务执行完后,执行线程状态转变为空闲线程。

    2) 创建线程池时,正常情况会创建一定数量的线程,  所有线程初始化为空闲线程,线程阻塞等待用户

     投递任务。

    3) 用户投递的任务首先放入等待队列queue_head 链表中, 如果线程池中有空闲线程则放入空闲线程中

     执行,否则根据条件选择继续等待空闲线程或者新建一个线程来执行,新建的线程将放入线程池中。

    4) 执行的任务会从等待队列中脱离,并在任务执行完后释放任务结点worker_t 

    3. 线程池控制 / 部分函数解释

    3.1 线程池创建

     创建 max_num 个线程 ThreadPoolRoutine,即空闲线程

     1 /**
     2  *  function:       ThreadPoolConstruct
     3  *  description:    构建线程池
     4  *  input param:    max_num   线程池可容纳的最大线程数
     5  *                  free_num  线程池允许存在的最大空闲线程,超过则将线程释放回操作系统
     6  *  return Val:     线程池指针                 
     7  */     
     8 CThread_pool_t * 
     9 ThreadPoolConstruct(int max_num, int free_num)
    10 {
    11     int i = 0;
    12     
    13     CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t));
    14     if(NULL == pool)
    15         return NULL;
    16     
    17     memset(pool, 0, sizeof(CThread_pool_t));
    18     
    19     /*初始化互斥锁*/
    20     pthread_mutex_init(&(pool->queue_lock), NULL);
    21     /*初始化条件变量*/
    22     pthread_cond_init(&(pool->queue_ready), NULL);
    23     
    24     pool->queue_head                = NULL;
    25     pool->max_thread_num            = max_num; // 线程池可容纳的最大线程数
    26     pool->current_wait_queue_num    = 0;
    27     pool->current_pthread_task_num  = 0;
    28     pool->shutdown                  = 0;
    29     pool->current_pthread_num       = 0;
    30     pool->free_pthread_num          = free_num; // 线程池允许存在最大空闲线程
    31     pool->threadid                  = NULL;
    32     pool->threadid                  = (pthread_t *)malloc(max_num*sizeof(pthread_t));
    33     /*该函数指针赋值*/
    34     pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;
    35     pool->AddWorkLimit              = ThreadPoolAddWorkLimit;
    36     pool->Destroy                   = ThreadPoolDestroy;
    37     pool->GetThreadMaxNum           = ThreadPoolGetThreadMaxNum;
    38     pool->GetCurrentThreadNum       = ThreadPoolGetCurrentThreadNum;
    39     pool->GetCurrentTaskThreadNum   = ThreadPoolGetCurrentTaskThreadNum;
    40     pool->GetCurrentWaitTaskNum     = ThreadPoolGetCurrentWaitTaskNum;
    41     
    42     for(i=0; i<max_num; i++) {
    43         pool->current_pthread_num++;    // 当前池中的线程数
    44         /*创建线程*/
    45         pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool);
    46         usleep(1000);        
    47     }
    48     
    49     return pool;
    50 }

    3.2 投递任务

     1 /**
     2  *  function:       ThreadPoolAddWorkLimit
     3  *  description:    向线程池投递任务,无空闲线程则阻塞
     4  *  input param:    pthis   线程池指针
     5  *                  process 回调函数
     6  *                  arg     回调函数参数
     7  *  return Val:     0       成功
     8  *                  -1      失败
     9  */     
    10 int
    11 ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg)
    12 { 
    13     // int FreeThreadNum = 0;
    14     // int CurrentPthreadNum = 0;
    15     
    16     CThread_pool_t * pool = (CThread_pool_t *)pthis;
    17     
    18     /*为添加的任务队列节点分配内存*/
    19     worker_t * newworker  = (worker_t *)malloc(sizeof(worker_t)); 
    20     if(NULL == newworker) 
    21         return -1;
    22     
    23     newworker->process  = process;  // 回调函数,在线程ThreadPoolRoutine()中执行
    24     newworker->arg      = arg;      // 回调函数参数
    25     newworker->next     = NULL;      
    26     
    27     pthread_mutex_lock(&(pool->queue_lock));
    28     
    29     /*插入新任务队列节点*/
    30     worker_t * member = pool->queue_head;   // 指向任务队列链表整体
    31     if(member != NULL) {
    32         while(member->next != NULL) // 队列中有节点
    33             member = member->next;  // member指针往后移动
    34             
    35         member->next = newworker;   // 插入到队列链表尾部
    36     } else 
    37         pool->queue_head = newworker; // 插入到队列链表头
    38     
    39     assert(pool->queue_head != NULL);
    40     pool->current_wait_queue_num++; // 等待队列加1
    41     
    42     /*空闲的线程= 当前线程池存放的线程 - 当前已经执行任务和已分配任务的线程数目和*/
    43     int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
    44     /*如果没有空闲线程且池中当前线程数不超过可容纳最大线程*/
    45     if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) {  //-> 条件为真进行新线程创建
    46         int CurrentPthreadNum = pool->current_pthread_num;
    47         
    48         /*新增线程*/
    49         pool->threadid = (pthread_t *)realloc(pool->threadid, 
    50                                         (CurrentPthreadNum+1) * sizeof(pthread_t));
    51                                         
    52         pthread_create(&(pool->threadid[CurrentPthreadNum]),
    53                                               NULL, ThreadPoolRoutine, (void *)pool);
    54         /*当前线程池中线程总数加1*/                                   
    55         pool->current_pthread_num++;
    56         
    57         /*分配任务线程数加1*/
    58         pool->current_pthread_task_num++;
    59         pthread_mutex_unlock(&(pool->queue_lock));
    60         
    61         /*发送信号给一个处与条件阻塞等待状态的线程*/
    62         pthread_cond_signal(&(pool->queue_ready));
    63         return 0;
    64     }
    65     
    66     pool->current_pthread_task_num++;
    67     pthread_mutex_unlock(&(pool->queue_lock));
    68     
    69     /*发送信号给一个处与条件阻塞等待状态的线程*/
    70     pthread_cond_signal(&(pool->queue_ready));
    71 //  usleep(10);  //看情况  
    72     return 0;
    73 }

    投递任务时先创建一个任务结点保存回调函数和函数参数,并将任务结点放入等待队列中,在代码中

     注释"//->条件为真创建新线程",realloc() 会在保存原始内存中的数据不变的基础上新增1个sizeof(pthread_t)

     大小内存。之后更新current_pthread_num,和current_pthread_task_num;并发送信号

     pthread_cond_signal(&(pool->queue_read)),给一个处于条件阻塞等待状态的线程,即线程ThreadPoolRoutin()

     中的pthread_cond_wait(&(pool->queue_read), &(pool->queue_lock))阻塞等待接收信号,重点讲互

     斥锁和添加变量:

      pthread_mutex_t  queue_lock;   /**< 互斥锁*/

      pthread_cond_t    queue_ready;   /**< 条件变量*/ 

     这两个变量时线程池实现中很重要的点,这里简要介绍代码中会用到的相关函数功能;

    3.3 执行线程

     1 /**
     2  *  function:       ThreadPoolRoutine
     3  *  description:    线程池中执行的线程
     4  *  input param:    arg  线程池指针
     5  */     
     6 void * 
     7 ThreadPoolRoutine(void * arg)
     8 {
     9     CThread_pool_t * pool = (CThread_pool_t *)arg;
    10     
    11     while(1) {
    12         /*上锁,pthread_cond_wait()调用会解锁*/
    13         pthread_mutex_lock(&(pool->queue_lock));
    14         
    15         /*队列没有等待任务*/
    16         while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) {
    17             /*条件锁阻塞等待条件信号*/
    18             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
    19         }
    20         
    21         if(pool->shutdown) {
    22             pthread_mutex_unlock(&(pool->queue_lock));
    23             pthread_exit(NULL);         // 释放线程
    24         }
    25         
    26         assert(pool->current_wait_queue_num != 0);
    27         assert(pool->queue_head != NULL);
    28         
    29         pool->current_wait_queue_num--; // 等待任务减1,准备执行任务
    30         worker_t * worker = pool->queue_head;   // 去等待队列任务节点头
    31         pool->queue_head = worker->next;        // 链表后移     
    32         pthread_mutex_unlock(&(pool->queue_lock));
    33         
    34         (* (worker->process))(worker->arg);      // 执行回调函数
    35         
    36         pthread_mutex_lock(&(pool->queue_lock));
    37         pool->current_pthread_task_num--;       // 函数执行结束
    38         free(worker);   // 释放任务结点
    39         worker = NULL;
    40         
    41         if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) {
    42             pthread_mutex_unlock(&(pool->queue_lock));
    43             break;  // 当线程池中空闲线程超过 free_pthread_num 则将线程释放回操作系统
    44         }
    45         pthread_mutex_unlock(&(pool->queue_lock));    
    46     }
    47     
    48     pool->current_pthread_num--;    // 当前线程数减1
    49     pthread_exit(NULL);             // 释放线程
    50     
    51     return (void *)NULL;
    52 }

    这个就是用来执行任务的线程,在初始化创建线程时所有线程都全部阻塞在pthread_cond_wait()处

     此时的线程就为空闲线程,也就是线程被挂起,当收到信号并取得互斥锁时,     表明任务投递过来

     则获取等待队列里的任务结点并执行回调函数;  函数执行结束后回去判断当前等待队列是否还有任

     务,有则接下去执行,否则重新阻塞回到空闲线程状态。

    4. 完整代码实现

    4.1 CThreadPool.h 文件

      1 /**
      2  *  线程池头文件
      3  *
      4  **/
      5 
      6 #ifndef _CTHREADPOOL_H_
      7 #define _CTHREADPOOL_H_
      8 
      9 #include <pthread.h>
     10 
     11 /*线程池可容纳最大线程数*/
     12 #define DEFAULT_MAX_THREAD_NUM      100
     13 
     14 /*线程池允许最大的空闲线程,超过则将线程释放回操作系统*/
     15 #define DEFAULT_FREE_THREAD_NUM     10
     16 
     17 typedef struct worker_t         worker_t;
     18 typedef struct CThread_pool_t   CThread_pool_t;
     19 
     20 /*线程池任务节点*/
     21 struct worker_t {
     22     void * (* process)(void * arg); /*回调函数*/
     23     int    paratype;                /*函数类型(预留)*/
     24     void * arg;                     /*回调函数参数*/
     25     struct worker_t * next;         /*链接下一个任务节点*/
     26 };
     27 
     28 /*线程控制器*/
     29 struct CThread_pool_t {
     30     pthread_mutex_t queue_lock;     /*互斥锁*/
     31     pthread_cond_t  queue_ready;    /*条件变量*/
     32     
     33     worker_t * queue_head;          /*任务节点链表 保存所有投递的任务*/
     34     int shutdown;                   /*线程池销毁标志 1-销毁*/
     35     pthread_t * threadid;           /*线程ID*/
     36     
     37     int max_thread_num;             /*线程池可容纳最大线程数*/
     38     int current_pthread_num;        /*当前线程池存放的线程*/
     39     int current_pthread_task_num;   /*当前已经执行任务和已分配任务的线程数目和*/
     40     int current_wait_queue_num;     /*当前等待队列的的任务数目*/
     41     int free_pthread_num;           /*线程池允许最大的空闲线程数/*/
     42     
     43     /**
     44      *  function:       ThreadPoolAddWorkUnlimit
     45      *  description:    向线程池投递任务
     46      *  input param:    pthis   线程池指针
     47      *                  process 回调函数
     48      *                  arg     回调函数参数
     49      *  return Valr:    0       成功
     50      *                  -1      失败
     51      */     
     52     int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg);
     53     
     54     /**
     55      *  function:       ThreadPoolAddWorkLimit
     56      *  description:    向线程池投递任务,无空闲线程则阻塞
     57      *  input param:    pthis   线程池指针
     58      *                  process 回调函数
     59      *                  arg     回调函数参数
     60      *  return Val:     0       成功
     61      *                  -1      失败
     62      */     
     63     int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg);
     64     
     65     /**
     66      *  function:       ThreadPoolGetThreadMaxNum
     67      *  description:    获取线程池可容纳的最大线程数
     68      *  input param:    pthis   线程池指针
     69      */     
     70     int (* GetThreadMaxNum)(void * pthis);
     71     
     72     /**
     73      *  function:       ThreadPoolGetCurrentThreadNum
     74      *  description:    获取线程池存放的线程数
     75      *  input param:    pthis   线程池指针
     76      *  return Val:     线程池存放的线程数
     77      */     
     78     int (* GetCurrentThreadNum)(void * pthis);
     79     
     80     /**
     81      *  function:       ThreadPoolGetCurrentTaskThreadNum
     82      *  description:    获取当前正在执行任务和已经分配任务的线程数目和
     83      *  input param:    pthis   线程池指针
     84      *  return Val:     当前正在执行任务和已经分配任务的线程数目和
     85      */     
     86     int (* GetCurrentTaskThreadNum)(void * pthis);
     87     
     88     /**
     89      *  function:       ThreadPoolGetCurrentWaitTaskNum
     90      *  description:    获取线程池等待队列任务数
     91      *  input param:    pthis   线程池指针
     92      *  return Val:     等待队列任务数
     93      */     
     94     int (* GetCurrentWaitTaskNum)(void * pthis);
     95     
     96     /**
     97      *  function:       ThreadPoolDestroy
     98      *  description:    销毁线程池
     99      *  input param:    pthis   线程池指针
    100      *  return Val:     0       成功
    101      *                  -1      失败
    102      */     
    103     int (* Destroy)(void * pthis);    
    104 };
    105 
    106 /**
    107  *  function:       ThreadPoolConstruct
    108  *  description:    构建线程池
    109  *  input param:    max_num   线程池可容纳的最大线程数
    110  *                  free_num  线程池允许存在的最大空闲线程,超过则将线程释放回操作系统
    111  *  return Val:     线程池指针                 
    112  */     
    113 CThread_pool_t * ThreadPoolConstruct(int max_num, int free_num);
    114 
    115 /**
    116  *  function:       ThreadPoolConstructDefault
    117  *  description:    创建线程池,以默认的方式初始化,未创建线程
    118  *
    119  *  return Val:     线程池指针                 
    120  */     
    121 CThread_pool_t * ThreadPoolConstructDefault(void);
    122 
    123 #endif  // _CTHREADPOOL_H_

    4.2 CThreadPool.c 文件

      1 /**
      2  *  线程池实现
      3  *
      4  **/
      5 
      6 #include <stdio.h>
      7 #include <stdlib.h>
      8 #include <string.h>
      9 #include <unistd.h>
     10 #include <sys/types.h>
     11 #include <pthread.h>
     12 #include <assert.h>
     13 
     14 #include "CThreadPool.h"
     15 
     16 void * ThreadPoolRoutine(void * arg); 
     17 
     18 /**
     19  *  function:       ThreadPoolAddWorkLimit
     20  *  description:    向线程池投递任务,无空闲线程则阻塞
     21  *  input param:    pthis   线程池指针
     22  *                  process 回调函数
     23  *                  arg     回调函数参数
     24  *  return Val:     0       成功
     25  *                  -1      失败
     26  */     
     27 int
     28 ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg)
     29 { 
     30     // int FreeThreadNum = 0;
     31     // int CurrentPthreadNum = 0;
     32     
     33     CThread_pool_t * pool = (CThread_pool_t *)pthis;
     34     
     35     /*为添加的任务队列节点分配内存*/
     36     worker_t * newworker  = (worker_t *)malloc(sizeof(worker_t)); 
     37     if(NULL == newworker) 
     38         return -1;
     39     
     40     newworker->process  = process;  // 回调函数,在线程ThreadPoolRoutine()中执行
     41     newworker->arg      = arg;      // 回调函数参数
     42     newworker->next     = NULL;      
     43     
     44     pthread_mutex_lock(&(pool->queue_lock));
     45     
     46     /*插入新任务队列节点*/
     47     worker_t * member = pool->queue_head;   // 指向任务队列链表整体
     48     if(member != NULL) {
     49         while(member->next != NULL) // 队列中有节点
     50             member = member->next;  // member指针往后移动
     51             
     52         member->next = newworker;   // 插入到队列链表尾部
     53     } else 
     54         pool->queue_head = newworker; // 插入到队列链表头
     55     
     56     assert(pool->queue_head != NULL);
     57     pool->current_wait_queue_num++; // 等待队列加1
     58     
     59     /*空闲的线程= 当前线程池存放的线程 - 当前已经执行任务和已分配任务的线程数目和*/
     60     int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
     61     /*如果没有空闲线程且池中当前线程数不超过可容纳最大线程*/
     62     if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) {
     63         int CurrentPthreadNum = pool->current_pthread_num;
     64         
     65         /*新增线程*/
     66         pool->threadid = (pthread_t *)realloc(pool->threadid, 
     67                                         (CurrentPthreadNum+1) * sizeof(pthread_t));
     68                                         
     69         pthread_create(&(pool->threadid[CurrentPthreadNum]),
     70                                               NULL, ThreadPoolRoutine, (void *)pool);
     71         /*当前线程池中线程总数加1*/                                   
     72         pool->current_pthread_num++;
     73         
     74         /*分配任务线程数加1*/
     75         pool->current_pthread_task_num++;
     76         pthread_mutex_unlock(&(pool->queue_lock));
     77         
     78         /*发送信号给一个处与条件阻塞等待状态的线程*/
     79         pthread_cond_signal(&(pool->queue_ready));
     80         return 0;
     81     }
     82     
     83     pool->current_pthread_task_num++;
     84     pthread_mutex_unlock(&(pool->queue_lock));
     85     
     86     /*发送信号给一个处与条件阻塞等待状态的线程*/
     87     pthread_cond_signal(&(pool->queue_ready));
     88 //  usleep(10);  //看情况  
     89     return 0;
     90 }
     91 
     92 /**
     93  *  function:       ThreadPoolAddWorkUnlimit
     94  *  description:    向线程池投递任务
     95  *  input param:    pthis   线程池指针
     96  *                  process 回调函数
     97  *                  arg     回调函数参数
     98  *  return Valr:    0       成功
     99  *                  -1      失败
    100  */
    101 int
    102 ThreadPoolAddWorkUnlimit(void * pthis, void * (* process)(void * arg), void * arg)
    103 {
    104     // int FreeThreadNum = 0;
    105     // int CurrentPthreadNum = 0;
    106     
    107     CThread_pool_t * pool = (CThread_pool_t *)pthis;
    108     
    109     /*给新任务队列节点分配内存*/
    110     worker_t * newworker = (worker_t *)malloc(sizeof(worker_t));
    111     if(NULL == newworker)
    112         return -1;
    113     
    114     newworker->process  = process;  // 回调函数
    115     newworker->arg      = arg;      // 回调函数参数
    116     newworker->next     = NULL;
    117     
    118     pthread_mutex_lock(&(pool->queue_lock));
    119     
    120     /*新节点插入任务队列链表操作*/
    121     worker_t * member = pool->queue_head;
    122     if(member != NULL) {
    123         while(member->next != NULL)
    124             member = member->next;
    125         
    126         member->next = newworker;       // 插入队列链表尾部
    127     } else 
    128         pool->queue_head = newworker;   // 插入到头(也就是第一个节点,之前链表没有节点)
    129     
    130     assert(pool->queue_head != NULL);
    131     pool->current_wait_queue_num++;     // 当前等待队列的的任务数目+1
    132     
    133     int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
    134     /*只判断是否没有空闲线程*/
    135     if(0 == FreeThreadNum) {
    136         int CurrentPthreadNum = pool->current_pthread_num;
    137         pool->threadid = (pthread_t *)realloc(pool->threadid,
    138                                            (CurrentPthreadNum+1)*sizeof(pthread_t));
    139         pthread_create(&(pool->threadid[CurrentPthreadNum]),NULL,
    140                                         ThreadPoolRoutine, (void *)pool);
    141         pool->current_pthread_num++;
    142         if(pool->current_pthread_num > pool->max_thread_num)
    143             pool->max_thread_num = pool->current_pthread_num;
    144         
    145         pool->current_pthread_task_num++;
    146         pthread_mutex_unlock(&(pool->queue_lock));
    147         pthread_cond_signal(&(pool->queue_ready));
    148         return 0;
    149     }
    150     
    151     pool->current_pthread_task_num++;
    152     pthread_mutex_unlock(&(pool->queue_lock));
    153     pthread_cond_signal(&(pool->queue_ready));
    154 //  usleep(10);    
    155     return 0;   
    156 }
    157 
    158 /**
    159  *  function:       ThreadPoolGetThreadMaxNum
    160  *  description:    获取线程池可容纳的最大线程数
    161  *  input param:    pthis   线程池指针
    162  *  return val:     线程池可容纳的最大线程数
    163  */     
    164 int
    165 ThreadPoolGetThreadMaxNum(void * pthis)
    166 {
    167     int num = 0;   
    168     CThread_pool_t * pool = (CThread_pool_t *)pthis;
    169     
    170     pthread_mutex_lock(&(pool->queue_lock));
    171     num = pool->max_thread_num;
    172     pthread_mutex_unlock(&(pool->queue_lock));
    173     
    174     return num;
    175 }
    176 
    177 /**
    178  *  function:       ThreadPoolGetCurrentThreadNum
    179  *  description:    获取线程池存放的线程数
    180  *  input param:    pthis   线程池指针
    181  *  return Val:     线程池存放的线程数
    182  */     
    183 int 
    184 ThreadPoolGetCurrentThreadNum(void * pthis)
    185 {
    186     int num = 0;
    187     CThread_pool_t * pool = (CThread_pool_t *)pthis;
    188     
    189     pthread_mutex_lock(&(pool->queue_lock));
    190     num = pool->current_pthread_num;
    191     pthread_mutex_unlock(&(pool->queue_lock));
    192     
    193     return num;       
    194 }
    195 
    196 /**
    197  *  function:       ThreadPoolGetCurrentTaskThreadNum
    198  *  description:    获取当前正在执行任务和已经分配任务的线程数目和
    199  *  input param:    pthis   线程池指针
    200  *  return Val:     当前正在执行任务和已经分配任务的线程数目和
    201  */   
    202 int
    203 ThreadPoolGetCurrentTaskThreadNum(void * pthis)
    204 {
    205     int num = 0;
    206     CThread_pool_t * pool = (CThread_pool_t *)pthis;
    207     
    208     pthread_mutex_lock(&(pool->queue_lock));
    209     num = pool->current_pthread_task_num;
    210     pthread_mutex_unlock(&(pool->queue_lock));
    211     
    212     return num;   
    213 }
    214 
    215 /**
    216  *  function:       ThreadPoolGetCurrentWaitTaskNum
    217  *  description:    获取线程池等待队列任务数
    218  *  input param:    pthis   线程池指针
    219  *  return Val:     等待队列任务数
    220  */     
    221 int
    222 ThreadPoolGetCurrentWaitTaskNum(void * pthis)
    223 {
    224     int num = 0;
    225     CThread_pool_t * pool = (CThread_pool_t *)pthis;
    226     
    227     pthread_mutex_lock(&(pool->queue_lock));
    228     num = pool->current_wait_queue_num;
    229     pthread_mutex_unlock(&(pool->queue_lock));
    230     
    231     return num;   
    232 }
    233 
    234 /**
    235  *  function:       ThreadPoolDestroy
    236  *  description:    销毁线程池
    237  *  input param:    pthis   线程池指针
    238  *  return Val:     0       成功
    239  *                  -1      失败
    240  */     
    241 int
    242 ThreadPoolDestroy(void * pthis)
    243 {
    244     int i;
    245     CThread_pool_t * pool = (CThread_pool_t *)pthis;
    246     
    247     if(pool->shutdown)      // 已销毁
    248         return -1;
    249         
    250     pool->shutdown = 1;     // 销毁标志置位
    251     
    252     /*唤醒所有pthread_cond_wait()等待线程*/
    253     pthread_cond_broadcast(&(pool->queue_ready));
    254     for(i=0; i<pool->current_pthread_num; i++)
    255         pthread_join(pool->threadid[i], NULL);  // 等待所有线程执行结束
    256     
    257     free(pool->threadid);   // 释放
    258        
    259     /*销毁任务队列链表*/
    260     worker_t * head = NULL;
    261     while(pool->queue_head != NULL) {
    262         head = pool->queue_head;
    263         pool->queue_head = pool->queue_head->next;
    264         free(head);    
    265     }
    266     
    267     /*销毁锁*/
    268     pthread_mutex_destroy(&(pool->queue_lock));
    269     pthread_cond_destroy(&(pool->queue_ready));
    270     
    271     free(pool);
    272     pool = NULL;
    273     
    274     return 0;
    275 }
    276 
    277 /**
    278  *  function:       ThreadPoolRoutine
    279  *  description:    线程池中运行的线程
    280  *  input param:    arg  线程池指针
    281  */     
    282 void * 
    283 ThreadPoolRoutine(void * arg)
    284 {
    285     CThread_pool_t * pool = (CThread_pool_t *)arg;
    286     
    287     while(1) {
    288         /*上锁,pthread_cond_wait()调用会解锁*/
    289         pthread_mutex_lock(&(pool->queue_lock));
    290         
    291         /*队列没有等待任务*/
    292         while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) {
    293             /*条件锁阻塞等待条件信号*/
    294             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
    295         }
    296         
    297         if(pool->shutdown) {
    298             pthread_mutex_unlock(&(pool->queue_lock));
    299             pthread_exit(NULL);         // 释放线程
    300         }
    301         
    302         assert(pool->current_wait_queue_num != 0);
    303         assert(pool->queue_head != NULL);
    304         
    305         pool->current_wait_queue_num--; // 等待任务减1,准备执行任务
    306         worker_t * worker = pool->queue_head;   // 去等待队列任务节点头
    307         pool->queue_head = worker->next;        // 链表后移     
    308         pthread_mutex_unlock(&(pool->queue_lock));
    309         
    310         (* (worker->process))(worker->arg);      // 执行回调函数
    311         
    312         pthread_mutex_lock(&(pool->queue_lock));
    313         pool->current_pthread_task_num--;       // 函数执行结束
    314         free(worker);   // 释放任务结点
    315         worker = NULL;
    316         
    317         if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) {
    318             pthread_mutex_unlock(&(pool->queue_lock));
    319             break;  // 当线程池中空闲线程超过 free_pthread_num 则将线程释放回操作系统
    320         }
    321         pthread_mutex_unlock(&(pool->queue_lock));    
    322     }
    323     
    324     pool->current_pthread_num--;    // 当前线程数减1
    325     pthread_exit(NULL);             // 释放线程
    326     
    327     return (void *)NULL;
    328 }
    329 
    330 /**
    331  *  function:       ThreadPoolConstruct
    332  *  description:    构建线程池
    333  *  input param:    max_num   线程池可容纳的最大线程数
    334  *                  free_num  线程池允许存在的最大空闲线程,超过则将线程释放回操作系统
    335  *  return Val:     线程池指针                 
    336  */     
    337 CThread_pool_t * 
    338 ThreadPoolConstruct(int max_num, int free_num)
    339 {
    340     int i = 0;
    341     
    342     CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t));
    343     if(NULL == pool)
    344         return NULL;
    345     
    346     memset(pool, 0, sizeof(CThread_pool_t));
    347     
    348     /*初始化互斥锁*/
    349     pthread_mutex_init(&(pool->queue_lock), NULL);
    350     /*初始化条件变量*/
    351     pthread_cond_init(&(pool->queue_ready), NULL);
    352     
    353     pool->queue_head                = NULL;
    354     pool->max_thread_num            = max_num; // 线程池可容纳的最大线程数
    355     pool->current_wait_queue_num    = 0;
    356     pool->current_pthread_task_num  = 0;
    357     pool->shutdown                  = 0;
    358     pool->current_pthread_num       = 0;
    359     pool->free_pthread_num          = free_num; // 线程池允许存在最大空闲线程
    360     pool->threadid                  = NULL;
    361     pool->threadid                  = (pthread_t *)malloc(max_num*sizeof(pthread_t));
    362     /*该函数指针赋值*/
    363     pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;
    364     pool->AddWorkLimit              = ThreadPoolAddWorkLimit;
    365     pool->Destroy                   = ThreadPoolDestroy;
    366     pool->GetThreadMaxNum           = ThreadPoolGetThreadMaxNum;
    367     pool->GetCurrentThreadNum       = ThreadPoolGetCurrentThreadNum;
    368     pool->GetCurrentTaskThreadNum   = ThreadPoolGetCurrentTaskThreadNum;
    369     pool->GetCurrentWaitTaskNum     = ThreadPoolGetCurrentWaitTaskNum;
    370     
    371     for(i=0; i<max_num; i++) {
    372         pool->current_pthread_num++;    // 当前池中的线程数
    373         /*创建线程*/
    374         pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool);
    375         usleep(1000);        
    376     }
    377     
    378     return pool;
    379 }
    380 
    381 /**
    382  *  function:       ThreadPoolConstructDefault
    383  *  description:    创建线程池,以默认的方式初始化,未创建线程
    384  *
    385  *  return Val:     线程池指针                 
    386  */     
    387 CThread_pool_t * 
    388 ThreadPoolConstructDefault(void)
    389 {
    390     CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t));
    391     if(NULL == pool)
    392         return NULL;
    393     
    394     memset(pool, 0, sizeof(CThread_pool_t));
    395     
    396     pthread_mutex_init(&(pool->queue_lock), NULL);
    397     pthread_cond_init(&(pool->queue_ready), NULL);
    398     
    399     pool->queue_head                = NULL;
    400     pool->max_thread_num            = DEFAULT_MAX_THREAD_NUM; // 默认值
    401     pool->current_wait_queue_num    = 0;
    402     pool->current_pthread_task_num  = 0;
    403     pool->shutdown                  = 0;
    404     pool->current_pthread_num       = 0;
    405     pool->free_pthread_num          = DEFAULT_FREE_THREAD_NUM; // 默认值
    406     pool->threadid                  = NULL;
    407     /*该函数指针赋值*/
    408     pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;
    409     pool->AddWorkLimit              = ThreadPoolAddWorkLimit;
    410     pool->Destroy                   = ThreadPoolDestroy;
    411     pool->GetThreadMaxNum           = ThreadPoolGetThreadMaxNum;
    412     pool->GetCurrentThreadNum       = ThreadPoolGetCurrentThreadNum;
    413     pool->GetCurrentTaskThreadNum   = ThreadPoolGetCurrentTaskThreadNum;
    414     pool->GetCurrentWaitTaskNum     = ThreadPoolGetCurrentWaitTaskNum;
    415     
    416     return pool;
    417 }

    4.3 测试 main.c 文件

      1 #include <stdio.h> 
      2 #include <stdlib.h> 
      3 #include <unistd.h> 
      4 #include <sys/types.h> 
      5 #include <pthread.h> 
      6 #include <assert.h> 
      7 #include <string.h>
      8 
      9 #include "CThreadPool.h"
     10 
     11 
     12 void * thread_1(void * arg);
     13 void * thread_2(void * arg);
     14 void * thread_3(void * arg);
     15 void DisplayPoolStatus(CThread_pool_t * pPool);
     16 
     17 int nKillThread = 0;
     18 
     19 int main()
     20 {
     21     CThread_pool_t * pThreadPool = NULL;
     22     
     23     pThreadPool = ThreadPoolConstruct(5, 1);
     24     int nNumInput = 5;
     25     char LogInput[] = "OK!";
     26 
     27     DisplayPoolStatus(pThreadPool);
     28     /*可用AddWorkLimit()替换看执行的效果*/
     29     pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_1, (void *)NULL);
     30     /*
     31      * 没加延迟发现连续投递任务时pthread_cond_wait()会收不到信号pthread_cond_signal() !!
     32      * 因为AddWorkUnlimit()进去后调用pthread_mutex_lock()把互斥锁锁上,导致pthread_cond_wait()
     33      * 收不到信号!!也可在AddWorkUnlimit()里面加个延迟,一般情况可能也遇不到这个问题
     34      */
     35     usleep(10);    
     36     pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_2, (void *)nNumInput);
     37     usleep(10);
     38     pThreadPool->AddWorkUnlimit((void *)pThreadPool, (void *)thread_3, (void *)LogInput);
     39     usleep(10);
     40     DisplayPoolStatus(pThreadPool);
     41 
     42     nKillThread = 1;
     43     usleep(100);    /**< 先让线程退出 */
     44     DisplayPoolStatus(pThreadPool);
     45     nKillThread = 2;
     46     usleep(100);
     47     DisplayPoolStatus(pThreadPool);
     48     nKillThread = 3;
     49     usleep(100);
     50     DisplayPoolStatus(pThreadPool);
     51 
     52     pThreadPool->Destroy((void*)pThreadPool);
     53     return 0;
     54 }
     55 
     56 void * 
     57 thread_1(void * arg)
     58 {
     59     printf("Thread 1 is running !
    ");
     60     while(nKillThread != 1)
     61         usleep(10);
     62     return NULL;
     63 }
     64 
     65 void * 
     66 thread_2(void * arg)
     67 {
     68     int nNum = (int)arg;
     69     
     70     printf("Thread 2 is running !
    ");
     71     printf("Get Number %d
    ", nNum);
     72     while(nKillThread != 2)
     73         usleep(10);
     74     return NULL;
     75 }
     76 
     77 void * 
     78 thread_3(void * arg)
     79 {
     80     char * pLog = (char *)arg;
     81     
     82     printf("Thread 3 is running !
    ");
     83     printf("Get String %s
    ", pLog);
     84     while(nKillThread != 3)
     85         usleep(10);
     86     return NULL;
     87 }
     88 
     89 void 
     90 DisplayPoolStatus(CThread_pool_t * pPool)
     91 {
     92     static int nCount = 1;
     93     
     94     printf("****************************
    ");
     95     printf("nCount = %d
    ", nCount++);
     96     printf("max_thread_num = %d
    ", pPool->GetThreadMaxNum((void *)pPool));
     97     printf("current_pthread_num = %d
    ", pPool->GetCurrentThreadNum((void *)pPool));
     98     printf("current_pthread_task_num = %d
    ", pPool->GetCurrentTaskThreadNum((void *)pPool));
     99     printf("current_wait_queue_num = %d
    ", pPool->GetCurrentWaitTaskNum((void *)pPool));
    100     printf("****************************
    ");
    101 }

    4.4 Makefile

    简单写一个makefile

     1 CC = gcc
     2 CFLAGS = -g -Wall -o2
     3 LIB = -lpthread
     4 
     5 RUNE = $(CC) $(CFLAGS) $(object) -o $(exe) $(LIB)
     6 RUNO = $(CC) $(CFLAGS) -c $< -o $@ $(LIB)
     7 
     8 .RHONY:clean
     9 
    10 
    11 object = main.o CThreadPool.o
    12 exe = CThreadpool
    13 
    14 $(exe):$(object)
    15     $(RUNE)
    16 
    17 %.o:%.c CThreadPool.h
    18     $(RUNO)
    19 %.o:%.c
    20     $(RUNO)
    21 
    22 
    23 clean:
    24     -rm -rf *.o CThreadpool *~

    注意:使用模式规则,能引入用户自定义变量,为多个文件建立相同的规则,规则中的相关

     文件前必须用“%”表明。关于Makefile的一些规则解释见另一篇

    转:https://www.cnblogs.com/zhaoosheLBJ/p/9337291.html

  • 相关阅读:
    子类构造函数中调用虚函数问题验证
    socks5代理浅识
    关于C++标准库(第2版)std::remove_if的"特性"概述
    动态获取结构体中指定的属性值
    构造和析构函数定义为私有场景
    remove_pointer使用测验
    广播自定义消息实现进程间的通信问题
    遍历窗口权限问题
    嵌入窗口到桌面的问题
    实验一 熟悉实验环境
  • 原文地址:https://www.cnblogs.com/mysky007/p/12313190.html
Copyright © 2011-2022 走看看