zoukankan      html  css  js  c++  java
  • 一份C++线程池的代码,非常实用

    1. #ifndef _ThreadPool_H_
    2. #define _ThreadPool_H_
    3. #pragma warning(disable: 4530)
    4. #pragma warning(disable: 4786)
    5. #include <cassert>
    6. #include <vector>
    7. #include <queue>
    8. #include <windows.h>
    9. using namespace std;
    10. class ThreadJob  //工作基类
    11. {
    12. public:
    13.   //供线程池调用的虚函数
    14.   virtual void DoJob(void *pPara) = 0;
    15. };
    16. class ThreadPool
    17. {
    18. public:
    19.   //dwNum 线程池规模
    20.   ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0)
    21.   {
    22.     InitializeCriticalSection(&_csThreadVector);
    23.     InitializeCriticalSection(&_csWorkQueue);
    24.     _EventComplete = CreateEvent(0, false, false, NULL);
    25.     _EventEnd = CreateEvent(0, true, false, NULL);
    26.     _SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
    27.     _SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
    28.     assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
    29.     assert(_EventComplete != INVALID_HANDLE_VALUE);
    30.     assert(_EventEnd != INVALID_HANDLE_VALUE);
    31.     assert(_SemaphoreDel != INVALID_HANDLE_VALUE);
    32.     AdjustSize(dwNum <= 0 ? 4 : dwNum);
    33.   }
    34.   ~ThreadPool()
    35.   {
    36.     DeleteCriticalSection(&_csWorkQueue);
    37.     CloseHandle(_EventEnd);
    38.     CloseHandle(_EventComplete);
    39.     CloseHandle(_SemaphoreCall);
    40.     CloseHandle(_SemaphoreDel);
    41.     vector<ThreadItem*>::iterator iter;
    42.     for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
    43.     {
    44.       if(*iter)
    45.         delete *iter;
    46.     }
    47.     DeleteCriticalSection(&_csThreadVector);
    48.   }
    49.   //调整线程池规模
    50.   int AdjustSize(int iNum)
    51.   {
    52.     if(iNum > 0)
    53.     {
    54.       ThreadItem *pNew;
    55.       EnterCriticalSection(&_csThreadVector);
    56.       for(int _i=0; _i<iNum; _i++)
    57.       {
    58.         _ThreadVector.push_back(pNew = new ThreadItem(this));
    59.         assert(pNew);
    60.         pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
    61.         // set priority
    62.         SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL);
    63.         assert(pNew->_Handle);
    64.       }
    65.       LeaveCriticalSection(&_csThreadVector);
    66.     }
    67.     else
    68.     {
    69.       iNum *= -1;
    70.       ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
    71.     }
    72.     return (int)_lThreadNum;
    73.   }
    74.   //调用线程池
    75.   void Call(void (*pFunc)(void  *), void *pPara = NULL)
    76.   {
    77.     assert(pFunc);
    78.     EnterCriticalSection(&_csWorkQueue);
    79.     _JobQueue.push(new JobItem(pFunc, pPara));
    80.     LeaveCriticalSection(&_csWorkQueue);
    81.     ReleaseSemaphore(_SemaphoreCall, 1, NULL);
    82.   }
    83.   //调用线程池
    84.   inline void Call(ThreadJob * p, void *pPara = NULL)
    85.   {
    86.     Call(CallProc, new CallProcPara(p, pPara));
    87.   }
    88.   //结束线程池, 并同步等待
    89.   bool EndAndWait(DWORD dwWaitTime = INFINITE)
    90.   {
    91.     SetEvent(_EventEnd);
    92.     return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
    93.   }
    94.   //结束线程池
    95.   inline void End()
    96.   {
    97.     SetEvent(_EventEnd);
    98.   }
    99.   inline DWORD Size()
    100.   {
    101.     return (DWORD)_lThreadNum;
    102.   }
    103.   inline DWORD GetRunningSize()
    104.   {
    105.     return (DWORD)_lRunningNum;
    106.   }
    107.   bool IsRunning()
    108.   {
    109.     return _lRunningNum > 0;
    110.   }
    111. protected:
    112.   //工作线程
    113.   static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
    114.   {
    115.     ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
    116.     assert(pThread);
    117.     ThreadPool *pThreadPoolObj = pThread->_pThis;
    118.     assert(pThreadPoolObj);
    119.     InterlockedIncrement(&pThreadPoolObj->_lThreadNum);
    120.     HANDLE hWaitHandle[3];
    121.     hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
    122.     hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
    123.     hWaitHandle[2] = pThreadPoolObj->_EventEnd;
    124.     JobItem *pJob;
    125.     bool fHasJob;
    126.     for(;;)
    127.     {
    128.       DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);
    129.       //响应删除线程信号
    130.       if(wr == WAIT_OBJECT_0 + 1) 
    131.         break;
    132.       //从队列里取得用户作业
    133.       EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
    134.       if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
    135.       {
    136.         pJob = pThreadPoolObj->_JobQueue.front();
    137.         pThreadPoolObj->_JobQueue.pop();
    138.         assert(pJob);
    139.       }
    140.       LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
    141.       //受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)
    142.       if(wr == WAIT_OBJECT_0 + 2 && !fHasJob) 
    143.         break;
    144.       if(fHasJob && pJob)
    145.       {
    146.         InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
    147.         pThread->_dwLastBeginTime = GetTickCount();
    148.         pThread->_dwCount++;
    149.         pThread->_fIsRunning = true;
    150.         pJob->_pFunc(pJob->_pPara); //运行用户作业
    151.         delete pJob;
    152.         pThread->_fIsRunning = false;
    153.         InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
    154.       }
    155.     }
    156.     //删除自身结构
    157.     EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
    158.     pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
    159.     LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
    160.     delete pThread;
    161.     InterlockedDecrement(&pThreadPoolObj->_lThreadNum);
    162.     if(!pThreadPoolObj->_lThreadNum)  //所有线程结束
    163.       SetEvent(pThreadPoolObj->_EventComplete);
    164.     return 0;
    165.   }
    166.   //调用用户对象虚函数
    167.   static void CallProc(void *pPara)
    168.   {
    169.     CallProcPara *cp = static_cast<CallProcPara *>(pPara);
    170.     assert(cp);
    171.     if(cp)
    172.     {
    173.       cp->_pObj->DoJob(cp->_pPara);
    174.       delete cp;
    175.     }
    176.   }
    177.   //用户对象结构
    178.   struct CallProcPara 
    179.   {
    180.     ThreadJob* _pObj;//用户对象
    181.     void *_pPara;//用户参数
    182.     CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
    183.   };
    184.   //用户函数结构
    185.   struct JobItem
    186.   {
    187.     void (*_pFunc)(void  *);//函数
    188.     void *_pPara; //参数
    189.     JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
    190.   };
    191.   //线程池中的线程结构
    192.   struct ThreadItem
    193.   {
    194.     HANDLE _Handle; //线程句柄
    195.     ThreadPool *_pThis;  //线程池的指针
    196.     DWORD _dwLastBeginTime; //最后一次运行开始时间
    197.     DWORD _dwCount; //运行次数
    198.     bool _fIsRunning;
    199.     ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
    200.     ~ThreadItem()
    201.     {
    202.       if(_Handle)
    203.       {
    204.         CloseHandle(_Handle);
    205.         _Handle = NULL;
    206.       }
    207.     }
    208.   };
    209.   std::queue<JobItem *> _JobQueue;  //工作队列
    210.   std::vector<ThreadItem *>  _ThreadVector; //线程数据
    211.   CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界
    212.   HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号
    213.   long _lThreadNum, _lRunningNum; //线程数, 运行的线程数
    214. };
    215. #endif //_ThreadPool_H_

    转载自 http://blog.csdn.net/pjchen/archive/2004/11/06/170606.aspx

    基本上是拿来就用了,对WIN32 API不熟,但对线程池的逻辑还是比较熟的,认为这个线程池写得很清晰,我拿来用在一个多线程下载的模块中。很实用的东东。
    调用方法
    void threadfunc(void *p)
    {
         YourClass* yourObject = (YourClass*)    p;

    //... } ThreadPool tp; for(i=0; i<100; i++)   tp.Call(threadfunc);

    ThreadPool tp(20);//20为初始线程池规模

    tp.Call(threadfunc, lpPara);     

    使用时注意几点:
    1. ThreadJob  没什么用,直接写线程函数吧。
    2. 线程函数(threadfunc)的入口参数void* 可以转成自定义的类型对象,这个对象可以记录下线程运行中的数据,并设置线程当前状态,以此与线程进行交互。
    3. 线程池有一个EndAndWait函数,用于让线程池中所有计算正常结束。有时线程池中的一个线程可能要运行很长时间,怎么办?可以通过线程函数threadfunc的入口参数对象来处理,比如:
    class YourClass {
      int cmd; // cmd = 1是上线程停止计算,正常退出。
    };
    threadfunc(void* p) {
      YourClass* yourObject = (YourClass*)p;
      while (true) {
        // do some calculation
        if (yourClass->cmd == 1)
          break;
      }
    }
    在主线程中设置yourClass->cmd = 1,该线程就会自然结束。
    很简洁通用的线程池实现。
     
  • 相关阅读:
    字符串类型
    mysql-schema-sync 实现 不同环境实例间表结构统一
    order by 运行过程
    MySQL 生成随机测试数据
    MySQL binlog 日志处理
    MySQL 查询优化
    使用 pyenv 管理不同的 Python 版本
    使用 pyenv 管理不同的 Python 版本
    MVC5 已有打开的与此 Command 相关联的 DataReader,必须首先将它关闭。
    在ASP.net MVC中利用ajax配合razor进行局部加载(给页面套好样式以后,一刷新就不合适了,终于找到了解决方案)
  • 原文地址:https://www.cnblogs.com/thisway/p/5088818.html
Copyright © 2011-2022 走看看