zoukankan      html  css  js  c++  java
  • C实现线程池

    第一部分为头文件

     1 #ifndef __THREADPOOL_H_
     2 #define __THREADPOOL_H_
     3 
     4 typedef struct threadpool_t threadpool_t;
     5 
     6 /**
     7  * @function threadpool_create
     8  * @descCreates a threadpool_t object.
     9  * @param thr_num  thread num
    10  * @param max_thr_num  max thread size
    11  * @param queue_max_size   size of the queue.
    12  * @return a newly created thread pool or NULL
    13  */
    14 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
    15 
    16 /**
    17  * @function threadpool_add
    18  * @desc add a new task in the queue of a thread pool
    19  * @param pool     Thread pool to which add the task.
    20  * @param function Pointer to the function that will perform the task.
    21  * @param argument Argument to be passed to the function.
    22  * @return 0 if all goes well,else -1
    23  */
    24 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
    25 
    26 /**
    27  * @function threadpool_destroy
    28  * @desc Stops and destroys a thread pool.
    29  * @param pool  Thread pool to destroy.
    30  * @return 0 if destory success else -1
    31  */
    32 int threadpool_destroy(threadpool_t *pool);
    33 
    34 /**
    35  * @desc get the thread num
    36  * @pool pool threadpool
    37  * @return # of the thread
    38  */
    39 int threadpool_all_threadnum(threadpool_t *pool);
    40 
    41 /**
    42  * desc get the busy thread num
    43  * @param pool threadpool
    44  * return # of the busy thread
    45  */
    46 int threadpool_busy_threadnum(threadpool_t *pool);
    47 
    48 #endif

    第二部分为自实现线程池代码(对libevent库进行一些精简,凸显逻辑)

      1 #include <stdlib.h>
      2 #include <pthread.h>
      3 #include <unistd.h>
      4 #include <assert.h>
      5 #include <stdio.h>
      6 #include <string.h>
      7 #include <signal.h>
      8 #include <errno.h>
      9 #include "threadpool.h"
     10 
     11 #define DEFAULT_TIME 10                 /*10s检测一次*/
     12 #define MIN_WAIT_TASK_NUM 10            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
     13 #define DEFAULT_THREAD_VARY 10          /*每次创建和销毁线程的个数*/
     14 #define true 1
     15 #define false 0
     16 
     17 typedef struct {
     18     void *(*function)(void *);          /* 函数指针,回调函数 */
     19     void *arg;                          /* 上面函数的参数 */
     20 } threadpool_task_t;                    /* 各子线程任务结构体 */
     21 
     22 /* 描述线程池相关信息 */
     23 struct threadpool_t {
     24     pthread_mutex_t lock;               /* 用于锁住本结构体 */    
     25     pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */
     26 
     27     pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
     28     pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */
     29 
     30     pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */
     31     pthread_t adjust_tid;               /* 存管理线程tid */
     32     threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */
     33 
     34     int min_thr_num;                    /* 线程池最小线程数 */
     35     int max_thr_num;                    /* 线程池最大线程数 */
     36     int live_thr_num;                   /* 当前存活线程个数 */
     37     int busy_thr_num;                   /* 忙状态线程个数 */
     38     int wait_exit_thr_num;              /* 要销毁的线程个数 */
     39 
     40     int queue_front;                    /* task_queue队头下标 */
     41     int queue_rear;                     /* task_queue队尾下标 */
     42     int queue_size;                     /* task_queue队中实际任务数 */
     43     int queue_max_size;                 /* task_queue队列可容纳任务数上限 */
     44 
     45     int shutdown;                       /* 标志位,线程池使用状态,true或false */
     46 };
     47 
     48 void *threadpool_thread(void *threadpool);
     49 
     50 void *adjust_thread(void *threadpool);
     51 
     52 int is_thread_alive(pthread_t tid);
     53 int threadpool_free(threadpool_t *pool);
     54 
     55 //threadpool_create(3,100,100);  
     56 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
     57 {
     58     int i;
     59     threadpool_t *pool = NULL;
     60     do {
     61         if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  
     62             printf("malloc threadpool fail");
     63             break;                                      /*跳出do while*/
     64         }
     65 
     66         pool->min_thr_num = min_thr_num;
     67         pool->max_thr_num = max_thr_num;
     68         pool->busy_thr_num = 0;
     69         pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */
     70         pool->wait_exit_thr_num = 0;
     71         pool->queue_size = 0;                           /* 有0个产品 */
     72         pool->queue_max_size = queue_max_size;
     73         pool->queue_front = 0;
     74         pool->queue_rear = 0;
     75         pool->shutdown = false;                         /* 不关闭线程池 */
     76 
     77         /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
     78         pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); 
     79         if (pool->threads == NULL) {
     80             printf("malloc threads fail");
     81             break;
     82         }
     83         memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
     84 
     85         /* 队列开辟空间 */
     86         pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
     87         if (pool->task_queue == NULL) {
     88             printf("malloc task_queue fail");
     89             break;
     90         }
     91 
     92         /* 初始化互斥琐、条件变量 */
     93         if (pthread_mutex_init(&(pool->lock), NULL) != 0
     94                 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
     95                 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
     96                 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
     97         {
     98             printf("init the lock or cond fail");
     99             break;
    100         }
    101 
    102         /* 启动 min_thr_num 个 work thread */
    103         for (i = 0; i < min_thr_num; i++) {
    104             pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);/*pool指向当前线程池*/
    105             printf("start thread 0x%x...
    ", (unsigned int)pool->threads[i]);
    106         }
    107         pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);/* 启动管理者线程 */
    108 
    109         return pool;
    110 
    111     } while (0);
    112 
    113     threadpool_free(pool);      /* 前面代码调用失败时,释放poll存储空间 */
    114 
    115     return NULL;
    116 }
    117 
    118 /* 向线程池中 添加一个任务 */
    119 //threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 process: 小写---->大写*/
    120 
    121 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
    122 {
    123     pthread_mutex_lock(&(pool->lock));
    124 
    125     /* ==为真,队列已经满, 调wait阻塞 */
    126     while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
    127         pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    128     }
    129     if (pool->shutdown) {
    130         pthread_cond_broadcast(&(pool->queue_not_empty));
    131         pthread_mutex_unlock(&(pool->lock));
    132         return 0;
    133     }
    134 
    135     /* 清空 工作线程 调用的回调函数 的参数arg */
    136     if (pool->task_queue[pool->queue_rear].arg != NULL) {
    137         pool->task_queue[pool->queue_rear].arg = NULL;
    138     }
    139     /*添加任务到任务队列里*/
    140     pool->task_queue[pool->queue_rear].function = function;
    141     pool->task_queue[pool->queue_rear].arg = arg;
    142     pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */
    143     pool->queue_size++;
    144 
    145     /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
    146     pthread_cond_signal(&(pool->queue_not_empty));
    147     pthread_mutex_unlock(&(pool->lock));
    148 
    149     return 0;
    150 }
    151 
    152 /* 线程池中各个工作线程 */
    153 void *threadpool_thread(void *threadpool)
    154 {
    155     threadpool_t *pool = (threadpool_t *)threadpool;
    156     threadpool_task_t task;
    157 
    158     while (true) {
    159         /* Lock must be taken to wait on conditional variable */
    160         /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
    161         pthread_mutex_lock(&(pool->lock));
    162 
    163         /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
    164         while ((pool->queue_size == 0) && (!pool->shutdown)) {  
    165             printf("thread 0x%x is waiting
    ", (unsigned int)pthread_self());
    166             pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
    167 
    168             /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
    169             if (pool->wait_exit_thr_num > 0) {
    170                 pool->wait_exit_thr_num--;
    171 
    172                 /*如果线程池里线程个数大于最小值时可以结束当前线程*/
    173                 if (pool->live_thr_num > pool->min_thr_num) {
    174                     printf("thread 0x%x is exiting
    ", (unsigned int)pthread_self());
    175                     pool->live_thr_num--;
    176                     pthread_mutex_unlock(&(pool->lock));
    177 
    178                     pthread_exit(NULL);
    179                 }
    180             }
    181         }
    182 
    183         /*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/
    184         if (pool->shutdown) {
    185             pthread_mutex_unlock(&(pool->lock));
    186             printf("thread 0x%x is exiting
    ", (unsigned int)pthread_self());
    187             pthread_detach(pthread_self());
    188             pthread_exit(NULL);     /* 线程自行结束 */
    189         }
    190 
    191         /*从任务队列里获取任务, 是一个出队操作*/
    192         task.function = pool->task_queue[pool->queue_front].function;
    193         task.arg = pool->task_queue[pool->queue_front].arg;
    194 
    195         pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */
    196         pool->queue_size--;
    197 
    198         /*通知可以有新的任务添加进来*/
    199         pthread_cond_broadcast(&(pool->queue_not_full));
    200 
    201         /*任务取出后,立即将 线程池琐 释放*/
    202         pthread_mutex_unlock(&(pool->lock));
    203 
    204         /*执行任务*/ 
    205         printf("thread 0x%x start working
    ", (unsigned int)pthread_self());
    206         pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/
    207         pool->busy_thr_num++;                                                   /*忙状态线程数+1*/
    208         pthread_mutex_unlock(&(pool->thread_counter));
    209 
    210         (*(task.function))(task.arg);                                           /*执行回调函数任务*/
    211         //task.function(task.arg);                                              /*执行回调函数任务*/
    212 
    213         /*任务结束处理*/ 
    214         printf("thread 0x%x end working
    ", (unsigned int)pthread_self());
    215         pthread_mutex_lock(&(pool->thread_counter));
    216         pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/
    217         pthread_mutex_unlock(&(pool->thread_counter));
    218     }
    219 
    220     pthread_exit(NULL);
    221 }
    222 
    223 /* 管理线程 */
    224 void *adjust_thread(void *threadpool)
    225 {
    226     int i;
    227     threadpool_t *pool = (threadpool_t *)threadpool;
    228     while (!pool->shutdown) {
    229 
    230         sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/
    231 
    232         pthread_mutex_lock(&(pool->lock));
    233         int queue_size = pool->queue_size;                      /* 关注 任务数 */
    234         int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */
    235         pthread_mutex_unlock(&(pool->lock));
    236 
    237         pthread_mutex_lock(&(pool->thread_counter));
    238         int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */
    239         pthread_mutex_unlock(&(pool->thread_counter));
    240 
    241         /* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
    242         if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
    243             pthread_mutex_lock(&(pool->lock));  
    244             int add = 0;
    245 
    246             /*一次增加 DEFAULT_THREAD 个线程*/
    247             for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
    248                     && pool->live_thr_num < pool->max_thr_num; i++) {
    249                 if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
    250                     pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
    251                     add++;
    252                     pool->live_thr_num++;
    253                 }
    254             }
    255 
    256             pthread_mutex_unlock(&(pool->lock));
    257         }
    258 
    259         /* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
    260         if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {
    261 
    262             /* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
    263             pthread_mutex_lock(&(pool->lock));
    264             pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */
    265             pthread_mutex_unlock(&(pool->lock));
    266 
    267             for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
    268                 /* 通知处在空闲状态的线程, 他们会自行终止*/
    269                 pthread_cond_signal(&(pool->queue_not_empty));
    270             }
    271         }
    272     }
    273 
    274     return NULL;
    275 }
    276 
    277 int threadpool_destroy(threadpool_t *pool)
    278 {
    279     int i;
    280     if (pool == NULL) {
    281         return -1;
    282     }
    283     pool->shutdown = true;
    284 
    285     /*先销毁管理线程*/
    286     pthread_join(pool->adjust_tid, NULL);
    287 
    288     for (i = 0; i < pool->live_thr_num; i++) {
    289         /*通知所有的空闲线程*/
    290         pthread_cond_broadcast(&(pool->queue_not_empty));
    291     }
    292     for (i = 0; i < pool->live_thr_num; i++) {
    293         pthread_join(pool->threads[i], NULL);
    294     }
    295     threadpool_free(pool);
    296 
    297     return 0;
    298 }
    299 
    300 int threadpool_free(threadpool_t *pool)
    301 {
    302     if (pool == NULL) {
    303         return -1;
    304     }
    305 
    306     if (pool->task_queue) {
    307         free(pool->task_queue);
    308     }
    309     if (pool->threads) {
    310         free(pool->threads);
    311         pthread_mutex_lock(&(pool->lock));
    312         pthread_mutex_destroy(&(pool->lock));
    313         pthread_mutex_lock(&(pool->thread_counter));
    314         pthread_mutex_destroy(&(pool->thread_counter));
    315         pthread_cond_destroy(&(pool->queue_not_empty));
    316         pthread_cond_destroy(&(pool->queue_not_full));
    317     }
    318     free(pool);
    319     pool = NULL;
    320 
    321     return 0;
    322 }
    323 
    324 int threadpool_all_threadnum(threadpool_t *pool)
    325 {
    326     int all_threadnum = -1;
    327     pthread_mutex_lock(&(pool->lock));
    328     all_threadnum = pool->live_thr_num;
    329     pthread_mutex_unlock(&(pool->lock));
    330     return all_threadnum;
    331 }
    332 
    333 int threadpool_busy_threadnum(threadpool_t *pool)
    334 {
    335     int busy_threadnum = -1;
    336     pthread_mutex_lock(&(pool->thread_counter));
    337     busy_threadnum = pool->busy_thr_num;
    338     pthread_mutex_unlock(&(pool->thread_counter));
    339     return busy_threadnum;
    340 }
    341 
    342 int is_thread_alive(pthread_t tid)
    343 {
    344     int kill_rc = pthread_kill(tid, 0);     //发0号信号,测试线程是否存活
    345     if (kill_rc == ESRCH) {
    346         return false;
    347     }
    348 
    349     return true;
    350 }
    351 
    352 /*测试*/ 
    353 
    354 #if 1
    355 /* 线程池中的线程,模拟处理业务 */
    356 void *process(void *arg)
    357 {
    358     printf("thread 0x%x working on task %d
     ",(unsigned int)pthread_self(),(int)arg);
    359     sleep(1);  //小---大写
    360     printf("task %d is end
    ",(int)arg);
    361 
    362     return NULL;
    363 }
    364 
    365 int main(void)
    366 {
    367     /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/
    368 
    369     threadpool_t *thp = threadpool_create(3,100,100);   /*创建线程池,池里最小3个线程,最大100,队列最大100*/
    370     printf("pool inited");
    371 
    372     //int *num = (int *)malloc(sizeof(int)*20);
    373     int num[20], i;
    374     for (i = 0; i < 20; i++) {
    375         num[i]=i;
    376         printf("add task %d
    ",i);
    377         threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 */
    378     }
    379     sleep(10);                                          /* 等子线程完成任务 */
    380     threadpool_destroy(thp);
    381 
    382     return 0;
    383 }
    384 
    385 #endif

    =======================

    线程池的相关信息:

    typedef struct {
    void *(*function)(void *); /* 函数指针,回调函数 */
    void *arg; /* 上面函数的参数 */
    } threadpool_task_t; /* 各子线程任务结构体 */


    /* 描述线程池相关信息 */
    struct threadpool_t {
    pthread_mutex_t lock; /* 用于锁住本结构体 */
    pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */

    pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
    pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */

    pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */
    pthread_t adjust_tid; /* 存管理线程tid */
    threadpool_task_t *task_queue; /* 任务队列(数组首地址) */

    int min_thr_num; /* 线程池最小线程数 */
    int max_thr_num; /* 线程池最大线程数 */
    int live_thr_num; /* 当前存活线程个数 */
    int busy_thr_num; /* 忙状态线程个数 */
    int wait_exit_thr_num; /* 要销毁的线程个数 */

    int queue_front; /* task_queue队头下标 */
    int queue_rear; /* task_queue队尾下标 */
    int queue_size; /* task_queue队中实际任务数 */
    int queue_max_size; /* task_queue队列可容纳任务数上限 */

    int shutdown; /* 标志位,线程池使用状态,true或false */
    };

    ============================================

    查看这段代码的步骤及代码的相关逻辑步骤


    1. 大结构体, threadpool_task_t;结构体

    2. main

    threadpool_create 创建线程池。

    for产出任务

    threadpool_add 添加任务。

    销毁线程池。

    3. threadpool_create()

    4. threadpool_thread()

    跟踪到pthread_cond_wait();阻塞

    5. threadpool_add()

    跟踪到pthread_cond_signal(); 会到4步中 pthread_cond_wait()继续向后。

    6. adjust_thread()

    添加10个线程

    移除10个线程 ---pthread_exit();

    7. threadpool_destroy()

    销毁线程池。 ---pthread_exit();

  • 相关阅读:
    杭电 Problem
    杭电Problem 5053 the sum of cube 【数学公式】
    杭电 Problem 2089 不要62 【打表】
    杭电 Problem 4548 美素数【打表】
    杭电 Problem 2008 分拆素数和 【打表】
    杭电 Problem 1722 Cake 【gcd】
    杭电 Problem 2187 悼念512汶川大地震遇难同胞——老人是真饿了【贪心】
    杭电Problem 1872 稳定排序
    杭电 Problem 1753 大明A+B
    东北林业大 564 汉诺塔
  • 原文地址:https://www.cnblogs.com/yyx1-1/p/5907561.html
Copyright © 2011-2022 走看看