线程池:
线程池是一种多线程处理形式,初始创建多个线程,初始线程处于wait状态。处理过程中将任务添加到队列中,按照队列顺序依次处理,此时线程处于work状态自动启动这些任务。线程任务处理完后继续处理队列中待执行任务,最后完成所有任务放回至线程池统一销毁。线程池线程都是后台线程,适用于连续产生大量并发任务的场合。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待监督管理者分配可并发执行的任务。线程池不仅避免了在处理短时间任务时创建与销毁线程的代价,还能够保证内核的充分利用,防止过分调度。
1、初始化线程池:创建初始的任务链表(队列)及N个线程,初始化条件变量;
2、任务链表管理:任务存放至链表中,记录当前待处理任务量,并存放未处理的任务,为线程池提供一种缓冲机制;
当任务链表头为空,则表示无待处理任务,可等待或销毁线程池;当任务链表头指向非空,线程执行待任务,任务链表头下移,等待任务-1。
3、线程池管理器(ThreadPoolManager):监测并管理线程池中线程状态,运用阻塞IO的方式
线程状态管理:1)wait -- 线程池处于非关闭,当前无任务,线程阻塞;
2)work -- 线程唤醒,执行待处理任务(process),任务链表头下移,等待任务-1;
4、线程池销毁:当任务全部完成或接收到销毁指令,销毁等待链表及所有线程,销毁后指针置空。
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <unistd.h> 4 #include <sys/types.h> 5 #include <pthread.h> 6 7 8 typedef struct task 9 { 10 void *(*process) (void *arg); 11 void *arg; 12 struct task *next; 13 } Cthread_task; 14 15 16 /*线程池结构*/ 17 typedef struct 18 { 19 pthread_mutex_t queue_lock; 20 pthread_cond_t queue_ready; 21 22 /*链表结构,线程池中所有等待任务7*/ 23 Cthread_task *queue_head; 24 25 /*是否销毁线程池*/ 26 int shutdown; 27 pthread_t *threadid; 28 29 /*线程池中线程数目3*/ 30 int max_thread_num; 31 32 /*当前等待的任务数*/ 33 int cur_task_size; 34 35 } Cthread_pool; 36 37 static Cthread_pool *pool = NULL; 38 39 void *thread_routine (void *arg); 40 41 void pool_init (int max_thread_num) 42 { 43 int i = 0; 44 45 pool = (Cthread_pool *) malloc (sizeof (Cthread_pool)); 46 47 pthread_mutex_init (&(pool->queue_lock), NULL); 48 /*初始化条件变量*/ 49 pthread_cond_init (&(pool->queue_ready), NULL); 50 51 pool->queue_head = NULL; //等待任务链表为空 52 53 pool->max_thread_num = max_thread_num; 54 pool->cur_task_size = 0; 55 56 pool->shutdown = 0; 57 58 pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t)); 59 60 for (i = 0; i < max_thread_num; i++) //3 61 { 62 pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL); //创建线程 63 } 64 } 65 66 67 68 /*向线程池中加入任务*/ 69 int pool_add_task (void *(*process) (void *arg), void *arg) 70 { 71 /*构造一个新任务 初始化*/ 72 Cthread_task *task = (Cthread_task *) malloc (sizeof (Cthread_task)); 73 task->process = process; 74 task->arg = arg; 75 task->next = NULL; 76 77 pthread_mutex_lock (&(pool->queue_lock)); 78 /*将任务加入到等待队列中 加到链表尾部或空成员中*/ 79 Cthread_task *member = pool->queue_head; 80 if (member != NULL) 81 { 82 while (member->next != NULL) 83 member = member->next; 84 member->next = task; 85 } 86 else 87 { 88 pool->queue_head = task; 89 } 90 91 pool->cur_task_size++; 92 pthread_mutex_unlock (&(pool->queue_lock)); 93 94 pthread_cond_signal (&(pool->queue_ready)); //唤醒线程 95 96 return 0; 97 } 98 99 100 101 /*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直 102 把任务运行完后再退出*/ 103 int pool_destroy () 104 { 105 if (pool->shutdown) 106 return -1;/*防止两次调用*/ 107 pool->shutdown = 1; 108 109 /*唤醒所有等待线程,线程池要销毁了*/ 110 pthread_cond_broadcast (&(pool->queue_ready)); 111 112 /*阻塞等待线程退出,否则就成僵尸了*/ 113 int i; 114 for (i = 0; i < pool->max_thread_num; i++) 115 pthread_join (pool->threadid[i], NULL); 116 free (pool->threadid); 117 118 /*销毁等待队列*/ 119 Cthread_task *head = NULL; 120 while (pool->queue_head != NULL) 121 { 122 head = pool->queue_head; 123 pool->queue_head = pool->queue_head->next; 124 free (head); 125 } 126 /*条件变量和互斥量也别忘了销毁*/ 127 pthread_mutex_destroy(&(pool->queue_lock)); 128 pthread_cond_destroy(&(pool->queue_ready)); 129 130 free (pool); 131 /*销毁后指针置空是个好习惯*/ 132 pool=NULL; 133 return 0; 134 } 135 136 137 138 void * thread_routine (void *arg) 139 { 140 printf ("starting thread 0x%x ", pthread_self ()); 141 while (1) 142 { 143 pthread_mutex_lock (&(pool->queue_lock)); 144 145 while (pool->cur_task_size == 0 && !pool->shutdown) //当前没有任务,线程池处于非关闭 146 { 147 printf ("thread 0x%x is waiting ", pthread_self ()); 148 pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock)); //线程等待 149 } 150 151 /*线程池要销毁了*/ 152 if (pool->shutdown) 153 { 154 /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/ 155 pthread_mutex_unlock (&(pool->queue_lock)); 156 printf ("thread 0x%x will exit ", pthread_self ()); 157 pthread_exit (NULL); 158 } 159 160 printf ("thread 0x%x is starting to work ", pthread_self ()); 161 162 163 /*待处理任务减1,并取出链表中的头元素*/ 164 pool->cur_task_size--; //等待任务-1 165 Cthread_task *task = pool->queue_head; //取第一个任务处理 166 pool->queue_head = task->next; //链表头指向下一个任务 167 pthread_mutex_unlock (&(pool->queue_lock)); 168 169 /*调用回调函数,执行任务*/ 170 (*(task->process)) (task->arg); 171 free (task); 172 task = NULL; 173 } 174 /*这一句应该是不可达的*/ 175 pthread_exit (NULL); 176 } 177 178 void * myprocess (void *arg) 179 { 180 printf ("threadid is 0x%x, working on task %d ", pthread_self (),*(int *) arg); 181 sleep (1);/*休息一秒,延长任务的执行时间*/ 182 return NULL; 183 } 184 185 int main (int argc, char **argv) 186 { 187 pool_init (3);/*创建线程池,线程池中最多三个活动线程*/ 188 189 /*连续向池中投入10个任务*/ 190 int *workingnum = (int *) malloc (sizeof (int) * 10); 191 int i; 192 for (i = 0; i < 10; i++) 193 { 194 workingnum[i] = i; 195 pool_add_task (myprocess, &workingnum[i]); 196 } 197 /*等待所有任务完成*/ 198 sleep (5); 199 /*销毁线程池*/ 200 pool_destroy (); 201 202 free (workingnum); 203 204 return 0; 205 }