zoukankan      html  css  js  c++  java
  • C++线程池的实现

    线程池,简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态,当有新的任务进来,从线程池中取出一个空闲的线程处理任务,然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用,当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行,下面是线程池的工作原理图:

    我们为什么要使用线程池呢?

    简单来说就是线程本身存在开销,我们利用多线程来进行任务处理,单线程也不能滥用,无止禁的开新线程会给系统产生大量消耗,而线程本来就是可重用的资源,不需要每次使用时都进行初始化,因此可以采用有限的线程个数处理无限的任务。

     

    废话少说,直接上代码

    首先是用条件变量和互斥量封装的一个状态,用于保护线程池的状态

    condition.h

     1 #ifndef _CONDITION_H_
     2 #define _CONDITION_H_
     3 
     4 #include <pthread.h>
     5 
     6 //封装一个互斥量和条件变量作为状态
     7 typedef struct condition
     8 {
     9     pthread_mutex_t pmutex;
    10     pthread_cond_t pcond;
    11 }condition_t;
    12 
    13 //对状态的操作函数
    14 int condition_init(condition_t *cond);
    15 int condition_lock(condition_t *cond);
    16 int condition_unlock(condition_t *cond);
    17 int condition_wait(condition_t *cond);
    18 int condition_timedwait(condition_t *cond, const struct timespec *abstime);
    19 int condition_signal(condition_t* cond);
    20 int condition_broadcast(condition_t *cond);
    21 int condition_destroy(condition_t *cond);
    22 
    23 #endif

    condition.c

     1 #include "condition.h"
     2 
     3 //初始化
     4 int condition_init(condition_t *cond)
     5 {
     6     int status;
     7     if((status = pthread_mutex_init(&cond->pmutex, NULL)))
     8         return status;
     9     
    10     if((status = pthread_cond_init(&cond->pcond, NULL)))
    11         return status;
    12     
    13     return 0;
    14 }
    15 
    16 //加锁
    17 int condition_lock(condition_t *cond)
    18 {
    19     return pthread_mutex_lock(&cond->pmutex);
    20 }
    21 
    22 //解锁
    23 int condition_unlock(condition_t *cond)
    24 {
    25     return pthread_mutex_unlock(&cond->pmutex);
    26 }
    27 
    28 //等待
    29 int condition_wait(condition_t *cond)
    30 {
    31     return pthread_cond_wait(&cond->pcond, &cond->pmutex);
    32 }
    33 
    34 //固定时间等待
    35 int condition_timedwait(condition_t *cond, const struct timespec *abstime)
    36 {
    37     return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
    38 }
    39 
    40 //唤醒一个睡眠线程
    41 int condition_signal(condition_t* cond)
    42 {
    43     return pthread_cond_signal(&cond->pcond);
    44 }
    45 
    46 //唤醒所有睡眠线程
    47 int condition_broadcast(condition_t *cond)
    48 {
    49     return pthread_cond_broadcast(&cond->pcond);
    50 }
    51 
    52 //释放
    53 int condition_destroy(condition_t *cond)
    54 {
    55     int status;
    56     if((status = pthread_mutex_destroy(&cond->pmutex)))
    57         return status;
    58     
    59     if((status = pthread_cond_destroy(&cond->pcond)))
    60         return status;
    61         
    62     return 0;
    63 }

    然后是线程池对应的threadpool.h和threadpool.c

     1 #ifndef _THREAD_POOL_H_
     2 #define _THREAD_POOL_H_
     3 
     4 //线程池头文件
     5 
     6 #include "condition.h"
     7 
     8 //封装线程池中的对象需要执行的任务对象
     9 typedef struct task
    10 {
    11     void *(*run)(void *args);  //函数指针,需要执行的任务
    12     void *arg;              //参数
    13     struct task *next;      //任务队列中下一个任务
    14 }task_t;
    15 
    16 
    17 //下面是线程池结构体
    18 typedef struct threadpool
    19 {
    20     condition_t ready;    //状态量
    21     task_t *first;       //任务队列中第一个任务
    22     task_t *last;        //任务队列中最后一个任务
    23     int counter;         //线程池中已有线程数
    24     int idle;            //线程池中kongxi线程数
    25     int max_threads;     //线程池最大线程数
    26     int quit;            //是否退出标志
    27 }threadpool_t;
    28 
    29 
    30 //线程池初始化
    31 void threadpool_init(threadpool_t *pool, int threads);
    32 
    33 //往线程池中加入任务
    34 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
    35 
    36 //摧毁线程池
    37 void threadpool_destroy(threadpool_t *pool);
    38 
    39 #endif
      1 #include "threadpool.h"
      2 #include <stdlib.h>
      3 #include <stdio.h>
      4 #include <string.h>
      5 #include <errno.h>
      6 #include <time.h>
      7 
      8 //创建的线程执行
      9 void *thread_routine(void *arg)
     10 {
     11     struct timespec abstime;
     12     int timeout;
     13     printf("thread %d is starting
    ", (int)pthread_self());
     14     threadpool_t *pool = (threadpool_t *)arg;
     15     while(1)
     16     {
     17         timeout = 0;
     18         //访问线程池之前需要加锁
     19         condition_lock(&pool->ready);
     20         //空闲
     21         pool->idle++;
     22         //等待队列有任务到来 或者 收到线程池销毁通知
     23         while(pool->first == NULL && !pool->quit)
     24         {
     25             //否则线程阻塞等待
     26             printf("thread %d is waiting
    ", (int)pthread_self());
     27             //获取从当前时间,并加上等待时间, 设置进程的超时睡眠时间
     28             clock_gettime(CLOCK_REALTIME, &abstime);  
     29             abstime.tv_sec += 2;
     30             int status;
     31             status = condition_timedwait(&pool->ready, &abstime);  //该函数会解锁,允许其他线程访问,当被唤醒时,加锁
     32             if(status == ETIMEDOUT)
     33             {
     34                 printf("thread %d wait timed out
    ", (int)pthread_self());
     35                 timeout = 1;
     36                 break;
     37             }
     38         }
     39         
     40         pool->idle--;
     41         if(pool->first != NULL)
     42         {
     43             //取出等待队列最前的任务,移除任务,并执行任务
     44             task_t *t = pool->first;
     45             pool->first = t->next;
     46             //由于任务执行需要消耗时间,先解锁让其他线程访问线程池
     47             condition_unlock(&pool->ready);
     48             //执行任务
     49             t->run(t->arg);
     50             //执行完任务释放内存
     51             free(t);
     52             //重新加锁
     53             condition_lock(&pool->ready);
     54         }
     55         
     56         //退出线程池
     57         if(pool->quit && pool->first == NULL)
     58         {
     59             pool->counter--;//当前工作的线程数-1
     60             //若线程池中没有线程,通知等待线程(主线程)全部任务已经完成
     61             if(pool->counter == 0)
     62             {
     63                 condition_signal(&pool->ready);
     64             }
     65             condition_unlock(&pool->ready);
     66             break;
     67         }
     68         //超时,跳出销毁线程
     69         if(timeout == 1)
     70         {
     71             pool->counter--;//当前工作的线程数-1
     72             condition_unlock(&pool->ready);
     73             break;
     74         }
     75         
     76         condition_unlock(&pool->ready);
     77     }
     78     
     79     printf("thread %d is exiting
    ", (int)pthread_self());
     80     return NULL;
     81     
     82 }
     83 
     84 
     85 //线程池初始化
     86 void threadpool_init(threadpool_t *pool, int threads)
     87 {
     88     
     89     condition_init(&pool->ready);
     90     pool->first = NULL;
     91     pool->last =NULL;
     92     pool->counter =0;
     93     pool->idle =0;
     94     pool->max_threads = threads;
     95     pool->quit =0;
     96     
     97 }
     98 
     99 
    100 //增加一个任务到线程池
    101 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
    102 {
    103     //产生一个新的任务
    104     task_t *newtask = (task_t *)malloc(sizeof(task_t));
    105     newtask->run = run;
    106     newtask->arg = arg;
    107     newtask->next=NULL;//新加的任务放在队列尾端
    108     
    109     //线程池的状态被多个线程共享,操作前需要加锁
    110     condition_lock(&pool->ready);
    111     
    112     if(pool->first == NULL)//第一个任务加入
    113     {
    114         pool->first = newtask;
    115     }        
    116     else    
    117     {
    118         pool->last->next = newtask;
    119     }
    120     pool->last = newtask;  //队列尾指向新加入的线程
    121     
    122     //线程池中有线程空闲,唤醒
    123     if(pool->idle > 0)
    124     {
    125         condition_signal(&pool->ready);
    126     }
    127     //当前线程池中线程个数没有达到设定的最大值,创建一个新的线性
    128     else if(pool->counter < pool->max_threads)
    129     {
    130         pthread_t tid;
    131         pthread_create(&tid, NULL, thread_routine, pool);
    132         pool->counter++;
    133     }
    134     //结束,访问
    135     condition_unlock(&pool->ready);
    136 }
    137 
    138 //线程池销毁
    139 void threadpool_destroy(threadpool_t *pool)
    140 {
    141     //如果已经调用销毁,直接返回
    142     if(pool->quit)
    143     {
    144     return;
    145     }
    146     //加锁
    147     condition_lock(&pool->ready);
    148     //设置销毁标记为1
    149     pool->quit = 1;
    150     //线程池中线程个数大于0
    151     if(pool->counter > 0)
    152     {
    153         //对于等待的线程,发送信号唤醒
    154         if(pool->idle > 0)
    155         {
    156             condition_broadcast(&pool->ready);
    157         }
    158         //正在执行任务的线程,等待他们结束任务
    159         while(pool->counter)
    160         {
    161             condition_wait(&pool->ready);
    162         }
    163     }
    164     condition_unlock(&pool->ready);
    165     condition_destroy(&pool->ready);
    166 }

    测试代码:

     1 #include "threadpool.h"
     2 #include <unistd.h>
     3 #include <stdlib.h>
     4 #include <stdio.h>
     5 
     6 void* mytask(void *arg)
     7 {
     8     printf("thread %d is working on task %d
    ", (int)pthread_self(), *(int*)arg);
     9     sleep(1);
    10     free(arg);
    11     return NULL;
    12 }
    13 
    14 //测试代码
    15 int main(void)
    16 {
    17     threadpool_t pool;
    18     //初始化线程池,最多三个线程
    19     threadpool_init(&pool, 3);
    20     int i;
    21     //创建十个任务
    22     for(i=0; i < 10; i++)
    23     {
    24         int *arg = malloc(sizeof(int));
    25         *arg = i;
    26         threadpool_add_task(&pool, mytask, arg);
    27         
    28     }
    29     threadpool_destroy(&pool);
    30     return 0;
    31 }

    输出结果:

    可以看出程序先后创建了三个线程进行工作,当没有任务空闲时,等待2s直接退出销毁线程

     

  • 相关阅读:
    Java 8 新特性之 Stream&forEach&map&filter&limit&sorted&统计函数&Collectors&并行(parallel)程序(转)
    kafka入门:简介、使用场景、设计原理、主要配置及集群搭建(转)
    java8 stream API
    Docker容器CPU、memory资源限制
    JVM调优总结 -Xms -Xmx -Xmn -Xss
    JAVA8之妙用Optional解决判断Null为空的问题
    【王凤鸣 太极缠丝功笔记】第二章
    【精美凉菜】黄瓜腐竹花生豆
    【瓜果篇】黄瓜
    【Teradata UDF】中文按字符查找chs_instr
  • 原文地址:https://www.cnblogs.com/ybqjymy/p/12204292.html
Copyright © 2011-2022 走看看