最近在研究如何实现一个线程池。具体线程池的实现不是我想说的重点,而是关于线程的一些操作问题。
先说一下我的每个工作线程都有一个自己对应的锁,而且锁只在线程自身使用。(这方便下面的理解)
一、线程的异步取消的问题
我的线程池对外提供两个函数,create_threadpool()和destroy_threadpool()用来创建和销毁线程池,创建的过程是初始化工作线程→初始化管理线程→….工作,销毁的过程是销毁管理线程→销毁工作线程→……工作。
在这两个过程中遇到的第一个问题是如何取消线程。
目前我搜集到的有3种方法:
1.通过pthread_kill()函数发送SIGQUIT信号
2.通过pthread_cancel同步取消线程
3.通过pthread_cancel异步取消线程
方法一:
其实也不一定要发送SIGQUIT,发送什么信号都可以,但是在线程的运行函数中必须实现对SIGQUIT或其他信号的处理,使用signal(SIGQUIT,xxx处理函数)。
我使用的方法三,所以我重点想说一下方法二和方法三。
pthread_cancel()发送信号给线程,但是线程会产生两种不同的处理方法,一种是继续运行,直到取消点(取消点可以网上搜索),再销毁线程,另一种是马上销毁线程。而产生这两种不同结果的方法就是通过设置pthread_setcanceltype()函数。pthread_setcanceltype的原型pthread_setcanceltype(int type,int* oldtype),第一个type有两种取值,PTHREAD_CANCEL_DEFFERED和PTHREAD_CANCEL_ASYNCHRONOUS。设置为PTHREAD_CANCEL_DEFFERED(默认情况)则运行到取消点再结束,设置为PTHREAD_CANCEL_ASYNCHRONOUS则立即销毁线程。
方法二:
所以据上所述,方法二就是设置为PTHREAD_CANCEL_DEFFERED,则线程等到取消点再销毁。
方法三:
同样,设置PTHREAD_CANCEL_ASYNCHRONOUS则立即销毁线程
二、销毁线程中的锁
我采用的是方法三,而我第一次上锁的地方是
pthread_mutex_lock(&lock);
pthread_cond_wait(&cond);
pthread_mutex_unlock(&lock);
所以如果正常的话,线程应该是运行到pthread_cond_wait这个地方,线程就阻塞了。但是这是正常情况,有时线程还未来得及启动运行到此处可能就被销毁了。
例如我的测试用例就是初始化工作线程,然后马上销毁工作线程。
我不知道大家是不是遇到这个问题,我遇到的问题是初始化线程后,马上销毁线程,并且销毁线程中可能锁定了锁之后,经常会销毁锁失败(具体是否销毁失败的情况是不定的)。所以我想到在销毁锁之前先使用pthread_mutex_unlock解锁锁,然后再销毁线程。我运行了一千次这个过程,发现有时会成功,有时还是会失败。后来发现是因为有的线程还没运行到加锁的那行代码,此时解锁之后会产生未知的行为,所以销毁锁失败了。
官方对pthread_mutex_unlock的说明
If the mutex type is RTHREAD_MUTEX_NORMAL,error detection is not provided.If a thread attempts to unlock a mutex that is has not locked or a mutex which is unlocked;undefined behavior results.
所以我决定采用pthread_mutex_trylock(),此函数的目的是,如果此锁已经被锁定则直接返回,如果未被锁定则锁定,如此我再解锁就不会是未知行为了。之所以我能这样做,是因为线程的锁只在线程中使用了,并且销毁线程池就会销毁所有的线程。
代码我进行了简化,只是为了展现一下我所说的问题。
1 threadpooltest.h 2 3 #ifndef THREADPOOLTEST_H_ 4 #define THREADPOOLTEST_H_ 5 6 #include <pthread.h> 7 8 #ifdef __cplusplus 9 extern "C"{ 10 #endif 11 12 13 typedef struct{ 14 pthread_t thread_id; 15 pthread_mutex_t thread_lock;//线程锁,为普通锁 16 pthread_cond_t thread_cond;//条件变量 17 }threadinfo; 18 19 typedef struct{ 20 int thread_num; 21 threadinfo* workthread;//工作线程,这里只初始化一下工作线程 22 }threadpool; 23 24 25 extern int create_threadpool(threadpool** thpool); 26 27 extern int destroy_threadpool(threadpool** thpool); 28 29 30 #ifdef __cplusplus 31 } 32 #endif 33 34 #endif 35 36 37 38 threadpooltest.c 39 40 #include "threadpooltest.h" 41 #include <stdio.h> 42 #include <stdlib.h> 43 44 45 static void* work_run(void* arg);//线程的运行函数 46 //根据线程的id找到线程池中的id 47 static int get_poolthreadid_by_threadid(pthread_t thread_id, threadpool* thpool); 48 49 int create_threadpool(threadpool** thpool) 50 { 51 //为句柄分配空间 52 *thpool = (threadpool*)malloc(sizeof(threadpool)); 53 if (!(*thpool)) 54 return -1; 55 56 //初始化线程空间 57 (*thpool)->thread_num = 3;//简单点设定为3个 58 (*thpool)->workthread = (threadinfo*)malloc(sizeof(threadinfo)*(*thpool)->thread_num); 59 if (!((*thpool)->workthread)) 60 return -1; 61 62 //创建线程 63 int i; 64 for (i = 0; i < (*thpool)->thread_num; i++) 65 { 66 pthread_mutex_init(&((*thpool)->workthread[i].thread_lock),NULL); 67 pthread_cond_init(&((*thpool)->workthread[i].thread_cond),NULL); 68 pthread_create(&((*thpool)->workthread[i].thread_id),NULL,work_run,*thpool); 69 } 70 71 return 0; 72 } 73 74 75 int destroy_threadpool(threadpool** thpool) 76 { 77 int i; 78 for (i = 0; i < (*thpool)->thread_num; i++) 79 { 80 pthread_t thid = (*thpool)->workthread[i].thread_id; 81 //发送取消信号 82 pthread_cancel((*thpool)->workthread[i].thread_id); 83 //等待线程结束 84 pthread_join(thid,NULL); 85 86 /*If the mutex type is RTHREAD_MUTEX_NORMAL,error detection is not provided.If a thread attempts to unlock a mutex that is has not locked or a mutex which is unlocked;undefined behavior results.*/ 87 //尝试解锁 88 pthread_mutex_trylock(&((*thpool)->workthread[i].thread_lock)); 89 pthread_mutex_unlock(&((*thpool)->workthread[i].thread_lock)); 90 //摧毁锁 91 if (0 != pthread_mutex_destroy(&((*thpool)->workthread[i].thread_lock))) 92 { 93 #ifdef MY_DEBUG 94 fprintf(stderr,"destroy %u thread mutex fail! file:%s line:%d\n",thid,__FILE__,__LINE__); 95 #endif 96 } 97 //摧毁信号量 98 if (0 != pthread_cond_destroy(&((*thpool)->workthread[i].thread_cond))) 99 { 100 #ifdef MY_DEBUG 101 fprintf(stderr,"destroy %u thread cond fail! file:%s line:%d\n",thid,__FILE__,__LINE__); 102 #endif 103 } 104 } 105 106 //释放线程空间 107 free((*thpool)->workthread); 108 (*thpool)->workthread = NULL; 109 //释放线程池空间 110 free(*thpool); 111 *thpool = NULL; 112 113 return 0; 114 } 115 116 117 void* work_run(void* arg) 118 { 119 pthread_t selfid = pthread_self(); 120 int oldstate; 121 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,&oldstate);//设定线程的取消未异步类型 122 123 threadpool* thpool = (threadpool*)arg; 124 if (!thpool) 125 pthread_exit(NULL); 126 127 int workthread_id = get_poolthreadid_by_threadid(selfid,thpool); 128 if (workthread_id == -1) 129 pthread_exit(NULL); 130 131 do{ 132 fprintf(stderr,"%u before lock\n",selfid); 133 pthread_mutex_lock(&(thpool->workthread[workthread_id].thread_lock)); 134 fprintf(stderr,"%u is waiting\n",selfid); 135 //此处简化了我的线程池,没有 管理线程会发送线程,所以这里是一直阻塞的 136 pthread_cond_wait(&(thpool->workthread[workthread_id].thread_cond), &(thpool->workthread[workthread_id].thread_lock)); 137 pthread_mutex_unlock(&(thpool->workthread[workthread_id].thread_lock)); 138 }while(1); 139 } 140 141 142 int get_poolthreadid_by_threadid(pthread_t thread_id, threadpool* thpool) 143 { 144 if (!thpool) 145 return -1; 146 147 int i; 148 for (i = 0; i < thpool->thread_num; i++) 149 { 150 if (thpool->workthread[i].thread_id == thread_id) 151 { 152 return i; 153 } 154 } 155 return -1; 156 }
三、进一步深入的疑问
我发现如果我将线程设置为detachable的状态,再去除pthread_join就会造成锁和信号量的销毁失败,目前我还没想明白原因。可能是因为在销毁的时候,线程还没完全销毁的缘故吧。