zoukankan      html  css  js  c++  java
  • 线程池技术优化

    线程池:

      线程池是一种多线程处理形式,初始创建多个线程,初始线程处于wait状态。处理过程中将任务添加到队列中,按照队列顺序依次处理,此时线程处于work状态自动启动这些任务。线程任务处理完后继续处理队列中待执行任务,最后完成所有任务放回至线程池统一销毁。线程池线程都是后台线程,适用于连续产生大量并发任务的场合。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。

    线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待监督管理者分配可并发执行的任务。线程池不仅避免了在处理短时间任务时创建与销毁线程的代价,还能够保证内核的充分利用,防止过分调度。

    1、初始化线程池:创建初始的任务链表(队列)及N个线程,初始化条件变量;

    2、任务链表管理:任务存放至链表中,记录当前待处理任务量,并存放未处理的任务,为线程池提供一种缓冲机制;

      当任务链表头为空,则表示无待处理任务,可等待或销毁线程池;当任务链表头指向非空,线程执行待任务,任务链表头下移,等待任务-1。

    3、线程池管理器(ThreadPoolManager:监测并管理线程池中线程状态,运用阻塞IO的方式

      线程状态管理:1)wait -- 线程池处于非关闭,当前无任务,线程阻塞;

             2)work -- 线程唤醒,执行待处理任务(process),任务链表头下移,等待任务-1;

    4、线程池销毁:当任务全部完成或接收到销毁指令,销毁等待链表及所有线程,销毁后指针置空。

      1 #include <stdio.h> 
      2 #include <stdlib.h> 
      3 #include <unistd.h> 
      4 #include <sys/types.h> 
      5 #include <pthread.h> 
      6 
      7 
      8 typedef struct task 
      9 { 
     10     void *(*process) (void *arg); 
     11     void *arg;
     12     struct task *next; 
     13 } Cthread_task; 
     14 
     15 
     16 /*线程池结构*/ 
     17 typedef struct 
     18 { 
     19     pthread_mutex_t queue_lock; 
     20     pthread_cond_t queue_ready; 
     21 
     22     /*链表结构,线程池中所有等待任务7*/ 
     23     Cthread_task *queue_head; 
     24 
     25     /*是否销毁线程池*/ 
     26     int shutdown; 
     27     pthread_t *threadid; 
     28     
     29     /*线程池中线程数目3*/ 
     30     int max_thread_num; 
     31     
     32     /*当前等待的任务数*/ 
     33     int cur_task_size; 
     34 
     35 } Cthread_pool; 
     36 
     37 static Cthread_pool *pool = NULL; 
     38 
     39 void *thread_routine (void *arg); 
     40 
     41 void pool_init (int max_thread_num) 
     42 { 
     43     int i = 0;
     44     
     45     pool = (Cthread_pool *) malloc (sizeof (Cthread_pool)); 
     46 
     47     pthread_mutex_init (&(pool->queue_lock), NULL); 
     48     /*初始化条件变量*/
     49     pthread_cond_init (&(pool->queue_ready), NULL); 
     50 
     51     pool->queue_head = NULL; //等待任务链表为空
     52 
     53     pool->max_thread_num = max_thread_num; 
     54     pool->cur_task_size = 0; 
     55 
     56     pool->shutdown = 0; 
     57 
     58     pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t)); 
     59 
     60     for (i = 0; i < max_thread_num; i++) //3
     61     {  
     62         pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL); //创建线程
     63     } 
     64 } 
     65 
     66 
     67 
     68 /*向线程池中加入任务*/ 
     69 int pool_add_task (void *(*process) (void *arg), void *arg) 
     70 { 
     71     /*构造一个新任务 初始化*/ 
     72     Cthread_task *task = (Cthread_task *) malloc (sizeof (Cthread_task)); 
     73     task->process = process; 
     74     task->arg = arg; 
     75     task->next = NULL;
     76 
     77     pthread_mutex_lock (&(pool->queue_lock)); 
     78     /*将任务加入到等待队列中 加到链表尾部或空成员中*/ 
     79     Cthread_task *member = pool->queue_head; 
     80     if (member != NULL) 
     81     { 
     82         while (member->next != NULL) 
     83             member = member->next; 
     84         member->next = task; 
     85     } 
     86     else 
     87     { 
     88         pool->queue_head = task; 
     89     } 
     90 
     91     pool->cur_task_size++; 
     92     pthread_mutex_unlock (&(pool->queue_lock)); 
     93     
     94     pthread_cond_signal (&(pool->queue_ready)); //唤醒线程
     95     
     96     return 0; 
     97 } 
     98 
     99 
    100 
    101 /*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直 
    102 把任务运行完后再退出*/ 
    103 int pool_destroy () 
    104 { 
    105     if (pool->shutdown) 
    106         return -1;/*防止两次调用*/ 
    107     pool->shutdown = 1; 
    108 
    109     /*唤醒所有等待线程,线程池要销毁了*/ 
    110     pthread_cond_broadcast (&(pool->queue_ready)); 
    111 
    112     /*阻塞等待线程退出,否则就成僵尸了*/ 
    113     int i; 
    114     for (i = 0; i < pool->max_thread_num; i++) 
    115         pthread_join (pool->threadid[i], NULL); 
    116     free (pool->threadid); 
    117 
    118     /*销毁等待队列*/ 
    119     Cthread_task *head = NULL; 
    120     while (pool->queue_head != NULL) 
    121     { 
    122         head = pool->queue_head; 
    123         pool->queue_head = pool->queue_head->next; 
    124         free (head); 
    125     } 
    126     /*条件变量和互斥量也别忘了销毁*/ 
    127     pthread_mutex_destroy(&(pool->queue_lock)); 
    128     pthread_cond_destroy(&(pool->queue_ready)); 
    129      
    130     free (pool); 
    131     /*销毁后指针置空是个好习惯*/ 
    132     pool=NULL; 
    133     return 0; 
    134 } 
    135 
    136 
    137 
    138 void * thread_routine (void *arg) 
    139 { 
    140     printf ("starting thread 0x%x
    ", pthread_self ()); 
    141     while (1) 
    142     { 
    143         pthread_mutex_lock (&(pool->queue_lock)); 
    144 
    145         while (pool->cur_task_size == 0 && !pool->shutdown) //当前没有任务,线程池处于非关闭
    146         { 
    147             printf ("thread 0x%x is waiting
    ", pthread_self ()); 
    148             pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));  //线程等待
    149         } 
    150 
    151         /*线程池要销毁了*/ 
    152         if (pool->shutdown) 
    153         { 
    154             /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/ 
    155             pthread_mutex_unlock (&(pool->queue_lock)); 
    156             printf ("thread 0x%x will exit
    ", pthread_self ()); 
    157             pthread_exit (NULL); 
    158         } 
    159 
    160         printf ("thread 0x%x is starting to work
    ", pthread_self ()); 
    161 
    162          
    163         /*待处理任务减1,并取出链表中的头元素*/ 
    164         pool->cur_task_size--;   //等待任务-1
    165         Cthread_task *task = pool->queue_head; //取第一个任务处理
    166         pool->queue_head = task->next; //链表头指向下一个任务
    167         pthread_mutex_unlock (&(pool->queue_lock)); 
    168 
    169         /*调用回调函数,执行任务*/ 
    170         (*(task->process)) (task->arg); 
    171         free (task); 
    172         task = NULL; 
    173     } 
    174     /*这一句应该是不可达的*/ 
    175     pthread_exit (NULL); 
    176 }
    177 
    178 void * myprocess (void *arg) 
    179 { 
    180     printf ("threadid is 0x%x, working on task %d
    ", pthread_self (),*(int *) arg); 
    181     sleep (1);/*休息一秒,延长任务的执行时间*/ 
    182     return NULL; 
    183 } 
    184 
    185 int main (int argc, char **argv) 
    186 { 
    187     pool_init (3);/*创建线程池,线程池中最多三个活动线程*/ 
    188      
    189     /*连续向池中投入10个任务*/ 
    190     int *workingnum = (int *) malloc (sizeof (int) * 10); 
    191     int i; 
    192     for (i = 0; i < 10; i++) 
    193     { 
    194         workingnum[i] = i; 
    195         pool_add_task (myprocess, &workingnum[i]); 
    196     } 
    197     /*等待所有任务完成*/ 
    198     sleep (5); 
    199     /*销毁线程池*/ 
    200     pool_destroy (); 
    201 
    202     free (workingnum); 
    203     
    204     return 0; 
    205 }

     

  • 相关阅读:
    函数的缺省参数和函数初始化示例以及布尔型参数的使用示例
    指针使用示例程序
    按值传递对象和按址传递对象
    详解js跨域
    CSS之BFC及其应用
    js图片预加载、有序加载
    12个非常有用的JavaScript技巧
    MySQL使用pt-online-change-schema工具在线修改1.6亿级数据表结构
    nodeJS实现一个在线填表应用
    浏览器的缓存机制
  • 原文地址:https://www.cnblogs.com/hjh-666/p/11110570.html
Copyright © 2011-2022 走看看