zoukankan      html  css  js  c++  java
  • C++之线程池

    为什么需要线程池
    目前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。 

    传 统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即 时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于 不停的创建线程,销毁线程的状态。

    我们将传统方案中的线程执行过程分为三个过程:T1、T2、T3。 

    T1:线程创建时间
    T2:线程执行时间,包括线程的同步等时间
    T3:线程销毁时间 
    那么我们可以看出,线程本身的开销所占的比例为(T1+T3) / (T1+T2+T3)。如果线程执行的时间很短的话,这比开销可能占到20%-50%左右。如果任务执行时间很频繁的话,这笔开销将是不可忽略的。
     
    除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为2000,那么最坏情况下,系统可能需要产生2000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求。
     
    因此线程池的出现正是着眼于减少线程池本身带来的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。
     
    基于这种预创建技术,线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上,执行次数越多,每个任务所分担到的线程本身开销则越小,不过我们另外可能需要考虑进去线程之间同步所带来的开销。
      1 #ifndef _ThreadPool_H_
      2 #define _ThreadPool_H_
      3 
      4 #pragma warning(disable: 4530)
      5 #pragma warning(disable: 4786)
      6 
      7 #include <cassert>
      8 #include <vector>
      9 #include <queue>
     10 #include <windows.h>
     11 
     12 
     13 class ThreadJob  //工作基类
     14 {
     15 public:
     16  //供线程池调用的虚函数
     17  virtual void DoJob(void *pPara) = 0;
     18 };
     19 
     20 class ThreadPool
     21 {
     22 
     23 public:
     24  //dwNum 线程池规模
     25  ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0) 
     26  {
     27   InitializeCriticalSection(&_csThreadVector);
     28   InitializeCriticalSection(&_csWorkQueue);
     29 
     30   _EventComplete = CreateEvent(0, false, false, NULL);
     31   _EventEnd = CreateEvent(0, true, false, NULL);
     32   _SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
     33   _SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
     34 
     35   assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
     36   assert(_EventComplete != INVALID_HANDLE_VALUE);
     37   assert(_EventEnd != INVALID_HANDLE_VALUE);
     38   assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
     39 
     40   AdjustSize(dwNum <= 0 ? 4 : dwNum);
     41  }
     42 
     43  ~ThreadPool()
     44  {
     45   DeleteCriticalSection(&_csWorkQueue);
     46 
     47   CloseHandle(_EventEnd);
     48   CloseHandle(_EventComplete);
     49   CloseHandle(_SemaphoreCall);
     50   CloseHandle(_SemaphoreDel);
     51   
     52   vector<ThreadItem*>::iterator iter;
     53   for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
     54   {
     55    if(*iter)
     56     delete *iter;
     57   }
     58 
     59   DeleteCriticalSection(&_csThreadVector);
     60  }
     61  //调整线程池规模
     62  int AdjustSize(int iNum)
     63  {
     64   if(iNum > 0)
     65   {
     66    ThreadItem *pNew;
     67    EnterCriticalSection(&_csThreadVector);
     68    for(int _i=0; _i<iNum; _i++)
     69    {
     70     _ThreadVector.push_back(pNew = new ThreadItem(this)); 
     71     assert(pNew);
     72     pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
     73     assert(pNew->_Handle);
     74    }
     75    LeaveCriticalSection(&_csThreadVector);
     76   }
     77   else
     78   {
     79    iNum *= -1;
     80    ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
     81   }
     82   return (int)_lThreadNum;
     83  }
     84  //调用线程池
     85  void Call(void (*pFunc)(void  *), void *pPara = NULL)
     86  {
     87   assert(pFunc);
     88 
     89   EnterCriticalSection(&_csWorkQueue);
     90   _JobQueue.push(new JobItem(pFunc, pPara));
     91   LeaveCriticalSection(&_csWorkQueue);
     92 
     93   ReleaseSemaphore(_SemaphoreCall, 1, NULL);
     94  }
     95  //调用线程池
     96  inline void Call(ThreadJob * p, void *pPara = NULL)
     97  {
     98   Call(CallProc, new CallProcPara(p, pPara));
     99  }
    100  //结束线程池, 并同步等待
    101  bool EndAndWait(DWORD dwWaitTime = INFINITE)
    102  {
    103   SetEvent(_EventEnd);
    104   return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
    105  }
    106  //结束线程池
    107  inline void End()
    108  {
    109   SetEvent(_EventEnd);
    110  }
    111  inline DWORD Size()
    112  {
    113   return (DWORD)_lThreadNum;
    114  }
    115  inline DWORD GetRunningSize()
    116  {
    117   return (DWORD)_lRunningNum;
    118  }
    119  bool IsRunning()
    120  {
    121   return _lRunningNum > 0;
    122  }
    123 
    124 protected:
    125 
    126  //工作线程
    127  static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
    128  {
    129   ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
    130   assert(pThread);
    131 
    132   ThreadPool *pThreadPoolObj = pThread->_pThis;
    133   assert(pThreadPoolObj);
    134 
    135   InterlockedIncrement(&pThreadPoolObj->_lThreadNum);
    136 
    137   HANDLE hWaitHandle[3];
    138   hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
    139   hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
    140   hWaitHandle[2] = pThreadPoolObj->_EventEnd;
    141 
    142   JobItem *pJob;
    143   bool fHasJob;
    144   
    145   for(;;)
    146   {
    147    DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
    148 
    149    //响应删除线程信号
    150    if(wr == WAIT_OBJECT_0 + 1)  
    151     break;
    152    
    153    //从队列里取得用户作业
    154    EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
    155    if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
    156    {
    157     pJob = pThreadPoolObj->_JobQueue.front();
    158     pThreadPoolObj->_JobQueue.pop();
    159     assert(pJob);
    160    }
    161    LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
    162 
    163    //受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)
    164    if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)  
    165     break;
    166 
    167    if(fHasJob && pJob)
    168    {
    169     InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
    170     pThread->_dwLastBeginTime = GetTickCount();
    171     pThread->_dwCount++;
    172     pThread->_fIsRunning = true;
    173     pJob->_pFunc(pJob->_pPara); //运行用户作业
    174     delete pJob; 
    175     pThread->_fIsRunning = false;
    176     InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
    177    }
    178   }
    179 
    180   //删除自身结构
    181   EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
    182   pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
    183   LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
    184 
    185   delete pThread;
    186 
    187   InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
    188 
    189   if(!pThreadPoolObj->_lThreadNum)  //所有线程结束
    190    SetEvent(pThreadPoolObj->_EventComplete);
    191 
    192   return 0;
    193  }
    194  //调用用户对象虚函数
    195  static void CallProc(void *pPara) 
    196  {
    197   CallProcPara *cp = static_cast<CallProcPara *>(pPara);
    198   assert(cp);
    199   if(cp)
    200   {
    201    cp->_pObj->DoJob(cp->_pPara);
    202    delete cp;
    203   }
    204  }
    205  //用户对象结构
    206  struct CallProcPara  
    207  {
    208   ThreadJob* _pObj;//用户对象 
    209   void *_pPara;//用户参数
    210   CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
    211  };
    212  //用户函数结构
    213  struct JobItem 
    214  {
    215   void (*_pFunc)(void  *);//函数
    216   void *_pPara; //参数
    217   JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
    218  };
    219  //线程池中的线程结构
    220  struct ThreadItem
    221  {
    222   HANDLE _Handle; //线程句柄
    223   ThreadPool *_pThis;  //线程池的指针
    224   DWORD _dwLastBeginTime; //最后一次运行开始时间
    225   DWORD _dwCount; //运行次数
    226   bool _fIsRunning;
    227   ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
    228   ~ThreadItem()
    229   {
    230    if(_Handle)
    231    {
    232     CloseHandle(_Handle);
    233     _Handle = NULL;
    234    }
    235   }
    236  };
    237  
    238  std::queue<JobItem *> _JobQueue;  //工作队列
    239  std::vector<ThreadItem *>  _ThreadVector; //线程数据
    240 
    241  CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界
    242 
    243  HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号
    244  long _lThreadNum, _lRunningNum; //线程数, 运行的线程数
    245 
    246 };
    247 
    248 #endif //_ThreadPool_H_
    View Code
     1 //Use:1>:
     2 void threadfunc(void *p)
     3 {
     4  //...
     5 }
     6  ThreadPool tp;
     7  for(i=0; i<100; i++)
     8   tp.Call(threadfunc);
     9 
    10  ThreadPool tp(20);//20为初始线程池规模
    11  tp.Call(threadfunc, lpPara);
    12  tp.AdjustSize(50);//增加50
    13  tp.AdjustSize(-30);//减少30
    14 
    15 
    16 //2>:
    17 class MyThreadJob : public ThreadJob //线程对象从ThreadJob扩展
    18 {
    19 public:
    20  virtual void DoJob(void *p)//自定义的虚函数
    21  {
    22   //....
    23  }
    24 };
    25  MyThreadJob mt[10];
    26  ThreadPool tp;
    27  for(i=0; i<100 i++)
    28   tp.Call(mt + i);//tp.Call(mt + i, para);
    View Code
  • 相关阅读:
    webpack高级概念code splitting 和 splitChunks (系列五)
    webpack高级概念Develoment 和 Production 不同环境的配置 (系列四)
    webpack高级概念Tree Shaking (树摇)(系列三)
    HarmonyOS三方件开发指南(16)-VideoCache 视频缓存
    鸿蒙开源第三方组件——uCrop_ohos图片裁剪组件
    Hi3516如何连接Wifi(三)
    【鸿蒙学院】鸿蒙IDE迎来重大更新,新特性足以让你尖叫
    《鸿蒙系统物联网模组——Neptune 三天全攻略》课件、代码
    预览器和编辑器双重发力,DevEco Studio 2.1 Beta 3强势来袭
    强大的鸿蒙开发环境 —— DevEco Studio 2.1 Beta3发布
  • 原文地址:https://www.cnblogs.com/jeromesunny/p/3222031.html
Copyright © 2011-2022 走看看