zoukankan      html  css  js  c++  java
  • C实现线程池

    第一部分为头文件

     1 #ifndef __THREADPOOL_H_
     2 #define __THREADPOOL_H_
     3 
     4 typedef struct threadpool_t threadpool_t;
     5 
     6 /**
     7  * @function threadpool_create
     8  * @descCreates a threadpool_t object.
     9  * @param thr_num  thread num
    10  * @param max_thr_num  max thread size
    11  * @param queue_max_size   size of the queue.
    12  * @return a newly created thread pool or NULL
    13  */
    14 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
    15 
    16 /**
    17  * @function threadpool_add
    18  * @desc add a new task in the queue of a thread pool
    19  * @param pool     Thread pool to which add the task.
    20  * @param function Pointer to the function that will perform the task.
    21  * @param argument Argument to be passed to the function.
    22  * @return 0 if all goes well,else -1
    23  */
    24 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
    25 
    26 /**
    27  * @function threadpool_destroy
    28  * @desc Stops and destroys a thread pool.
    29  * @param pool  Thread pool to destroy.
    30  * @return 0 if destory success else -1
    31  */
    32 int threadpool_destroy(threadpool_t *pool);
    33 
    34 /**
    35  * @desc get the thread num
    36  * @pool pool threadpool
    37  * @return # of the thread
    38  */
    39 int threadpool_all_threadnum(threadpool_t *pool);
    40 
    41 /**
    42  * desc get the busy thread num
    43  * @param pool threadpool
    44  * return # of the busy thread
    45  */
    46 int threadpool_busy_threadnum(threadpool_t *pool);
    47 
    48 #endif

    第二部分为自实现线程池代码(对libevent库进行一些精简,凸显逻辑)

      1 #include <stdlib.h>
      2 #include <pthread.h>
      3 #include <unistd.h>
      4 #include <assert.h>
      5 #include <stdio.h>
      6 #include <string.h>
      7 #include <signal.h>
      8 #include <errno.h>
      9 #include "threadpool.h"
     10 
     11 #define DEFAULT_TIME 10                 /*10s检测一次*/
     12 #define MIN_WAIT_TASK_NUM 10            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
     13 #define DEFAULT_THREAD_VARY 10          /*每次创建和销毁线程的个数*/
     14 #define true 1
     15 #define false 0
     16 
     17 typedef struct {
     18     void *(*function)(void *);          /* 函数指针,回调函数 */
     19     void *arg;                          /* 上面函数的参数 */
     20 } threadpool_task_t;                    /* 各子线程任务结构体 */
     21 
     22 /* 描述线程池相关信息 */
     23 struct threadpool_t {
     24     pthread_mutex_t lock;               /* 用于锁住本结构体 */    
     25     pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */
     26 
     27     pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
     28     pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */
     29 
     30     pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */
     31     pthread_t adjust_tid;               /* 存管理线程tid */
     32     threadpool_task_t *task_queue;      /* 任务队列(数组首地址) */
     33 
     34     int min_thr_num;                    /* 线程池最小线程数 */
     35     int max_thr_num;                    /* 线程池最大线程数 */
     36     int live_thr_num;                   /* 当前存活线程个数 */
     37     int busy_thr_num;                   /* 忙状态线程个数 */
     38     int wait_exit_thr_num;              /* 要销毁的线程个数 */
     39 
     40     int queue_front;                    /* task_queue队头下标 */
     41     int queue_rear;                     /* task_queue队尾下标 */
     42     int queue_size;                     /* task_queue队中实际任务数 */
     43     int queue_max_size;                 /* task_queue队列可容纳任务数上限 */
     44 
     45     int shutdown;                       /* 标志位,线程池使用状态,true或false */
     46 };
     47 
     48 void *threadpool_thread(void *threadpool);
     49 
     50 void *adjust_thread(void *threadpool);
     51 
     52 int is_thread_alive(pthread_t tid);
     53 int threadpool_free(threadpool_t *pool);
     54 
     55 //threadpool_create(3,100,100);  
     56 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
     57 {
     58     int i;
     59     threadpool_t *pool = NULL;
     60     do {
     61         if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  
     62             printf("malloc threadpool fail");
     63             break;                                      /*跳出do while*/
     64         }
     65 
     66         pool->min_thr_num = min_thr_num;
     67         pool->max_thr_num = max_thr_num;
     68         pool->busy_thr_num = 0;
     69         pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */
     70         pool->wait_exit_thr_num = 0;
     71         pool->queue_size = 0;                           /* 有0个产品 */
     72         pool->queue_max_size = queue_max_size;
     73         pool->queue_front = 0;
     74         pool->queue_rear = 0;
     75         pool->shutdown = false;                         /* 不关闭线程池 */
     76 
     77         /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
     78         pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); 
     79         if (pool->threads == NULL) {
     80             printf("malloc threads fail");
     81             break;
     82         }
     83         memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
     84 
     85         /* 队列开辟空间 */
     86         pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
     87         if (pool->task_queue == NULL) {
     88             printf("malloc task_queue fail");
     89             break;
     90         }
     91 
     92         /* 初始化互斥琐、条件变量 */
     93         if (pthread_mutex_init(&(pool->lock), NULL) != 0
     94                 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
     95                 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
     96                 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
     97         {
     98             printf("init the lock or cond fail");
     99             break;
    100         }
    101 
    102         /* 启动 min_thr_num 个 work thread */
    103         for (i = 0; i < min_thr_num; i++) {
    104             pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);/*pool指向当前线程池*/
    105             printf("start thread 0x%x...
    ", (unsigned int)pool->threads[i]);
    106         }
    107         pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);/* 启动管理者线程 */
    108 
    109         return pool;
    110 
    111     } while (0);
    112 
    113     threadpool_free(pool);      /* 前面代码调用失败时,释放poll存储空间 */
    114 
    115     return NULL;
    116 }
    117 
    118 /* 向线程池中 添加一个任务 */
    119 //threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 process: 小写---->大写*/
    120 
    121 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
    122 {
    123     pthread_mutex_lock(&(pool->lock));
    124 
    125     /* ==为真,队列已经满, 调wait阻塞 */
    126     while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
    127         pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    128     }
    129     if (pool->shutdown) {
    130         pthread_cond_broadcast(&(pool->queue_not_empty));
    131         pthread_mutex_unlock(&(pool->lock));
    132         return 0;
    133     }
    134 
    135     /* 清空 工作线程 调用的回调函数 的参数arg */
    136     if (pool->task_queue[pool->queue_rear].arg != NULL) {
    137         pool->task_queue[pool->queue_rear].arg = NULL;
    138     }
    139     /*添加任务到任务队列里*/
    140     pool->task_queue[pool->queue_rear].function = function;
    141     pool->task_queue[pool->queue_rear].arg = arg;
    142     pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */
    143     pool->queue_size++;
    144 
    145     /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
    146     pthread_cond_signal(&(pool->queue_not_empty));
    147     pthread_mutex_unlock(&(pool->lock));
    148 
    149     return 0;
    150 }
    151 
    152 /* 线程池中各个工作线程 */
    153 void *threadpool_thread(void *threadpool)
    154 {
    155     threadpool_t *pool = (threadpool_t *)threadpool;
    156     threadpool_task_t task;
    157 
    158     while (true) {
    159         /* Lock must be taken to wait on conditional variable */
    160         /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
    161         pthread_mutex_lock(&(pool->lock));
    162 
    163         /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
    164         while ((pool->queue_size == 0) && (!pool->shutdown)) {  
    165             printf("thread 0x%x is waiting
    ", (unsigned int)pthread_self());
    166             pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
    167 
    168             /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
    169             if (pool->wait_exit_thr_num > 0) {
    170                 pool->wait_exit_thr_num--;
    171 
    172                 /*如果线程池里线程个数大于最小值时可以结束当前线程*/
    173                 if (pool->live_thr_num > pool->min_thr_num) {
    174                     printf("thread 0x%x is exiting
    ", (unsigned int)pthread_self());
    175                     pool->live_thr_num--;
    176                     pthread_mutex_unlock(&(pool->lock));
    177 
    178                     pthread_exit(NULL);
    179                 }
    180             }
    181         }
    182 
    183         /*如果指定了true,要关闭线程池里的每个线程,自行退出处理---销毁线程池*/
    184         if (pool->shutdown) {
    185             pthread_mutex_unlock(&(pool->lock));
    186             printf("thread 0x%x is exiting
    ", (unsigned int)pthread_self());
    187             pthread_detach(pthread_self());
    188             pthread_exit(NULL);     /* 线程自行结束 */
    189         }
    190 
    191         /*从任务队列里获取任务, 是一个出队操作*/
    192         task.function = pool->task_queue[pool->queue_front].function;
    193         task.arg = pool->task_queue[pool->queue_front].arg;
    194 
    195         pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */
    196         pool->queue_size--;
    197 
    198         /*通知可以有新的任务添加进来*/
    199         pthread_cond_broadcast(&(pool->queue_not_full));
    200 
    201         /*任务取出后,立即将 线程池琐 释放*/
    202         pthread_mutex_unlock(&(pool->lock));
    203 
    204         /*执行任务*/ 
    205         printf("thread 0x%x start working
    ", (unsigned int)pthread_self());
    206         pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/
    207         pool->busy_thr_num++;                                                   /*忙状态线程数+1*/
    208         pthread_mutex_unlock(&(pool->thread_counter));
    209 
    210         (*(task.function))(task.arg);                                           /*执行回调函数任务*/
    211         //task.function(task.arg);                                              /*执行回调函数任务*/
    212 
    213         /*任务结束处理*/ 
    214         printf("thread 0x%x end working
    ", (unsigned int)pthread_self());
    215         pthread_mutex_lock(&(pool->thread_counter));
    216         pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/
    217         pthread_mutex_unlock(&(pool->thread_counter));
    218     }
    219 
    220     pthread_exit(NULL);
    221 }
    222 
    223 /* 管理线程 */
    224 void *adjust_thread(void *threadpool)
    225 {
    226     int i;
    227     threadpool_t *pool = (threadpool_t *)threadpool;
    228     while (!pool->shutdown) {
    229 
    230         sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/
    231 
    232         pthread_mutex_lock(&(pool->lock));
    233         int queue_size = pool->queue_size;                      /* 关注 任务数 */
    234         int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */
    235         pthread_mutex_unlock(&(pool->lock));
    236 
    237         pthread_mutex_lock(&(pool->thread_counter));
    238         int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */
    239         pthread_mutex_unlock(&(pool->thread_counter));
    240 
    241         /* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
    242         if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
    243             pthread_mutex_lock(&(pool->lock));  
    244             int add = 0;
    245 
    246             /*一次增加 DEFAULT_THREAD 个线程*/
    247             for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
    248                     && pool->live_thr_num < pool->max_thr_num; i++) {
    249                 if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
    250                     pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
    251                     add++;
    252                     pool->live_thr_num++;
    253                 }
    254             }
    255 
    256             pthread_mutex_unlock(&(pool->lock));
    257         }
    258 
    259         /* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
    260         if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {
    261 
    262             /* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
    263             pthread_mutex_lock(&(pool->lock));
    264             pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */
    265             pthread_mutex_unlock(&(pool->lock));
    266 
    267             for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
    268                 /* 通知处在空闲状态的线程, 他们会自行终止*/
    269                 pthread_cond_signal(&(pool->queue_not_empty));
    270             }
    271         }
    272     }
    273 
    274     return NULL;
    275 }
    276 
    277 int threadpool_destroy(threadpool_t *pool)
    278 {
    279     int i;
    280     if (pool == NULL) {
    281         return -1;
    282     }
    283     pool->shutdown = true;
    284 
    285     /*先销毁管理线程*/
    286     pthread_join(pool->adjust_tid, NULL);
    287 
    288     for (i = 0; i < pool->live_thr_num; i++) {
    289         /*通知所有的空闲线程*/
    290         pthread_cond_broadcast(&(pool->queue_not_empty));
    291     }
    292     for (i = 0; i < pool->live_thr_num; i++) {
    293         pthread_join(pool->threads[i], NULL);
    294     }
    295     threadpool_free(pool);
    296 
    297     return 0;
    298 }
    299 
    300 int threadpool_free(threadpool_t *pool)
    301 {
    302     if (pool == NULL) {
    303         return -1;
    304     }
    305 
    306     if (pool->task_queue) {
    307         free(pool->task_queue);
    308     }
    309     if (pool->threads) {
    310         free(pool->threads);
    311         pthread_mutex_lock(&(pool->lock));
    312         pthread_mutex_destroy(&(pool->lock));
    313         pthread_mutex_lock(&(pool->thread_counter));
    314         pthread_mutex_destroy(&(pool->thread_counter));
    315         pthread_cond_destroy(&(pool->queue_not_empty));
    316         pthread_cond_destroy(&(pool->queue_not_full));
    317     }
    318     free(pool);
    319     pool = NULL;
    320 
    321     return 0;
    322 }
    323 
    324 int threadpool_all_threadnum(threadpool_t *pool)
    325 {
    326     int all_threadnum = -1;
    327     pthread_mutex_lock(&(pool->lock));
    328     all_threadnum = pool->live_thr_num;
    329     pthread_mutex_unlock(&(pool->lock));
    330     return all_threadnum;
    331 }
    332 
    333 int threadpool_busy_threadnum(threadpool_t *pool)
    334 {
    335     int busy_threadnum = -1;
    336     pthread_mutex_lock(&(pool->thread_counter));
    337     busy_threadnum = pool->busy_thr_num;
    338     pthread_mutex_unlock(&(pool->thread_counter));
    339     return busy_threadnum;
    340 }
    341 
    342 int is_thread_alive(pthread_t tid)
    343 {
    344     int kill_rc = pthread_kill(tid, 0);     //发0号信号,测试线程是否存活
    345     if (kill_rc == ESRCH) {
    346         return false;
    347     }
    348 
    349     return true;
    350 }
    351 
    352 /*测试*/ 
    353 
    354 #if 1
    355 /* 线程池中的线程,模拟处理业务 */
    356 void *process(void *arg)
    357 {
    358     printf("thread 0x%x working on task %d
     ",(unsigned int)pthread_self(),(int)arg);
    359     sleep(1);  //小---大写
    360     printf("task %d is end
    ",(int)arg);
    361 
    362     return NULL;
    363 }
    364 
    365 int main(void)
    366 {
    367     /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/
    368 
    369     threadpool_t *thp = threadpool_create(3,100,100);   /*创建线程池,池里最小3个线程,最大100,队列最大100*/
    370     printf("pool inited");
    371 
    372     //int *num = (int *)malloc(sizeof(int)*20);
    373     int num[20], i;
    374     for (i = 0; i < 20; i++) {
    375         num[i]=i;
    376         printf("add task %d
    ",i);
    377         threadpool_add(thp, process, (void*)&num[i]);   /* 向线程池中添加任务 */
    378     }
    379     sleep(10);                                          /* 等子线程完成任务 */
    380     threadpool_destroy(thp);
    381 
    382     return 0;
    383 }
    384 
    385 #endif

    =======================

    线程池的相关信息:

    typedef struct {
    void *(*function)(void *); /* 函数指针,回调函数 */
    void *arg; /* 上面函数的参数 */
    } threadpool_task_t; /* 各子线程任务结构体 */


    /* 描述线程池相关信息 */
    struct threadpool_t {
    pthread_mutex_t lock; /* 用于锁住本结构体 */
    pthread_mutex_t thread_counter; /* 记录忙状态线程个数de琐 -- busy_thr_num */

    pthread_cond_t queue_not_full; /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
    pthread_cond_t queue_not_empty; /* 任务队列里不为空时,通知等待任务的线程 */

    pthread_t *threads; /* 存放线程池中每个线程的tid。数组 */
    pthread_t adjust_tid; /* 存管理线程tid */
    threadpool_task_t *task_queue; /* 任务队列(数组首地址) */

    int min_thr_num; /* 线程池最小线程数 */
    int max_thr_num; /* 线程池最大线程数 */
    int live_thr_num; /* 当前存活线程个数 */
    int busy_thr_num; /* 忙状态线程个数 */
    int wait_exit_thr_num; /* 要销毁的线程个数 */

    int queue_front; /* task_queue队头下标 */
    int queue_rear; /* task_queue队尾下标 */
    int queue_size; /* task_queue队中实际任务数 */
    int queue_max_size; /* task_queue队列可容纳任务数上限 */

    int shutdown; /* 标志位,线程池使用状态,true或false */
    };

    ============================================

    查看这段代码的步骤及代码的相关逻辑步骤


    1. 大结构体, threadpool_task_t;结构体

    2. main

    threadpool_create 创建线程池。

    for产出任务

    threadpool_add 添加任务。

    销毁线程池。

    3. threadpool_create()

    4. threadpool_thread()

    跟踪到pthread_cond_wait();阻塞

    5. threadpool_add()

    跟踪到pthread_cond_signal(); 会到4步中 pthread_cond_wait()继续向后。

    6. adjust_thread()

    添加10个线程

    移除10个线程 ---pthread_exit();

    7. threadpool_destroy()

    销毁线程池。 ---pthread_exit();

  • 相关阅读:
    安装、升级pip,但是python -m pip install --upgrade pip报错
    架构即未来阅读笔记3
    第十二周学习总结
    《大型网站技术架构:核心原理与案分析》阅读笔记02
    2021寒假(12)
    2021寒假(10)
    Spark简介
    《大型网站技术架构:核心原理与案分析》阅读笔记01
    2021寒假(9)
    2021寒假(8)
  • 原文地址:https://www.cnblogs.com/yyx1-1/p/5907561.html
Copyright © 2011-2022 走看看