最近老看一些服务器网关的代码页看了一些开源的代码。一个重要的技术线程池。
何为线程池,所谓线程池就是一组用来处理,客户请求的线程组 这里的客户指代的是线程池服务的对象。
线程池的实现原理:
(1)消息队列调用函数,当有消息到来时候,将消息封装插入消息队列。
(2)有一个 线程池附服务线程,该线程负责检索消息队列,创建线程池线程,将该消息派发到线程池的某一线程处理。
(3)线程池线程参数,该参数应该是一个结构体,该结构体中某项指向一个消息,处理完成之后删除消息,进入等待状态。
源代码如下 代码注释有说明 故不在单独讲解代码:
1 #include <stdlib.h> 2 #include <stdio.h> 3 #include <unistd.h> 4 #include <assert.h> 5 #include <fcntl.h> 6 #include <string.h> 7 #include <time.h> 8 #include <pthread.h> 9 #include <sys/types.h> 10 11 12 //消息队列基本元素 13 typedef struct _STPOOLMsgElement //消息队列基本元素 14 { 15 char * param; //代表消息队列消息 实际情况中也可以使用结构体 16 struct _STPOOLMsgData *next; //下一条消息 17 18 }STPOOLMsgData,*PSTPOOLMsgData; 19 20 21 //线程池基本元素 22 typedef struct _STPOOLThreadElement 23 { 24 pthread_t thread; //但前线程ID 25 pthread_mutex_t busy; // 用来同步线程池线程 和线程池服务线程 26 pthread_cond_t nodata; // 27 PSTPOOLMsgData pMsgData; //当前线程需要处理的事件,如果为空线程池空闲 28 struct _STPOOLThreadElement *next; //线程池的下一个线程 29 30 }STPOOLThreadElement,*PSTPOOLThreadElement; 31 32 //变量定义 33 //消息队列肯定有头有尾头出尾巴进 多个线程要访问必须有同步 以及初始化 34 PSTPOOLMsgData pStMsgQuequeBegin = NULL; 35 PSTPOOLMsgData pStMsgQuequeEnd = NULL; 36 pthread_mutex_t queueAccess ;//= PTHREAD_MUTEX_INITIALIZER; 37 pthread_cond_t queueEmpty; //= PTHREAD_COND_INITIALIZER; 38 39 //线程池链表 线程池的线程不管先后顺序,每次任务来了检索空闲线程,派遣任务 40 PSTPOOLThreadElement pStPool = NULL; 41 42 //先来看消息队列相关函数 消息插入函数 负责消息封装插入消息队列 43 void InsertMsg(char * pMsg) 44 { 45 //将参数封装成消息队列元素 46 PSTPOOLMsgData pMsgData = (PSTPOOLMsgData)malloc(sizeof(STPOOLMsgData)); 47 if(!pMsgData) return; 48 49 char *pMsgDupli = malloc(strlen(pMsg) + 1); //消息数据 50 if(!pMsgDupli) 51 { 52 free(pMsgData); 53 return; 54 } 55 strcpy(pMsgDupli,pMsg); 56 pMsgData->param = pMsgDupli; 57 pMsgData->next = NULL; 58 59 //插入消息 60 pthread_mutex_lock(&queueAccess); 61 if (pStMsgQuequeEnd) 62 { 63 pStMsgQuequeEnd->next = pMsgData; 64 } 65 pStMsgQuequeEnd = pMsgData; 66 67 if (!pStMsgQuequeBegin) 68 { 69 pStMsgQuequeBegin = pMsgData; 70 } 71 72 pthread_cond_broadcast(&queueEmpty); //通知消息分发线程有消息到来 73 pthread_mutex_unlock(&queueAccess); 74 } 75 76 77 //再来看看消息取出函数 采用阻塞模式,因为没有消息的时候所有线程都处于空闲状态 78 PSTPOOLMsgData GetNextMsg() 79 { 80 PSTPOOLMsgData Result = NULL; 81 82 pthread_mutex_lock(&queueAccess); 83 //如果没有事件,一直等待直道有事件产生 取出一个事件 84 while (pStMsgQuequeBegin == NULL) 85 pthread_cond_wait(&queueEmpty, &queueAccess); 86 87 Result = pStMsgQuequeBegin; 88 pStMsgQuequeBegin = Result->next; 89 if (!pStMsgQuequeBegin) //没有消息 90 { 91 pStMsgQuequeEnd = NULL; 92 } 93 Result->next = NULL; 94 pthread_mutex_unlock(&queueAccess); 95 return Result; 96 } 97 98 99 //接下来看线程池线程 100 void *PoolLDealMsgThread(PSTPOOLThreadElement listElement) 101 { 102 int threadId = 0; 103 pthread_mutex_lock(&listElement->busy); 104 while (1) 105 { 106 while (listElement->pMsgData == NULL) //无事件处理 一直阻塞等待 107 pthread_cond_wait(&listElement->nodata, &listElement->busy); 108 109 //处理事件,清除事件占用空间 110 111 threadId = listElement->thread; 112 printf("threadID :%d Msg: %s ",threadId,listElement->pMsgData->param); 113 114 free(listElement->pMsgData->param); 115 free(listElement->pMsgData); 116 117 listElement->pMsgData = NULL; 118 } 119 pthread_mutex_unlock(&listElement->busy); 120 pthread_exit(NULL); 121 return NULL; 122 } 123 124 125 //接着看看线程池服务线程 这里调用DispatchMsg 函数分发消息 126 void DispatchMsg(PSTPOOLMsgData pMsgData) 127 { 128 int Result = 0; 129 PSTPOOLThreadElement cur = NULL; 130 131 //查找空闲线程处理消息 132 for (cur = pStPool; cur != NULL; cur = cur->next) 133 { 134 //双重锁定 提高程序性能 135 if (cur->pMsgData == NULL) 136 { 137 pthread_mutex_lock(&cur->busy); 138 if (cur->pMsgData != NULL) 139 { 140 pthread_mutex_unlock(&cur->busy); 141 continue; 142 } 143 cur->pMsgData = pMsgData; 144 pthread_cond_broadcast(&cur->nodata); //通知线程池线程有事件到来 145 pthread_mutex_unlock(&cur->busy); 146 return; 147 } 148 } 149 //新建线程处理消息 150 cur = (PSTPOOLThreadElement)malloc(sizeof(STPOOLThreadElement)); 151 if(!cur) 152 { 153 free(pMsgData->param); 154 free(pMsgData); 155 return; 156 } 157 158 memset(cur,0,sizeof(STPOOLThreadElement)); 159 160 pthread_mutex_init(&cur->busy, NULL); 161 pthread_cond_init(&cur->nodata, NULL); 162 cur->pMsgData = pMsgData; 163 cur->next = NULL; 164 Result = pthread_create(&cur->thread, NULL, (void *(*)(void *)) PoolLDealMsgThread, cur); 165 166 if (0 != Result) 167 { 168 free(cur); 169 free(pMsgData->param); 170 free(pMsgData); 171 return; 172 } 173 //新线程插入线程池 174 cur->next = pStPool; 175 pStPool = cur; 176 } 177 178 179 void *QueueDealFuction(void *data) 180 { 181 PSTPOOLMsgData pMsgData = NULL; 182 while (1) 183 { 184 pMsgData = GetNextMsg(); //读取一条事件记录 185 DispatchMsg(pMsgData); 186 } 187 return NULL; 188 } 189 190 //好了基本线程池模型已经构建完毕 现在看看主函数 191 char testMsg[5][10] = {"hello!", 192 "this", 193 "is", 194 "my", 195 "testApp", 196 } ; 197 198 int main() 199 { 200 //初始化开启线程池服务线程 201 int i = 0; 202 pthread_mutex_init(&queueAccess, NULL); 203 pthread_cond_init(&queueEmpty, NULL); 204 205 pthread_t queueHandler; 206 if(0 != pthread_create(&queueHandler, NULL, (void *(*)(void *)) QueueDealFuction, NULL) ) 207 { 208 return -1; 209 } 210 //这里消息仅代表字符串 211 for(i=0 ;i < 5 ;i++) 212 { 213 InsertMsg(testMsg[i]); 214 } 215 sleep(10); 216 return 0; 217 }
编译
gcc -g main.c -lpthread -o main
连续运行程序三次 三次结果如下:
分析结果可以知道: 在线程池中并不是先分配处理线程的的消息先处理完成,这取决于操作系统线程调度。
由此可以得知如要顺序处理消息,这种方式不使用于线程池。
一些技巧:
线程池常用于服务器网络编程,线程池参数中往往携带输出socket ,当我们处理完消息后,将结果通过socket 发送给客户端。
如:web服务器,加载生成XML文件,通过socket 发送给客户端。