zoukankan      html  css  js  c++  java
  • 线程池实例讲解(原创)

      最近老看一些服务器网关的代码页看了一些开源的代码。一个重要的技术线程池。

      何为线程池,所谓线程池就是一组用来处理,客户请求的线程组 这里的客户指代的是线程池服务的对象。

      

      线程池的实现原理:

        (1)消息队列调用函数,当有消息到来时候,将消息封装插入消息队列。

        (2)有一个 线程池附服务线程,该线程负责检索消息队列,创建线程池线程,将该消息派发到线程池的某一线程处理。

        (3)线程池线程参数,该参数应该是一个结构体,该结构体中某项指向一个消息,处理完成之后删除消息,进入等待状态。

      

      源代码如下 代码注释有说明 故不在单独讲解代码:

      1 #include <stdlib.h>
      2 #include <stdio.h>
      3 #include <unistd.h>
      4 #include <assert.h>
      5 #include <fcntl.h>
      6 #include <string.h>
      7 #include <time.h>
      8 #include <pthread.h>
      9 #include <sys/types.h>
     10 
     11 
     12 //消息队列基本元素
     13 typedef  struct _STPOOLMsgElement       //消息队列基本元素
     14 {
     15    char * param;                       //代表消息队列消息  实际情况中也可以使用结构体
     16    struct _STPOOLMsgData *next;         //下一条消息
     17    
     18 }STPOOLMsgData,*PSTPOOLMsgData;
     19 
     20 
     21 //线程池基本元素
     22 typedef struct _STPOOLThreadElement
     23 {
     24     pthread_t thread;                      //但前线程ID
     25     pthread_mutex_t busy;                  // 用来同步线程池线程 和线程池服务线程
     26     pthread_cond_t nodata;                 //
     27     PSTPOOLMsgData pMsgData;               //当前线程需要处理的事件,如果为空线程池空闲
     28     struct _STPOOLThreadElement *next;     //线程池的下一个线程
     29 
     30 }STPOOLThreadElement,*PSTPOOLThreadElement;
     31 
     32 //变量定义
     33 //消息队列肯定有头有尾头出尾巴进  多个线程要访问必须有同步 以及初始化
     34 PSTPOOLMsgData     pStMsgQuequeBegin = NULL;
     35 PSTPOOLMsgData     pStMsgQuequeEnd = NULL;
     36 pthread_mutex_t queueAccess ;//= PTHREAD_MUTEX_INITIALIZER;
     37 pthread_cond_t     queueEmpty; //= PTHREAD_COND_INITIALIZER;
     38 
     39 //线程池链表  线程池的线程不管先后顺序,每次任务来了检索空闲线程,派遣任务
     40 PSTPOOLThreadElement  pStPool = NULL;
     41 
     42 //先来看消息队列相关函数  消息插入函数 负责消息封装插入消息队列
     43 void InsertMsg(char * pMsg)
     44 {
     45     //将参数封装成消息队列元素
     46     PSTPOOLMsgData pMsgData = (PSTPOOLMsgData)malloc(sizeof(STPOOLMsgData));
     47     if(!pMsgData)  return;
     48     
     49     char *pMsgDupli = malloc(strlen(pMsg) + 1);   //消息数据
     50     if(!pMsgDupli)
     51     {
     52         free(pMsgData);
     53         return;
     54     }
     55     strcpy(pMsgDupli,pMsg);
     56     pMsgData->param = pMsgDupli;
     57     pMsgData->next = NULL;
     58 
     59     //插入消息
     60     pthread_mutex_lock(&queueAccess);
     61     if (pStMsgQuequeEnd)
     62     {
     63         pStMsgQuequeEnd->next = pMsgData;
     64     }
     65     pStMsgQuequeEnd = pMsgData;
     66 
     67     if (!pStMsgQuequeBegin)
     68     {
     69         pStMsgQuequeBegin = pMsgData;
     70     }
     71 
     72     pthread_cond_broadcast(&queueEmpty);    //通知消息分发线程有消息到来
     73     pthread_mutex_unlock(&queueAccess);
     74 }
     75 
     76 
     77 //再来看看消息取出函数  采用阻塞模式,因为没有消息的时候所有线程都处于空闲状态
     78 PSTPOOLMsgData GetNextMsg()
     79 {
     80     PSTPOOLMsgData Result = NULL;
     81 
     82     pthread_mutex_lock(&queueAccess);
     83     //如果没有事件,一直等待直道有事件产生    取出一个事件
     84     while (pStMsgQuequeBegin == NULL)
     85         pthread_cond_wait(&queueEmpty, &queueAccess);
     86 
     87     Result = pStMsgQuequeBegin;
     88     pStMsgQuequeBegin = Result->next;
     89     if (!pStMsgQuequeBegin)     //没有消息
     90     {
     91         pStMsgQuequeEnd = NULL;
     92     }
     93     Result->next = NULL;
     94     pthread_mutex_unlock(&queueAccess);
     95     return Result;
     96 }
     97 
     98 
     99 //接下来看线程池线程
    100 void *PoolLDealMsgThread(PSTPOOLThreadElement listElement)
    101 {
    102     int threadId = 0;
    103     pthread_mutex_lock(&listElement->busy);
    104     while (1)
    105     {
    106         while (listElement->pMsgData == NULL)    //无事件处理 一直阻塞等待
    107             pthread_cond_wait(&listElement->nodata, &listElement->busy);
    108 
    109         //处理事件,清除事件占用空间
    110         
    111         threadId = listElement->thread;
    112         printf("threadID :%d  Msg: %s 
    ",threadId,listElement->pMsgData->param);
    113         
    114         free(listElement->pMsgData->param);
    115         free(listElement->pMsgData);
    116 
    117         listElement->pMsgData = NULL;
    118     }
    119     pthread_mutex_unlock(&listElement->busy);
    120     pthread_exit(NULL);
    121     return NULL;
    122 }
    123 
    124 
    125 //接着看看线程池服务线程 这里调用DispatchMsg 函数分发消息
    126 void DispatchMsg(PSTPOOLMsgData pMsgData)
    127 {
    128        int Result = 0;
    129        PSTPOOLThreadElement cur = NULL;
    130 
    131        //查找空闲线程处理消息
    132        for (cur = pStPool; cur != NULL; cur = cur->next)
    133        {
    134             //双重锁定   提高程序性能
    135             if (cur->pMsgData == NULL)
    136             {
    137                 pthread_mutex_lock(&cur->busy);
    138                 if (cur->pMsgData != NULL)
    139                 {
    140                     pthread_mutex_unlock(&cur->busy);
    141                     continue;
    142                 }
    143                 cur->pMsgData = pMsgData;
    144                 pthread_cond_broadcast(&cur->nodata);   //通知线程池线程有事件到来
    145                 pthread_mutex_unlock(&cur->busy);
    146                 return;
    147             }
    148       }
    149       //新建线程处理消息
    150       cur = (PSTPOOLThreadElement)malloc(sizeof(STPOOLThreadElement));
    151       if(!cur)
    152       {
    153             free(pMsgData->param);
    154             free(pMsgData);
    155             return;
    156       }
    157 
    158       memset(cur,0,sizeof(STPOOLThreadElement));
    159 
    160       pthread_mutex_init(&cur->busy, NULL);
    161       pthread_cond_init(&cur->nodata, NULL);
    162       cur->pMsgData = pMsgData;
    163       cur->next = NULL;
    164       Result = pthread_create(&cur->thread, NULL, (void *(*)(void *)) PoolLDealMsgThread, cur);
    165 
    166       if (0 != Result)
    167       {
    168         free(cur);
    169         free(pMsgData->param);
    170         free(pMsgData);
    171         return;
    172       }
    173       //新线程插入线程池
    174       cur->next = pStPool;
    175       pStPool = cur;
    176 }
    177 
    178 
    179 void *QueueDealFuction(void *data)
    180 {
    181     PSTPOOLMsgData pMsgData = NULL;
    182     while (1)
    183     {
    184         pMsgData = GetNextMsg();  //读取一条事件记录
    185         DispatchMsg(pMsgData);
    186     }
    187     return NULL;
    188 }
    189 
    190 //好了基本线程池模型已经构建完毕 现在看看主函数
    191 char testMsg[5][10]  = {"hello!",
    192                     "this",
    193                     "is",
    194                     "my",
    195                     "testApp",
    196                     } ;
    197 
    198 int main()
    199 {
    200     //初始化开启线程池服务线程
    201     int i = 0;
    202     pthread_mutex_init(&queueAccess, NULL);
    203     pthread_cond_init(&queueEmpty, NULL);
    204     
    205     pthread_t queueHandler;
    206     if(0 != pthread_create(&queueHandler, NULL, (void *(*)(void *)) QueueDealFuction, NULL) )
    207     {
    208         return -1;
    209     }
    210     //这里消息仅代表字符串 
    211     for(i=0 ;i < 5 ;i++)
    212     {
    213         InsertMsg(testMsg[i]);
    214     }
    215     sleep(10);
    216     return 0;
    217 }

    编译

    gcc -g  main.c -lpthread  -o main

    连续运行程序三次 三次结果如下:

     

    分析结果可以知道: 在线程池中并不是先分配处理线程的的消息先处理完成,这取决于操作系统线程调度。

    由此可以得知如要顺序处理消息,这种方式不使用于线程池。

    一些技巧:

      线程池常用于服务器网络编程,线程池参数中往往携带输出socket ,当我们处理完消息后,将结果通过socket 发送给客户端。

      如:web服务器,加载生成XML文件,通过socket 发送给客户端。

        

      

      

      

      

  • 相关阅读:
    BIP_开发案例08_BI Publisher图表示例 饼状图/直方图/折线图(案例)
    BIP_开发案例07_将原有Report Builer报表全部转为XML Publisher形式(案例)
    Form_Form Builder Export导出为Excel(案例)
    BIP_开发案例06_以RB.RDF为数据源BIP.RTF为模板的简单例子(案例)
    Form_Form Builder开发基于视图页面和自动代码生成包(案例)
    BIP_开发案例05_BI Pubisher标准做法以BIP.XML为数据源以BIP.RTF为模板的简单例子(案例)
    BIP_开发案例04_通过BI Publisher实现打印报表的二维码(案例)(待整理)
    BIP_开发案例03_将原有Report Builer报表全部转为XML Publisher形式(案例)
    BIP_开发案例02_BI Publisher中复杂案例实现代码(案例)
    BIP_开发案例01_BI Publisher报表手工提交和控制(案例)
  • 原文地址:https://www.cnblogs.com/wolfrickwang/p/3247385.html
Copyright © 2011-2022 走看看