zoukankan      html  css  js  c++  java
  • 一个Windows C++的线程池的实现

    此线程池所依赖的线程类,请参看《一个Windows C++的线程类实现》:

    http://blog.csdn.net/huyiyang2010/archive/2010/08/10/5801597.aspx

     

    ThreadPoolExecutor.h

    1. #ifndef __THREAD_POOL_EXECUTOR__  
    2. #define __THREAD_POOL_EXECUTOR__  
    3.   
    4. #include "Thread.h"  
    5. #include <set>  
    6. #include <list>  
    7. #include <windows.h>  
    8.   
    9. class CThreadPoolExecutor  
    10. {  
    11. public:  
    12.     CThreadPoolExecutor(void);  
    13.     ~CThreadPoolExecutor(void);  
    14.   
    15.     /** 
    16.       初始化线程池,创建minThreads个线程 
    17.     **/  
    18.     bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTaskse);  
    19.   
    20.     /** 
    21.       执行任务,若当前任务列表没有满,将此任务插入到任务列表,返回true 
    22.       若当前任务列表满了,但当前线程数量小于最大线程数,将创建新线程执行此任务,返回true 
    23.       若当前任务列表满了,但当前线程数量等于最大线程数,将丢弃此任务,返回false 
    24.     **/  
    25.     bool Execute(Runnable * pRunnable);  
    26.   
    27.     /** 
    28.       终止线程池,先制止塞入任务, 
    29.       然后等待直到任务列表为空, 
    30.       然后设置最小线程数量为0, 
    31.       等待直到线程数量为空, 
    32.       清空垃圾堆中的任务 
    33.     **/  
    34.     void Terminate();  
    35.   
    36.     /** 
    37.       返回线程池中当前的线程数量 
    38.     **/  
    39.     unsigned int GetThreadPoolSize();  
    40.   
    41. private:  
    42.     /** 
    43.       获取任务列表中的任务,若任务列表为空,返回NULL 
    44.     **/  
    45.     Runnable * GetTask();  
    46.   
    47.     static unsigned int WINAPI StaticThreadFunc(void * arg);  
    48.   
    49. private:  
    50.     class CWorker : public CThread  
    51.     {  
    52.     public:  
    53.         CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL);  
    54.         ~CWorker();  
    55.         void Run();  
    56.   
    57.     private:  
    58.         CThreadPoolExecutor * m_pThreadPool;  
    59.         Runnable * m_pFirstTask;  
    60.         volatile bool m_bRun;  
    61.     };  
    62.   
    63.     typedef std::set<CWorker *> ThreadPool;  
    64.     typedef std::list<Runnable *> Tasks;  
    65.     typedef Tasks::iterator TasksItr;  
    66.     typedef ThreadPool::iterator ThreadPoolItr;  
    67.   
    68.     ThreadPool m_ThreadPool;  
    69.     ThreadPool m_TrashThread;  
    70.     Tasks m_Tasks;  
    71.   
    72.     CRITICAL_SECTION m_csTasksLock;  
    73.     CRITICAL_SECTION m_csThreadPoolLock;  
    74.   
    75.     volatile bool m_bRun;  
    76.     volatile bool m_bEnableInsertTask;  
    77.     volatile unsigned int m_minThreads;  
    78.     volatile unsigned int m_maxThreads;  
    79.     volatile unsigned int m_maxPendingTasks;  
    80. };  
    81.   
    82. #endif  

     

    ThreadPoolExecutor.cpp

    1. #include "ThreadPoolExecutor.h"  
    2.   
    3. CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask) :   
    4. m_pThreadPool(pThreadPool),  
    5. m_pFirstTask(pFirstTask),  
    6. m_bRun(true)  
    7. {  
    8.       
    9. }  
    10.   
    11. CThreadPoolExecutor::CWorker::~CWorker()  
    12. {  
    13. }  
    14.   
    15. /** 
    16.   执行任务的工作线程。 
    17.   当前没有任务时, 
    18.   如果当前线程数量大于最小线程数量,减少线程, 
    19.   否则,执行清理程序,将线程类给释放掉 
    20. **/  
    21. void CThreadPoolExecutor::CWorker::Run()  
    22. {  
    23.     Runnable * pTask = NULL;  
    24.     while(m_bRun)  
    25.     {  
    26.         if(NULL == m_pFirstTask)  
    27.         {  
    28.             pTask = m_pThreadPool->GetTask();  
    29.         }  
    30.         else  
    31.         {  
    32.             pTask = m_pFirstTask;  
    33.             m_pFirstTask = NULL;  
    34.         }  
    35.   
    36.         if(NULL == pTask)  
    37.         {  
    38.             EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));  
    39.             if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads)  
    40.             {  
    41.                 ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);  
    42.                 if(itr != m_pThreadPool->m_ThreadPool.end())  
    43.                 {  
    44.                     m_pThreadPool->m_ThreadPool.erase(itr);  
    45.                     m_pThreadPool->m_TrashThread.insert(this);  
    46.                 }  
    47.                 m_bRun = false;  
    48.             }  
    49.             else  
    50.             {  
    51.                 ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();  
    52.                 while(itr != m_pThreadPool->m_TrashThread.end())  
    53.                 {  
    54.                     (*itr)->Join();  
    55.                     delete (*itr);  
    56.                     m_pThreadPool->m_TrashThread.erase(itr);  
    57.                     itr = m_pThreadPool->m_TrashThread.begin();  
    58.                 }  
    59.             }  
    60.             LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));  
    61.             continue;  
    62.         }  
    63.         else  
    64.         {  
    65.             pTask->Run();  
    66.             pTask = NULL;  
    67.         }  
    68.     }  
    69. }  
    70.   
    71. /////////////////////////////////////////////////////////////////////////////////////////////  
    72.   
    73. CThreadPoolExecutor::CThreadPoolExecutor(void) :   
    74. m_bRun(false),  
    75. m_bEnableInsertTask(false)  
    76. {  
    77.     InitializeCriticalSection(&m_csTasksLock);  
    78.     InitializeCriticalSection(&m_csThreadPoolLock);  
    79. }  
    80.   
    81. CThreadPoolExecutor::~CThreadPoolExecutor(void)  
    82. {  
    83.     Terminate();  
    84.     DeleteCriticalSection(&m_csTasksLock);  
    85.     DeleteCriticalSection(&m_csThreadPoolLock);  
    86. }  
    87.   
    88. bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks)  
    89. {  
    90.     if(minThreads == 0)  
    91.     {  
    92.         return false;  
    93.     }  
    94.     if(maxThreads < minThreads)  
    95.     {  
    96.         return false;  
    97.     }  
    98.     m_minThreads = minThreads;  
    99.     m_maxThreads = maxThreads;  
    100.     m_maxPendingTasks = maxPendingTasks;  
    101.     unsigned int i = m_ThreadPool.size();  
    102.     for(; i<minThreads; i++)  
    103.     {  
    104.         //创建线程  
    105.         CWorker * pWorker = new CWorker(this);  
    106.         if(NULL == pWorker)  
    107.         {  
    108.             return false;  
    109.         }  
    110.         EnterCriticalSection(&m_csThreadPoolLock);  
    111.         m_ThreadPool.insert(pWorker);  
    112.         LeaveCriticalSection(&m_csThreadPoolLock);  
    113.         pWorker->Start();  
    114.     }  
    115.     m_bRun = true;  
    116.     m_bEnableInsertTask = true;  
    117.     return true;  
    118. }  
    119.   
    120. bool CThreadPoolExecutor::Execute(Runnable * pRunnable)  
    121. {  
    122.     if(!m_bEnableInsertTask)  
    123.     {  
    124.         return false;  
    125.     }  
    126.     if(NULL == pRunnable)  
    127.     {  
    128.         return false;  
    129.     }  
    130.     if(m_Tasks.size() >= m_maxPendingTasks)  
    131.     {  
    132.         if(m_ThreadPool.size() < m_maxThreads)  
    133.         {  
    134.             CWorker * pWorker = new CWorker(this, pRunnable);  
    135.             if(NULL == pWorker)  
    136.             {  
    137.                 return false;  
    138.             }  
    139.             EnterCriticalSection(&m_csThreadPoolLock);  
    140.             m_ThreadPool.insert(pWorker);  
    141.             LeaveCriticalSection(&m_csThreadPoolLock);  
    142.             pWorker->Start();  
    143.         }  
    144.         else  
    145.         {  
    146.             return false;  
    147.         }  
    148.     }  
    149.     else  
    150.     {  
    151.         EnterCriticalSection(&m_csTasksLock);  
    152.         m_Tasks.push_back(pRunnable);  
    153.         LeaveCriticalSection(&m_csTasksLock);  
    154.     }  
    155.     return true;  
    156. }  
    157.   
    158. Runnable * CThreadPoolExecutor::GetTask()  
    159. {  
    160.     Runnable * Task = NULL;  
    161.     EnterCriticalSection(&m_csTasksLock);  
    162.     if(!m_Tasks.empty())  
    163.     {  
    164.         Task = m_Tasks.front();  
    165.         m_Tasks.pop_front();  
    166.     }  
    167.     LeaveCriticalSection(&m_csTasksLock);  
    168.     return Task;  
    169. }  
    170.   
    171. unsigned int CThreadPoolExecutor::GetThreadPoolSize()  
    172. {  
    173.     return m_ThreadPool.size();  
    174. }  
    175.   
    176. void CThreadPoolExecutor::Terminate()  
    177. {  
    178.     m_bEnableInsertTask = false;  
    179.     while(m_Tasks.size() > 0)  
    180.     {  
    181.         Sleep(1);  
    182.     }  
    183.     m_bRun = false;  
    184.     m_minThreads = 0;  
    185.     m_maxThreads = 0;  
    186.     m_maxPendingTasks = 0;  
    187.     while(m_ThreadPool.size() > 0)  
    188.     {  
    189.         Sleep(1);  
    190.     }  
    191.     EnterCriticalSection(&m_csThreadPoolLock);  
    192.     ThreadPoolItr itr = m_TrashThread.begin();  
    193.     while(itr != m_TrashThread.end())  
    194.     {  
    195.         (*itr)->Join();  
    196.         delete (*itr);  
    197.         m_TrashThread.erase(itr);  
    198.         itr = m_TrashThread.begin();  
    199.     }  
    200.     LeaveCriticalSection(&m_csThreadPoolLock);  
    201. }  

     

    用法:

    #include "Thread.h"
    #include "ThreadPoolExecutor.h"

    class R : public Runnable
    {
    public:
        ~R()
        {
        }
        void Run()
        {
            printf("Hello World/n");
        }
    };

    int _tmain(int argc, _TCHAR* argv[])
    {
        CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor();
        pExecutor->Init(1, 10, 50);
        R r;
        for(int i=0;i<100;i++)
        {
            while(!pExecutor->Execute(&r))
            {
            }
        }
        pExecutor->Terminate();
        delete pExecutor;
        getchar();
        return 0;
    }

     

    测试结果:

    机器:

    Intel(R) Core(TM)2 Duo CPU

    E8400 @ 3.00GHz

    2G内存

     

    对于100个任务并且每个任务包含10000000个循环,任务中无等待:

    单线程执行耗时:2281时间片

    单线程池执行耗时:2219时间片

    2个线程的线程池耗时:1156时间片

    5个线程的线程池耗时:1166时间片

    10个线程的线程池耗时:1157时间片

    100个线程的线程池耗时:1177时间片

     

    from:

  • 相关阅读:
    mysql密码忘记如何恢复(windows/liunx版本:mysql8.0.27)
    Visual Studio Code如何校验yaml格式文件
    python测试小工具
    Lunx vimgo 开发环境搭建
    小白学正则表达式之 regexp
    kubernetes scc 故障排查小记
    Go image registry
    OpenShift image registry 概述
    Go 疑难杂症汇总
    小白学标准库之 http
  • 原文地址:https://www.cnblogs.com/lidabo/p/3328646.html
Copyright © 2011-2022 走看看