zoukankan      html  css  js  c++  java
  • Linux C编程之二十二 Linux线程池实现

     一、线程池实现原理

    1. 管理者线程

    (1)计算线程不够用

    • 创建线程

    (2) 空闲线程太多

         a. 销毁

    • 更新要销毁的线程个数
    • 通过条件变量完成的

         b. 如果空闲太多,任务不够

    • 线程阻塞在该条件变量上

         c. 发送信号

    • pthread_cond_signal

    2. 线程池中的线程

    (1)从任务队列中取数据

    • 任务队列任务
    • 执行任务

    (2)销毁空闲的线程

    • 让线程执行pthread_exit
    • 阻塞空闲的线程收到信号:

              解除阻塞
              只有一个往下执行
              在执行任务之前做了销毁操作 -- 自行退出

    二、线程池代码实现

    1. 初始化一些线程

    2. 需要有一个管理者线程

       a. 如果使用率超过一定的百分比

        创建线程: 按照一定的步长增长

       b. 空闲的线程增多

        销毁线程

        留下的比实际多一些

    3. 线程工作的时候

        处理数据的时候:

    • 互斥锁
    • 条件变量

    注意:

    a. 线程阻塞条件:

    任务对列如果为空 cond_empty
    pthread_cond_wait(&cond_empty, &mutex);

     b. 任务队列中有数据:

    激活阻塞在条件变量上的线程:
    pthread_cond_signal(&cond_empty);
    pthead_cond_broadcast(&cond_empty);

    最终代码:

      1 #include <stdlib.h>
      2 #include <pthread.h>
      3 #include <unistd.h>
      4 #include <assert.h>
      5 #include <stdio.h>
      6 #include <string.h>
      7 #include <signal.h>
      8 #include <errno.h>
      9 #include "threadpool.h"
     10 
     11 #define DEFAULT_TIME 10                 /*10s检测一次*/
     12 #define MIN_WAIT_TASK_NUM 10            /*如果queue_size > MIN_WAIT_TASK_NUM 添加新的线程到线程池*/ 
     13 #define DEFAULT_THREAD_VARY 10          /*每次创建和销毁线程的个数*/
     14 #define true 1
     15 #define false 0
     16 
     17 typedef struct {
     18     void *(*function)(void *);          /* 函数指针,回调函数 */
     19     void *arg;                          /* 上面函数的参数 */
     20 } threadpool_task_t;                    /* 各子线程任务结构体 */
     21 
     22 /* 描述线程池相关信息 */
     23 struct threadpool_t {
     24     pthread_mutex_t lock;               /* 用于锁住本结构体 */    
     25     pthread_mutex_t thread_counter;     /* 记录忙状态线程个数de琐 -- busy_thr_num */
     26     pthread_cond_t queue_not_full;      /* 当任务队列满时,添加任务的线程阻塞,等待此条件变量 */
     27     pthread_cond_t queue_not_empty;     /* 任务队列里不为空时,通知等待任务的线程 */
     28 
     29     pthread_t *threads;                 /* 存放线程池中每个线程的tid。数组 */
     30     pthread_t adjust_tid;               /* 存管理线程tid */
     31     threadpool_task_t *task_queue;      /* 任务队列 */
     32 
     33     int min_thr_num;                    /* 线程池最小线程数 */
     34     int max_thr_num;                    /* 线程池最大线程数 */
     35     int live_thr_num;                   /* 当前存活线程个数 */
     36     int busy_thr_num;                   /* 忙状态线程个数 */
     37     int wait_exit_thr_num;              /* 要销毁的线程个数 */
     38 
     39     int queue_front;                    /* task_queue队头下标 */
     40     int queue_rear;                     /* task_queue队尾下标 */
     41     int queue_size;                     /* task_queue队中实际任务数 */
     42     int queue_max_size;                 /* task_queue队列可容纳任务数上限 */
     43 
     44     int shutdown;                       /* 标志位,线程池使用状态,true或false */
     45 };
     46 
     47 /**
     48  * @function void *threadpool_thread(void *threadpool)
     49  * @desc the worker thread
     50  * @param threadpool the pool which own the thread
     51  */
     52 void *threadpool_thread(void *threadpool);
     53 
     54 /**
     55  * @function void *adjust_thread(void *threadpool);
     56  * @desc manager thread
     57  * @param threadpool the threadpool
     58  */
     59 void *adjust_thread(void *threadpool);
     60 
     61 /**
     62  * check a thread is alive
     63  */
     64 int is_thread_alive(pthread_t tid);
     65 int threadpool_free(threadpool_t *pool);
     66 
     67 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
     68 {
     69     int i;
     70     threadpool_t *pool = NULL;
     71     do {
     72         if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {  
     73             printf("malloc threadpool fail");
     74             break;/*跳出do while*/
     75         }
     76 
     77         pool->min_thr_num = min_thr_num;
     78         pool->max_thr_num = max_thr_num;
     79         pool->busy_thr_num = 0;
     80         pool->live_thr_num = min_thr_num;               /* 活着的线程数 初值=最小线程数 */
     81         pool->queue_size = 0;                           /* 有0个产品 */
     82         pool->queue_max_size = queue_max_size;
     83         pool->queue_front = 0;
     84         pool->queue_rear = 0;
     85         pool->shutdown = false;                         /* 不关闭线程池 */
     86 
     87         /* 根据最大线程上限数, 给工作线程数组开辟空间, 并清零 */
     88         pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num); 
     89         if (pool->threads == NULL) {
     90             printf("malloc threads fail");
     91             break;
     92         }
     93         memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
     94 
     95         /* 队列开辟空间 */
     96         pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
     97         if (pool->task_queue == NULL) {
     98             printf("malloc task_queue fail");
     99             break;
    100         }
    101 
    102         /* 初始化互斥琐、条件变量 */
    103         if (pthread_mutex_init(&(pool->lock), NULL) != 0
    104                 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0
    105                 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0
    106                 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0)
    107         {
    108             printf("init the lock or cond fail");
    109             break;
    110         }
    111 
    112         /* 启动 min_thr_num 个 work thread */
    113         for (i = 0; i < min_thr_num; i++) {
    114             pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);/*pool指向当前线程池*/
    115             printf("start thread 0x%x...
    ", (unsigned int)pool->threads[i]);
    116         }
    117         pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *)pool);/* 启动管理者线程 */
    118 
    119         return pool;
    120 
    121     } while (0);
    122 
    123     threadpool_free(pool);      /* 前面代码调用失败时,释放poll存储空间 */
    124 
    125     return NULL;
    126 }
    127 
    128 /* 向线程池中 添加一个任务 */
    129 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg)
    130 {
    131     pthread_mutex_lock(&(pool->lock));
    132 
    133     /* ==为真,队列已经满, 调wait阻塞 */
    134     while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) {
    135         pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
    136     }
    137     if (pool->shutdown) {
    138         pthread_mutex_unlock(&(pool->lock));
    139     }
    140 
    141     /* 清空 工作线程 调用的回调函数 的参数arg */
    142     if (pool->task_queue[pool->queue_rear].arg != NULL) {
    143         free(pool->task_queue[pool->queue_rear].arg);
    144         pool->task_queue[pool->queue_rear].arg = NULL;
    145     }
    146     /*添加任务到任务队列里*/
    147     pool->task_queue[pool->queue_rear].function = function;
    148     pool->task_queue[pool->queue_rear].arg = arg;
    149     pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;       /* 队尾指针移动, 模拟环形 */
    150     pool->queue_size++;
    151 
    152     /*添加完任务后,队列不为空,唤醒线程池中 等待处理任务的线程*/
    153     pthread_cond_signal(&(pool->queue_not_empty));
    154     pthread_mutex_unlock(&(pool->lock));
    155 
    156     return 0;
    157 }
    158 
    159 /* 线程池中各个工作线程 */
    160 void *threadpool_thread(void *threadpool)
    161 {
    162     threadpool_t *pool = (threadpool_t *)threadpool;
    163     threadpool_task_t task;
    164 
    165     while (true) {
    166         /* Lock must be taken to wait on conditional variable */
    167         /*刚创建出线程,等待任务队列里有任务,否则阻塞等待任务队列里有任务后再唤醒接收任务*/
    168         pthread_mutex_lock(&(pool->lock));
    169 
    170         /*queue_size == 0 说明没有任务,调 wait 阻塞在条件变量上, 若有任务,跳过该while*/
    171         while ((pool->queue_size == 0) && (!pool->shutdown)) {  
    172             printf("thread 0x%x is waiting
    ", (unsigned int)pthread_self());
    173             pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
    174 
    175             /*清除指定数目的空闲线程,如果要结束的线程个数大于0,结束线程*/
    176             if (pool->wait_exit_thr_num > 0) {
    177                 pool->wait_exit_thr_num--;
    178 
    179                 /*如果线程池里线程个数大于最小值时可以结束当前线程*/
    180                 if (pool->live_thr_num > pool->min_thr_num) {
    181                     printf("thread 0x%x is exiting
    ", (unsigned int)pthread_self());
    182                     pool->live_thr_num--;
    183                     pthread_mutex_unlock(&(pool->lock));
    184                     pthread_exit(NULL);
    185                 }
    186             }
    187         }
    188 
    189         /*如果指定了true,要关闭线程池里的每个线程,自行退出处理*/
    190         if (pool->shutdown) {
    191             pthread_mutex_unlock(&(pool->lock));
    192             printf("thread 0x%x is exiting
    ", (unsigned int)pthread_self());
    193             pthread_exit(NULL);     /* 线程自行结束 */
    194         }
    195 
    196         /*从任务队列里获取任务, 是一个出队操作*/
    197         task.function = pool->task_queue[pool->queue_front].function;
    198         task.arg = pool->task_queue[pool->queue_front].arg;
    199 
    200         pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;       /* 出队,模拟环形队列 */
    201         pool->queue_size--;
    202 
    203         /*通知可以有新的任务添加进来*/
    204         pthread_cond_broadcast(&(pool->queue_not_full));
    205 
    206         /*任务取出后,立即将 线程池琐 释放*/
    207         pthread_mutex_unlock(&(pool->lock));
    208 
    209         /*执行任务*/ 
    210         printf("thread 0x%x start working
    ", (unsigned int)pthread_self());
    211         pthread_mutex_lock(&(pool->thread_counter));                            /*忙状态线程数变量琐*/
    212         pool->busy_thr_num++;                                                   /*忙状态线程数+1*/
    213         pthread_mutex_unlock(&(pool->thread_counter));
    214         (*(task.function))(task.arg);                                           /*执行回调函数任务*/
    215         //task.function(task.arg);                                              /*执行回调函数任务*/
    216 
    217         /*任务结束处理*/ 
    218         printf("thread 0x%x end working
    ", (unsigned int)pthread_self());
    219         pthread_mutex_lock(&(pool->thread_counter));
    220         pool->busy_thr_num--;                                       /*处理掉一个任务,忙状态数线程数-1*/
    221         pthread_mutex_unlock(&(pool->thread_counter));
    222     }
    223 
    224     pthread_exit(NULL);
    225 }
    226 
    227 /* 管理线程 */
    228 void *adjust_thread(void *threadpool)
    229 {
    230     int i;
    231     threadpool_t *pool = (threadpool_t *)threadpool;
    232     while (!pool->shutdown) {
    233 
    234         sleep(DEFAULT_TIME);                                    /*定时 对线程池管理*/
    235 
    236         pthread_mutex_lock(&(pool->lock));
    237         int queue_size = pool->queue_size;                      /* 关注 任务数 */
    238         int live_thr_num = pool->live_thr_num;                  /* 存活 线程数 */
    239         pthread_mutex_unlock(&(pool->lock));
    240 
    241         pthread_mutex_lock(&(pool->thread_counter));
    242         int busy_thr_num = pool->busy_thr_num;                  /* 忙着的线程数 */
    243         pthread_mutex_unlock(&(pool->thread_counter));
    244 
    245         /* 创建新线程 算法: 任务数大于最小线程池个数, 且存活的线程数少于最大线程个数时 如:30>=10 && 40<100*/
    246         if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) {
    247             pthread_mutex_lock(&(pool->lock));  
    248             int add = 0;
    249 
    250             /*一次增加 DEFAULT_THREAD 个线程*/
    251             for (i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY
    252                     && pool->live_thr_num < pool->max_thr_num; i++) {
    253                 if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i])) {
    254                     pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
    255                     add++;
    256                     pool->live_thr_num++;
    257                 }
    258             }
    259 
    260             pthread_mutex_unlock(&(pool->lock));
    261         }
    262 
    263         /* 销毁多余的空闲线程 算法:忙线程X2 小于 存活的线程数 且 存活的线程数 大于 最小线程数时*/
    264         if ((busy_thr_num * 2) < live_thr_num  &&  live_thr_num > pool->min_thr_num) {
    265 
    266             /* 一次销毁DEFAULT_THREAD个线程, 隨機10個即可 */
    267             pthread_mutex_lock(&(pool->lock));
    268             pool->wait_exit_thr_num = DEFAULT_THREAD_VARY;      /* 要销毁的线程数 设置为10 */
    269             pthread_mutex_unlock(&(pool->lock));
    270 
    271             for (i = 0; i < DEFAULT_THREAD_VARY; i++) {
    272                 /* 通知处在空闲状态的线程, 他们会自行终止*/
    273                 pthread_cond_signal(&(pool->queue_not_empty));
    274             }
    275         }
    276     }
    277 
    278     return NULL;
    279 }
    280 
    281 int threadpool_destroy(threadpool_t *pool)
    282 {
    283     int i;
    284     if (pool == NULL) {
    285         return -1;
    286     }
    287     pool->shutdown = true;
    288 
    289     /*先销毁管理线程*/
    290     pthread_join(pool->adjust_tid, NULL);
    291 
    292     for (i = 0; i < pool->live_thr_num; i++) {
    293         /*通知所有的空闲线程*/
    294         pthread_cond_broadcast(&(pool->queue_not_empty));
    295     }
    296     for (i = 0; i < pool->live_thr_num; i++) {
    297         pthread_join(pool->threads[i], NULL);
    298     }
    299     threadpool_free(pool);
    300 
    301     return 0;
    302 }
    303 
    304 int threadpool_free(threadpool_t *pool)
    305 {
    306     if (pool == NULL) {
    307         return -1;
    308     }
    309 
    310     if (pool->task_queue) {
    311         free(pool->task_queue);
    312     }
    313     if (pool->threads) {
    314         free(pool->threads);
    315         pthread_mutex_lock(&(pool->lock));
    316         pthread_mutex_destroy(&(pool->lock));
    317         pthread_mutex_lock(&(pool->thread_counter));
    318         pthread_mutex_destroy(&(pool->thread_counter));
    319         pthread_cond_destroy(&(pool->queue_not_empty));
    320         pthread_cond_destroy(&(pool->queue_not_full));
    321     }
    322     free(pool);
    323     pool = NULL;
    324 
    325     return 0;
    326 }
    327 
    328 int threadpool_all_threadnum(threadpool_t *pool)
    329 {
    330     int all_threadnum = -1;
    331     pthread_mutex_lock(&(pool->lock));
    332     all_threadnum = pool->live_thr_num;
    333     pthread_mutex_unlock(&(pool->lock));
    334     return all_threadnum;
    335 }
    336 
    337 int threadpool_busy_threadnum(threadpool_t *pool)
    338 {
    339     int busy_threadnum = -1;
    340     pthread_mutex_lock(&(pool->thread_counter));
    341     busy_threadnum = pool->busy_thr_num;
    342     pthread_mutex_unlock(&(pool->thread_counter));
    343     return busy_threadnum;
    344 }
    345 
    346 int is_thread_alive(pthread_t tid)
    347 {
    348     int kill_rc = pthread_kill(tid, 0);     //发0号信号,测试线程是否存活
    349     if (kill_rc == ESRCH) {
    350         return false;
    351     }
    352 
    353     return true;
    354 }
    355 
    356 /*测试*/ 
    357 
    358 #if 1
    359 /* 线程池中的线程,模拟处理业务 */
    360 void *process(void *arg)
    361 {
    362     printf("thread 0x%x working on task %d
     ",(unsigned int)pthread_self(),*(int *)arg);
    363     sleep(1);
    364     printf("task %d is end
    ",*(int *)arg);
    365 
    366     return NULL;
    367 }
    368 int main(void)
    369 {
    370     /*threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);*/
    371 
    372     threadpool_t *thp = threadpool_create(3,100,100);/*创建线程池,池里最小3个线程,最大100,队列最大100*/
    373     printf("pool inited");
    374 
    375     //int *num = (int *)malloc(sizeof(int)*20);
    376     int num[20], i;
    377     for (i = 0; i < 20; i++) {
    378         num[i]=i;
    379         printf("add task %d
    ",i);
    380         threadpool_add(thp, process, (void*)&num[i]);     /* 向线程池中添加任务 */
    381     }
    382     sleep(10);                                          /* 等子线程完成任务 */
    383     threadpool_destroy(thp);
    384 
    385     return 0;
    386 }
    387 
    388 #endif
    threadpool.c
     1 #ifndef __THREADPOOL_H_
     2 #define __THREADPOOL_H_
     3 
     4 typedef struct threadpool_t threadpool_t;
     5 
     6 /**
     7  * @function threadpool_create
     8  * @descCreates a threadpool_t object.
     9  * @param thr_num  thread num
    10  * @param max_thr_num  max thread size
    11  * @param queue_max_size   size of the queue.
    12  * @return a newly created thread pool or NULL
    13  */
    14 threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);
    15 
    16 /**
    17  * @function threadpool_add
    18  * @desc add a new task in the queue of a thread pool
    19  * @param pool     Thread pool to which add the task.
    20  * @param function Pointer to the function that will perform the task.
    21  * @param argument Argument to be passed to the function.
    22  * @return 0 if all goes well,else -1
    23  */
    24 int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);
    25 
    26 /**
    27  * @function threadpool_destroy
    28  * @desc Stops and destroys a thread pool.
    29  * @param pool  Thread pool to destroy.
    30  * @return 0 if destory success else -1
    31  */
    32 int threadpool_destroy(threadpool_t *pool);
    33 
    34 /**
    35  * @desc get the thread num
    36  * @pool pool threadpool
    37  * @return # of the thread
    38  */
    39 int threadpool_all_threadnum(threadpool_t *pool);
    40 
    41 /**
    42  * desc get the busy thread num
    43  * @param pool threadpool
    44  * return # of the busy thread
    45  */
    46 int threadpool_busy_threadnum(threadpool_t *pool);
    47 
    48 #endif
    threadpool.h
     1 src = $(wildcard *.c)
     2 targets = $(patsubst %.c, %, $(src))
     3 
     4 CC = gcc
     5 CFLAGS = -lpthread -Wall -g 
     6 
     7 all:$(targets)
     8 
     9 $(targets):%:%.c
    10     $(CC) $< -o $@ $(CFLAGS)
    11 
    12 .PHONY:clean all
    13 clean:
    14     -rm -rf $(targets)
    makefile
  • 相关阅读:
    perl 传递对象到模块
    mysql 监控 大批量的插入,删除,和修改
    mysql 监控 大批量的插入,删除,和修改
    【带着canvas去流浪(11)】Three.js入门学习笔记
    当代职场成功学:越懒惰,越躺赢
    Python3 threading的多线程管理中的线程管理与锁
    collections 使用教程
    Spring MVC DispatcherServlet改造为 CSE RestServlet 常见问题汇编
    WAF(NGINX)中502和504的区别
    Lua
  • 原文地址:https://www.cnblogs.com/xuejiale/p/10919824.html
Copyright © 2011-2022 走看看