zoukankan      html  css  js  c++  java
  • Linux简单线程池实现(带源码)

      这里给个线程池的实现代码,里面带有个应用小例子,方便学习使用,代码 GCC 编译可用。参照代码看下面介绍的线程池原理跟容易接受,百度云下载链接:

      http://pan.baidu.com/s/1i3zMHDV

    一.线程池简介

      为什么使用线程池?

      目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。 传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即 时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态,这笔开销将是不可忽略的。

      线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,因为在请求到达时线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。

    二.线程池的结构

    2.1. 线程池任务结点

      线程池任务结点用来保存用户投递过来的任务,并放入线程池中的线程执行。其结构名为worker_t,定义如下:

    复制代码
    1 /*线程池任务结点*/
    2 struct worker_t
    3 {
    4     void *(*process) (void *arg);    /**< 回调函数 */
    5     int   paratype;                    /**< 函数类型(预留) */
    6     void *arg;                        /**< 回调函数参数 */
    7     struct worker_t *next;            /**< 连接下一个任务结点 */
    8 };
    复制代码

    2.2. 线程池控制器

      线程池控制器用来对线程池进行控制管理,包括任务的投递、线程池状态的更新与查询、线程池的销毁等。其结构名为CThread_pool_t,定义如下:

    复制代码
     1 /*线程池控制器*/
     2 struct CThread_pool_t
     3 {
     4     pthread_mutex_t queue_lock;    /**< 互斥锁 */
     5     pthread_cond_t queue_ready;    /**< 条件变量 */
     6 
     7     worker_t *queue_head;        /**< 任务结点链表,保存所有投递的任务 */
     8     int shutdown;            /**< 线程池销毁标志,1 -> 销毁 */
     9     pthread_t *threadid;        /**< 线程ID */
    10     int max_thread_num;        /**< 线程池可容纳的最大线程数 */
    11     int current_pthread_num;    /**< 当前线程池存放的线程数 */
    12     int current_pthread_task_num;    /**< 当前正在执行任务和已分配任务的线程数目和 */
    13     int cur_queue_size;        /**< 当前等待队列的任务数目 */
    14     int free_pthread_num;        /**< 线程池内允许存在的最大空闲线程数 */
    15 
    16     /*向线程池投递任务*/
    17     int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg); 
    18     /*向线程池投递任务,无空闲线程则阻塞*/
    19     int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg); 
    20     /*获取线程池可容纳的最大线程数*/
    21     int (*GetMaxThreadNum) (void *pthis); 
    22     /*获取线程池存放的线程数*/
    23     int (*GetCurThreadNum) (void *pthis); 
    24     /*获取当前正在执行任务和已分配任务的线程数目和*/
    25     int (*GetCurTaskThreadNum) (void *pthis); 
    26     /*获取线程池等待队列任务数*/
    27     int (*GetCurTaskNum) (void *pthis); 
    28     /*销毁线程池*/
    29     int (*Destruct) (void *pthis); 
    30 };
    复制代码

    2.3. 线程池结构

      线程池的运行结构图如下:

      根据上图罗列几个注意的地方:

      (1)图中的线程池中的“空闲”和“执行”分别表示空闲线程和执行线程,空闲线程指在正在等待任务的线程,同样执行线程指正在执行任务的线程,两者是相互转换的。当用户投递任务过来则用空闲线程来执行该任务,且空闲线程状态转变为执行线程;当任务执行完后,执行线程状态转变为空闲线程;

      (2)创建线程池时,正常情况会先创建一定数量的线程,所有线程初始为空闲线程,线程阻塞等待用户投递任务;

      (3)用户投递的任务首先放入等待队列 queue_head 链表中,如果线程池中有空闲线程则放入空闲线程中执行,否则根据条件选择继续等待空闲线程或者新建一个线程来执行,新建的线程将放入线程池中;

      (4)执行的任务会从等待队列中脱离,并在任务执行完后释放任务结点worker_t;

    三.线程池控制

      这里就线程池实现代码中的部分代码进行解释,请参照我上面给的线程池代码,里面的注释我标注的还是比较详细的。

    3.1. 线程池创建

    复制代码
     1 CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num) 
     2 {
     3     ......    //略
     4     for (i = 0; i < max_num; i++) 
     5     {  
     6         pool->current_pthread_num++;    /**< 当前池中的线程数 */
     7         pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool);    /**< 创建线程 */
     8         usleep(1000);
     9     }   
    10 }
    复制代码

      这里创建了 max_num 个线程 ThreadPoolRoutine( ),即上面提到的空闲线程。

    3.2. 投递任务

    复制代码
     1 static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg) 
     2 { 
     3     ......    //略
     4     worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); 
     5     newworker->process     = process;    /**< 回调函数,在线程ThreadPoolRoutine()中执行 */
     6     newworker->arg         = arg;        /**< 回调函数参数 */
     7     newworker->next     = NULL;
     8     
     9     pthread_mutex_lock(&(pool->queue_lock)); 
    10     ......    //将任务结点放入等待队列,略
    11     pool->cur_queue_size++;        /**< 等待队列加1 */
    12     
    13     int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;        
    14     if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num))
    15     {/**< 如果没有空闲线程且池中当前线程数不超过可容纳最大线程 */
    16         int current_pthread_num = pool->current_pthread_num;
    17         pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t));     /**< 新增线程 */
    18         pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine,  (void*)pool);
    19         pool->current_pthread_num++;    /**< 当前池中线程总数加1 */    
    20         
    21         pool->current_pthread_task_num++;    /**< 分配任务的线程数加1 */    
    22         pthread_mutex_unlock (&(pool->queue_lock)); 
    23         pthread_cond_signal (&(pool->queue_ready));    /**< 发送信号给1个处于条件阻塞等待状态的线程 */    
    24         return 0;
    25     }
    26     
    27     pool->current_pthread_task_num++; 
    28     pthread_mutex_unlock(&(pool->queue_lock)); 
    29     pthread_cond_signal(&(pool->queue_ready));
    30     return 0; 
    31 }
    复制代码

      投递任务时先创建一个任务结点保存回调函数和函数参数,并将任务结点放入等待队列中。代码第14行判断条件为真时,则进行新线程创建,realloc( )会在保存原始内存中的数据不变的基础上新增1个sizeof (pthread_t)大小的内存。之后更新 current_pthread_num 和 current_pthread_task_num ,并发送信号 pthread_cond_signal(&(pool->queue_ready)) 给一个处于条件阻塞等待状态的线程,即线程 ThreadPoolRoutine( )中的

    pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)) 阻塞等待接收信号。重点讲互斥锁和条件变量:

      pthread_mutex_t queue_lock;   /**< 互斥锁 */

      pthread_cond_t queue_ready;   /**< 条件变量 */

      这两个变量是线程池实现里很重要的点,建议网上搜索资料学习,这里简要介绍先代码中会用到的相关函数功能:

      放张图吧,这样直观点,线程互斥锁的一些调用函数一般都有接触过,这里就不作介绍了。

    3.3. 执行线程  

    复制代码
     1 static void * ThreadPoolRoutine (void *arg) 
     2 { 
     3     CThread_pool_t *pool = (CThread_pool_t *)arg;
     4     while (1) 
     5     { 
     6         pthread_mutex_lock (&(pool->queue_lock)); /**< 上锁, pthread_cond_wait()调用会解锁*/
     7         
     8         while ((pool->cur_queue_size == 0) && (!pool->shutdown))    /**< 队列没有等待任务*/
     9         { 
    10             pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));     /**< 条件锁阻塞等待条件信号*/
    11         } 
    12         if (pool->shutdown) 
    13         { 
    14             pthread_mutex_unlock (&(pool->queue_lock)); 
    15             pthread_exit (NULL);     /**< 释放线程 */
    16         } 
    17         
    18         .....    //略
    19         worker_t *worker     = pool->queue_head;    /**< 取等待队列任务结点头*/ 
    20         pool->queue_head     = worker->next;     /**< 链表后移 */ 
    21         
    22         pthread_mutex_unlock (&(pool->queue_lock)); 
    23         (*(worker->process)) (worker->arg);     /**< 执行回调函数 */ 
    24         pthread_mutex_lock (&(pool->queue_lock)); 
    25         
    26         pool->current_pthread_task_num--;    /**< 函数执行结束 */ 
    27         free (worker);     /**< 释放任务结点 */
    28         worker = NULL; 
    29         
    30         if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num)
    31         {
    32             pthread_mutex_unlock (&(pool->queue_lock)); 
    33             break;    /**< 当池中空闲线程超过 free_pthread_num则将线程释放回操作系统 */
    34         }
    35         pthread_mutex_unlock (&(pool->queue_lock));         
    36     } 
    37     
    38     pool->current_pthread_num--;    /**< 当前池中线程数减1 */
    39     pthread_exit (NULL);    /**< 释放线程*/
    40     return (void*)NULL;
    41 }
    复制代码

      这个就是用来执行投递任务的线程,在初始创建线程时所有线程都全部阻塞在pthread_cond_wait( )处,此时的线程状态就为空闲线程,也就是线程被挂起;

      当收到信号并取得互斥锁时,表明有任务投递过来,则获取等待队列里的任务结点并开始执行回调函数;

      函数执行结束后回去判断当前等待队列是否还有任务,有则接下去执行,否则重新阻塞回到空闲线程状态;

    3.4. 线程销毁

      线程销毁主要做的就是销毁线程和释放动态内存,自己看代码就懂了。 

    完整代码:
    xxx.c
    #include <stdio.h> 
    #include <stdlib.h> 
    #include <unistd.h> 
    #include <sys/types.h> 
    #include <pthread.h> 
    #include <assert.h> 
    #include <string.h>
    #include "lib_thread_pool.h"
    
    
    /*---------------constants/macro definition---------------------*/
    
    /*---------------global variables definition-----------------------*/
    
    /*---------------functions declaration--------------------------*/
    static void * ThreadPoolRoutine (void *arg); 
    
    /*---------------functions definition---------------------------*/
    
    /****************************************************************
    * function name         : ThreadPoolAddWorkLimit
    * functional description    : 向线程池投递任务,无空闲线程则阻塞
    * input parameter        : pthis    线程池指针
                          process    回调函数
                          arg        回调函数的参数
    * output parameter    : 
    * return value            : 0 - 成功;-1 - 失败 
    * history                : 
    *****************************************************************/
    static int ThreadPoolAddWorkLimit(void* pthis,void *(*process) (void *arg), void *arg) 
    { 
        CThread_pool_t *pool = (CThread_pool_t *)pthis;
        worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); 
        if (NULL == newworker)
        {
            return -1;
        }
        newworker->process     = process;    /**< 回调函数,在线程ThreadPoolRoutine()中执行 */
        newworker->arg         = arg;        /**< 回调函数参数 */
        newworker->next     = NULL;
    
        pthread_mutex_lock(&(pool->queue_lock)); 
        
        worker_t *member = pool->queue_head;    /**< 等待队列任务链表 */
        if (member != NULL) 
        { 
            while (member->next != NULL) 
            {
                member = member->next; 
            }
            member->next = newworker;    /**< 放入链表尾 */
        } 
        else 
        { 
            pool->queue_head = newworker;    /**< 放入链表头 */
        } 
        
        assert (pool->queue_head != NULL); 
        pool->cur_queue_size++;        /**< 等待队列加1 */
    
        int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;        
        if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num))
        {/**< 如果没有空闲线程且池中当前线程数不超过可容纳最大线程 */
            int current_pthread_num = pool->current_pthread_num;
            pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t));     /**< 新增线程 */
            pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine,  (void*)pool);
            pool->current_pthread_num++;    /**< 当前池中线程总数加1 */    
            
            pool->current_pthread_task_num++;    /**< 分配任务的线程数加1 */    
            pthread_mutex_unlock (&(pool->queue_lock)); 
            pthread_cond_signal (&(pool->queue_ready));    /**< 发送信号给1个处于条件阻塞等待状态的线程 */    
            return 0;
        }
    
        pool->current_pthread_task_num++; 
        pthread_mutex_unlock(&(pool->queue_lock)); 
        pthread_cond_signal(&(pool->queue_ready));
    //    usleep(10);    //看情况加
        return 0; 
    } 
    
    /****************************************************************
    * function name         : ThreadPoolAddWorkUnlimit
    * functional description    : 向线程池投递任务
    * input parameter        : pthis    线程池指针
                          process    回调函数
                          arg        回调函数的参数
    * output parameter    : 
    * return value            : 0 - 成功;-1 - 失败 
    * history                : 
    *****************************************************************/
    static int ThreadPoolAddWorkUnlimit(void* pthis,void *(*process) (void *arg), void *arg) 
    { 
        CThread_pool_t *pool = (CThread_pool_t *)pthis;
        worker_t *newworker = (worker_t *) malloc (sizeof (worker_t)); 
        
        if (NULL == newworker)
        {
            return -1;
        }
        newworker->process     = process;    
        newworker->arg         = arg;         
        newworker->next     = NULL;        
    
        pthread_mutex_lock(&(pool->queue_lock)); 
        
        worker_t *member = pool->queue_head; 
        if (member != NULL)     
        { 
            while (member->next != NULL) 
            {
                member = member->next; 
            }
            member->next = newworker; 
        } 
        else 
        { 
            pool->queue_head = newworker; 
        } 
        
        assert (pool->queue_head != NULL); 
        pool->cur_queue_size++; 
    
        int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;
        if(0 == FreeThreadNum)    /**< 只判断是否没有空闲线程 */
        {
            int current_pthread_num = pool->current_pthread_num;
            pool->threadid = (pthread_t *) realloc(pool->threadid,(current_pthread_num + 1) * sizeof (pthread_t)); 
            pthread_create (&(pool->threadid[current_pthread_num]), NULL, ThreadPoolRoutine,  (void*)pool);
            pool->current_pthread_num++;
            if (pool->current_pthread_num > pool->max_thread_num)
            {
                pool->max_thread_num = pool->current_pthread_num;
            }
            
            pool->current_pthread_task_num++;
            pthread_mutex_unlock (&(pool->queue_lock)); 
            pthread_cond_signal (&(pool->queue_ready)); 
            return 0;
        }
        
        pool->current_pthread_task_num++;
        pthread_mutex_unlock(&(pool->queue_lock)); 
        pthread_cond_signal(&(pool->queue_ready));     
    //    usleep(10);    //看情况加
        return 0; 
    } 
    
    /****************************************************************
    * function name         : ThreadPoolGetThreadMaxNum
    * functional description    : 获取线程池可容纳的最大线程数
    * input parameter        : pthis    线程池指针
    * output parameter    : 
    * return value            : 线程池可容纳的最大线程数
    * history                : 
    *****************************************************************/
    static int ThreadPoolGetThreadMaxNum(void* pthis) 
    { 
        CThread_pool_t *pool = (CThread_pool_t *)pthis;
        
        pthread_mutex_lock(&(pool->queue_lock)); 
        int num = pool->max_thread_num;
        pthread_mutex_unlock(&(pool->queue_lock)); 
        
        return num; 
    } 
    
    /****************************************************************
    * function name         : ThreadPoolGetCurrentThreadNum
    * functional description    : 获取线程池存放的线程数
    * input parameter        : pthis    线程池指针
    * output parameter    : 
    * return value            : 线程池存放的线程数
    * history                : 
    *****************************************************************/
    static int ThreadPoolGetCurrentThreadNum(void* pthis) 
    { 
        CThread_pool_t *pool = (CThread_pool_t *)pthis;
        
        pthread_mutex_lock(&(pool->queue_lock)); 
        int num = pool->current_pthread_num;
        pthread_mutex_unlock(&(pool->queue_lock)); 
        
        return num; 
    } 
    
    /****************************************************************
    * function name         : ThreadPoolGetCurrentTaskThreadNum
    * functional description    : 获取当前正在执行任务和已分配任务的线程数目和
    * input parameter        : pthis    线程池指针
    * output parameter    : 
    * return value            : 当前正在执行任务和已分配任务的线程数目和
    * history                : 
    *****************************************************************/
    static int ThreadPoolGetCurrentTaskThreadNum(void* pthis) 
    { 
        CThread_pool_t *pool = (CThread_pool_t *)pthis;
        
        pthread_mutex_lock(&(pool->queue_lock)); 
        int num = pool->current_pthread_task_num;
        pthread_mutex_unlock(&(pool->queue_lock)); 
        
        return num; 
    } 
    
    /****************************************************************
    * function name         : ThreadPoolGetCurrentTaskNum
    * functional description    : 获取线程池等待队列任务数
    * input parameter        : pthis    线程池指针
    * output parameter    : 
    * return value            : 等待队列任务数
    * history                : 
    *****************************************************************/
    static int ThreadPoolGetCurrentTaskNum(void* pthis) 
    { 
        CThread_pool_t *pool = (CThread_pool_t *)pthis;
        
        pthread_mutex_lock(&(pool->queue_lock)); 
        int num = pool->cur_queue_size;
        pthread_mutex_unlock(&(pool->queue_lock)); 
        
        return num; 
    } 
    
    /****************************************************************
    * function name         : ThreadPoolDestroy
    * functional description    : 销毁线程池
    * input parameter        : pthis    线程池指针
    * output parameter    : 
    * return value            : 0 - 成功;-1 - 失败
    * history                : 
    *****************************************************************/
    static int ThreadPoolDestroy (void *pthis) 
    { 
        CThread_pool_t *pool = (CThread_pool_t *)pthis;
        
        if (pool->shutdown) /**< 已销毁 */
        {
            return -1;
        }
        pool->shutdown = 1;    /**< 销毁标志置位 */
        
        pthread_cond_broadcast (&(pool->queue_ready)); /**< 唤醒所有pthread_cond_wait()等待线程 */
        int i; 
        for (i = 0; i < pool->current_pthread_num; i++) 
        {
            pthread_join (pool->threadid[i], NULL); /**< 等待所有线程执行结束 */
        }
        
        free (pool->threadid);    /**< 释放 */
        worker_t *head = NULL; 
        
        while (pool->queue_head != NULL) 
        { 
            head = pool->queue_head; 
            pool->queue_head = pool->queue_head->next; 
            free (head);    /**< 释放 */
        } 
        
        pthread_mutex_destroy(&(pool->queue_lock));    /**< 销毁 */
        pthread_cond_destroy(&(pool->queue_ready)); /**< 销毁 */     
        free (pool);    /**< 释放 */ 
        pool=NULL;     
        return 0; 
    } 
    
    
    /****************************************************************
    * function name         : ThreadPoolRoutine
    * functional description    : 线程池中运行的线程
    * input parameter        : arg    线程池指针
    * output parameter    : 
    * return value            : NULL
    * history                : 
    *****************************************************************/
    static void * ThreadPoolRoutine (void *arg) 
    { 
        CThread_pool_t *pool = (CThread_pool_t *)arg;
        
        while (1) 
        { 
            pthread_mutex_lock (&(pool->queue_lock)); /**< 上锁, pthread_cond_wait()调用会解锁*/
            
            while ((pool->cur_queue_size == 0) && (!pool->shutdown))    /**< 队列没有等待任务*/
            { 
                pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));     /**< 条件锁阻塞等待条件信号*/
            } 
            if (pool->shutdown) 
            { 
                pthread_mutex_unlock (&(pool->queue_lock)); 
                pthread_exit (NULL);     /**< 释放线程 */
            } 
    
            assert (pool->cur_queue_size != 0); 
            assert (pool->queue_head != NULL); 
             
            pool->cur_queue_size--;     /**< 等待任务减1,准备执行任务*/ 
            worker_t *worker     = pool->queue_head;    /**< 取等待队列任务结点头*/ 
            pool->queue_head     = worker->next;     /**< 链表后移 */ 
            
            pthread_mutex_unlock (&(pool->queue_lock)); 
            (*(worker->process)) (worker->arg);     /**< 执行回调函数 */ 
            pthread_mutex_lock (&(pool->queue_lock)); 
            
            pool->current_pthread_task_num--;    /**< 函数执行结束 */ 
            free (worker);     /**< 释放任务结点 */
            worker = NULL; 
    
            if ((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num)
            {
                pthread_mutex_unlock (&(pool->queue_lock)); 
                break;    /**< 当池中空闲线程超过 free_pthread_num则将线程释放回操作系统 */
            }
            pthread_mutex_unlock (&(pool->queue_lock));         
        } 
        
        pool->current_pthread_num--;    /**< 当前池中线程数减1 */
        pthread_exit (NULL);    /**< 释放线程*/
        return (void*)NULL;
    } 
    
    /****************************************************************
    * function name         : ThreadPoolConstruct
    * functional description    : 创建线程池
    * input parameter        : max_num    线程池可容纳的最大线程数
                          free_num    线程池允许存在的最大空闲线程,超过则将线程释放回操作系统
    * output parameter    : 
    * return value            : 线程池指针
    * history                : 
    *****************************************************************/
    CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num) 
    { 
        CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); 
        if (NULL == pool)
        {
            return NULL;
        }
        memset(pool, 0, sizeof(CThread_pool_t));
        
        pthread_mutex_init (&(pool->queue_lock), NULL);    /**< 初始化互斥锁 */
        pthread_cond_init (&(pool->queue_ready), NULL);    /**< 初始化条件变量 */ 
    
        pool->queue_head                 = NULL; 
        pool->max_thread_num             = max_num;    /**< 线程池可容纳的最大线程数 */
        pool->cur_queue_size             = 0; 
        pool->current_pthread_task_num     = 0;
        pool->shutdown                     = 0; 
        pool->current_pthread_num         = 0;
        pool->free_pthread_num             = free_num;    /**< 线程池允许存在的最大空闲线程 */
        pool->threadid                    = NULL;
        pool->threadid                     = (pthread_t *) malloc (max_num * sizeof (pthread_t)); 
        pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;    /**< 给函数指针赋值 */
        pool->AddWorkLimit                = ThreadPoolAddWorkLimit;
        pool->Destruct                    = ThreadPoolDestroy;
        pool->GetMaxThreadNum            = ThreadPoolGetThreadMaxNum;
        pool->GetCurThreadNum            = ThreadPoolGetCurrentThreadNum;
        pool->GetCurTaskThreadNum        = ThreadPoolGetCurrentTaskThreadNum;
        pool->GetCurTaskNum                = ThreadPoolGetCurrentTaskNum;
        
        int i = 0; 
        for (i = 0; i < max_num; i++) 
        {  
            pool->current_pthread_num++;    /**< 当前池中的线程数 */
            pthread_create (&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void*)pool);    /**< 创建线程 */
            usleep(1000);
        } 
    
        return pool;
    } 
    
    /****************************************************************
    * function name         : ThreadPoolConstructDefault
    * functional description    : 创建线程池,以默认的方式初始化,未创建线程
    * input parameter        : 
    * output parameter    : 
    * return value            : 线程池指针
    * history                : 
    *****************************************************************/
    CThread_pool_t* ThreadPoolConstructDefault(void) 
    { 
        CThread_pool_t *pool = (CThread_pool_t *) malloc (sizeof (CThread_pool_t)); 
        if (NULL == pool)
        {
            return NULL;
        }
        memset(pool, 0, sizeof(CThread_pool_t));
        
        pthread_mutex_init(&(pool->queue_lock), NULL); 
        pthread_cond_init(&(pool->queue_ready), NULL); 
    
        pool->queue_head                 = NULL; 
        pool->max_thread_num             = DEFAULT_MAX_THREAD_NUM;    /**< 默认值 */
        pool->cur_queue_size             = 0; 
        pool->current_pthread_task_num     = 0;
        pool->shutdown                     = 0; 
        pool->current_pthread_num         = 0;
        pool->free_pthread_num             = DEFAULT_FREE_THREAD_NUM;    /**< 默认值 */
        pool->threadid                    = NULL;
        pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;
        pool->AddWorkLimit                = ThreadPoolAddWorkLimit;
        pool->Destruct                    = ThreadPoolDestroy;
        pool->GetMaxThreadNum            = ThreadPoolGetThreadMaxNum;
        pool->GetCurThreadNum            = ThreadPoolGetCurrentThreadNum;
        pool->GetCurTaskThreadNum        = ThreadPoolGetCurrentTaskThreadNum;
        pool->GetCurTaskNum                = ThreadPoolGetCurrentTaskNum;
        
        return pool;
    }

    xxx.h

    /************************************************************************
    * module            : 线程池头文件
    * file name        : lib_thread_pool.h
    * Author             : 
    * version            : V1.0
    * DATE            : 
    * directory         : 
    * description        : 
    * related document: 
    * 
    ************************************************************************/
    /*-----------------------includes-------------------------------*/
    #ifndef __PTHREAD_POOL_H__
    #define __PTHREAD_POOL_H__
    
    #include <pthread.h>
    
    /*---------------constants/macro definition---------------------*/
    #define DEFAULT_MAX_THREAD_NUM        100
    #define DEFAULT_FREE_THREAD_NUM        10    
    typedef struct worker_t worker_t;
    typedef struct CThread_pool_t CThread_pool_t;
    
    /*---------------global variables definition-----------------------*/
    /*线程池任务结点*/
    struct worker_t
    {
        void *(*process) (void *arg);    /**< 回调函数 */
        int   paratype;                    /**< 函数类型(预留) */
        void *arg;                        /**< 回调函数参数 */
        struct worker_t *next;            /**< 连接下一个任务结点 */
    };
    
    /*线程池控制器*/
    struct CThread_pool_t
    {
        pthread_mutex_t queue_lock;    /**< 互斥锁 */
        pthread_cond_t queue_ready;    /**< 条件变量 */
    
        worker_t *queue_head;    /**< 任务结点链表,保存所有投递的任务 */
        int shutdown;            /**< 线程池销毁标志,1 - 销毁 */
        pthread_t *threadid;    /**< 线程ID */
        int max_thread_num;        /**< 线程池可容纳的最大线程数 */
        int current_pthread_num;    /**< 当前线程池存放的线程数 */
        int current_pthread_task_num;    /**< 当前正在执行任务和已分配任务的线程数目和 */
        int cur_queue_size;        /**< 当前等待队列的任务数目 */
        int    free_pthread_num;    /**< 线程池内允许存在的最大空闲线程数 */
        
        /****************************************************************
        * function name         : ThreadPoolAddWorkLimit
        * functional description    : 向线程池投递任务
        * input parameter        : pthis    线程池指针
                              process    回调函数
                              arg        回调函数的参数
        * output parameter    : 
        * return value            : 0 - 成功;-1 - 失败 
        * history                : 
        *****************************************************************/
        int (*AddWorkUnlimit)(void* pthis,void *(*process) (void *arg), void *arg); 
    
        /****************************************************************
        * function name         : ThreadPoolAddWorkUnlimit
        * functional description    : 向线程池投递任务,无空闲线程则阻塞
        * input parameter        : pthis    线程池指针
                              process    回调函数
                              arg        回调函数的参数
        * output parameter    : 
        * return value            : 0 - 成功;-1 - 失败 
        * history                : 
        *****************************************************************/
        int (*AddWorkLimit)(void* pthis,void *(*process) (void *arg), void *arg); 
        
        /****************************************************************
        * function name         : ThreadPoolGetThreadMaxNum
        * functional description    : 获取线程池可容纳的最大线程数
        * input parameter        : pthis    线程池指针
        * output parameter    : 
        * return value            : 线程池可容纳的最大线程数
        * history                : 
        *****************************************************************/
        int (*GetMaxThreadNum) (void *pthis); 
    
        /****************************************************************
        * function name         : ThreadPoolGetCurrentThreadNum
        * functional description    : 获取线程池存放的线程数
        * input parameter        : pthis    线程池指针
        * output parameter    : 
        * return value            : 线程池存放的线程数
        * history                : 
        *****************************************************************/    
        int (*GetCurThreadNum) (void *pthis); 
        
        /****************************************************************
        * function name         : ThreadPoolGetCurrentTaskThreadNum
        * functional description    : 获取当前正在执行任务和已分配任务的线程数目和
        * input parameter        : pthis    线程池指针
        * output parameter    : 
        * return value            : 当前正在执行任务和已分配任务的线程数目和
        * history                : 
        *****************************************************************/
        int (*GetCurTaskThreadNum) (void *pthis); 
        
        /****************************************************************
        * function name         : ThreadPoolGetCurrentTaskNum
        * functional description    : 获取线程池等待队列任务数
        * input parameter        : pthis    线程池指针
        * output parameter    : 
        * return value            : 等待队列任务数
        * history                : 
        *****************************************************************/
        int (*GetCurTaskNum) (void *pthis); 
        
        /****************************************************************
        * function name         : ThreadPoolDestroy
        * functional description    : 销毁线程池
        * input parameter        : pthis    线程池指针
        * output parameter    : 
        * return value            : 0 - 成功;-1 - 失败
        * history                : 
        *****************************************************************/
        int (*Destruct) (void *pthis); 
    };
    
    /*---------------functions declaration--------------------------*/
    /****************************************************************
    * function name         : ThreadPoolConstruct
    * functional description    : 创建线程池
    * input parameter        : max_num    线程池可容纳的最大线程数
                          free_num    线程池允许存在的最大空闲线程,超过则将线程释放回操作系统
    * output parameter    : 
    * return value            : 线程池指针
    * history                : 
    *****************************************************************/
    CThread_pool_t* ThreadPoolConstruct(int max_num,int free_num);
    
    /****************************************************************
    * function name         : ThreadPoolConstructDefault
    * functional description    : 创建线程池,以默认的方式初始化,未创建线程
    * input parameter        : 
    * output parameter    : 
    * return value            : 线程池指针
    * history                : 
    *****************************************************************/
    CThread_pool_t* ThreadPoolConstructDefault(void);
    
    #endif

    main.c

    #include <stdio.h> 
    #include <stdlib.h> 
    #include <unistd.h> 
    #include <sys/types.h> 
    #include <pthread.h> 
    #include <assert.h> 
    #include <string.h>
    
    #include "lib_thread_pool.h"
    
    
    static void* thread_1(void* arg);
    static void* thread_2(void* arg);
    static void* thread_3(void* arg);
    static void DisplayPoolStatus(CThread_pool_t* pPool);
    
    int nKillThread = 0;
    
    int main()
    {
        CThread_pool_t* pThreadPool = NULL;
        pThreadPool = ThreadPoolConstruct(2, 1);
        int nNumInput = 5;
        char LogInput[] = "OK!";
    
        DisplayPoolStatus(pThreadPool);
        /*可用AddWorkLimit()替换看执行的效果*/
        pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_1, (void*)NULL);
        /*
        * 没加延迟发现连续投递任务时pthread_cond_wait()会收不到信号pthread_cond_signal() !!
        * 因为AddWorkUnlimit()进去后调用pthread_mutex_lock()把互斥锁锁上,导致pthread_cond_wait()
        * 收不到信号!!也可在AddWorkUnlimit()里面加个延迟,一般情况可能也遇不到这个问题
        */
        usleep(10);    
        pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_2, (void*)nNumInput);
        usleep(10);
        pThreadPool->AddWorkUnlimit((void*)pThreadPool, thread_3, (void*)LogInput);
        usleep(10);
        DisplayPoolStatus(pThreadPool);
    
        nKillThread = 1;
        usleep(100);    /**< 先让线程退出 */
        DisplayPoolStatus(pThreadPool);
        nKillThread = 2;
        usleep(100);
        DisplayPoolStatus(pThreadPool);
        nKillThread = 3;
        usleep(100);
        DisplayPoolStatus(pThreadPool);
    
        pThreadPool->Destruct((void*)pThreadPool);
        return 0;
    }
    
    static void* thread_1(void* arg)
    {
        printf("Thread 1 is running !
    ");
        while(nKillThread != 1)
            usleep(10);
        return NULL;
    }
    
    static void* thread_2(void* arg)
    {
        int nNum = (int)arg;
        
        printf("Thread 2 is running !
    ");
        printf("Get Number %d
    ", nNum);
        while(nKillThread != 2)
            usleep(10);
        return NULL;
    }
    
    static void* thread_3(void* arg)
    {
        char *pLog = (char*)arg;
        
        printf("Thread 3 is running !
    ");
        printf("Get String %s
    ", pLog);
        while(nKillThread != 3)
            usleep(10);
        return NULL;
    }
    
    static void DisplayPoolStatus(CThread_pool_t* pPool)
    {
        static int nCount = 1;
        
        printf("******************
    ");
        printf("nCount = %d
    ", nCount++);
        printf("max_thread_num = %d
    ", pPool->GetMaxThreadNum((void*)pPool));
        printf("current_pthread_num = %d
    ", pPool->GetCurThreadNum((void*)pPool));
        printf("current_pthread_task_num = %d
    ", pPool->GetCurTaskThreadNum((void*)pPool));
        printf("cur_queue_size = %d
    ", pPool->GetCurTaskNum((void*)pPool));
        printf("******************
    ");
    }
  • 相关阅读:
    计蒜客38228 Max answer 单调栈 + 线段树
    Codeforces 103D Time to Raid Cowavans 分块
    Poj 2352 Stars
    HDU 6203 ping ping ping LCA + 贪心
    redis——数据库发展
    数据库拆分
    java基础算法
    docker部署redis集群
    docker网络
    DockerFile
  • 原文地址:https://www.cnblogs.com/jiangzhaowei/p/10383049.html
Copyright © 2011-2022 走看看