这段代码我试着抄下来理解了下,在测试过程当中出现不稳定的情况。
主要问题是最后的sleep,它是等待任务结束然后结束,而不是收割结束。
还要再改改。
#include<stdlib.h> #include<stdio.h> #include<pthread.h> /* * 存储线程任务的结构 * */ typedef struct worker { void *(*process) (void *arg); void *arg; struct worker *next; } CThread_worker; /* * 线程池的结构 * */ typedef struct { pthread_mutex_t queue_lock; pthread_cond_t queue_ready; CThread_worker *queue_head; int shutdown; pthread_t *threadid; int max_thread_num; int cur_queue_size; } CThread_pool; int pool_add_worker(void *(*process) (void *arg),void *arg); void *thread_routine(void *arg); static CThread_pool *pool=NULL; void pool_init(int max_thread_num){ pool=(CThread_pool *) malloc (sizeof (CThread_pool)); pthread_mutex_init (&(pool->queue_lock),NULL); pthread_cond_init (&(pool->queue_ready),NULL); pool->queue_head=NULL; pool->max_thread_num=max_thread_num; pool->cur_queue_size=0; pool->shutdown=0; pool->threadid=(pthread_t *) malloc (max_thread_num * sizeof (pthread_t)); int i=0; for (i=0;i<max_thread_num;i++) { pthread_create(&(pool->threadid[i]),NULL,thread_routine,NULL); } } int pool_add_worker(void *(*process) (void *arg),void *arg) { CThread_worker *newworker=(CThread_worker *)malloc(sizeof (CThread_worker)); newworker->process=process; newworker->arg=arg; newworker->next=NULL; pthread_mutex_lock (&(pool->queue_lock)); CThread_worker *member=pool->queue_head; if(member !=NULL) { while(member->next !=NULL) member=member->next; member->next=newworker; }else{ pool->queue_head=newworker; } pool->cur_queue_size++; pthread_mutex_unlock (&(pool->queue_lock)); pthread_cond_signal (&(pool->queue_ready)); return 0; } int pool_destroy() { if(pool->shutdown) return -1; pool->shutdown=1; pthread_cond_broadcast (&(pool->queue_ready)); int i; for(i=0;i<pool->max_thread_num;i++) pthread_join (pool->threadid[i],NULL); free(pool->threadid); CThread_worker *head=NULL; while(pool->queue_head !=NULL) { head=pool->queue_head; pool->queue_head=pool->queue_head->next; free(head); } pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); free(pool); pool=NULL; return 0; } void * thread_routine(void *arg) { while(1) { pthread_mutex_lock (&(pool->queue_lock)); while(pool->cur_queue_size==0 && !pool->shutdown) { pthread_cond_wait(&(pool->queue_ready),&(pool->queue_lock)); } if(pool->shutdown) { pthread_mutex_unlock (&(pool->queue_lock)); pthread_exit(NULL); } pool->cur_queue_size--; CThread_worker *worker=pool->queue_head; pool->queue_head=worker->next; pthread_mutex_unlock(&(pool->queue_lock)); (*(worker->process)) (worker->arg); free(worker); worker=NULL; } pthread_exit(NULL); } void * myprocess(void *arg) { printf("hello! %d ",*(int *) arg); sleep(1); return NULL; } int main(int argc,char **argv) { pool_init(10); int *workingnum=(int *) malloc (sizeof (int) * 200); int i; for (i=0;i<100;i++) { workingnum[i]=i; pool_add_worker(myprocess,&workingnum[i]); } sleep(20); pool_destroy(); free(workingnum); return 0; }