zoukankan      html  css  js  c++  java
  • 转载:C++线程池的一个实现

    原文转自:http://www.cnblogs.com/lidabo/p/3328646.html

    略有修改

    CThread类参见:http://www.cnblogs.com/tangxin-blog/p/4835211.html

    CThreadPool.h

     1 #ifndef __MY_THREAD_POOL_H_
     2 #define __MY_THREAD_POOL_H_
     3 
     4 #include "CThread.h"
     5 #include <set>
     6 #include <list>
     7 #include <windows.h>
     8 using namespace std;
     9 
    10 class CThreadPool
    11 {
    12 public:
    13     CThreadPool(void);
    14     virtual ~CThreadPool(void);
    15     // Records the limits and starts worker threads until the pool holds minThreadCnt of them
    16     bool Initialize(unsigned int minThreadCnt,unsigned int maxThreadCnt,unsigned int maxTaskQueueLength);
    17     bool AddTask( CRunnable *pRunnable, bool bRun = true);
    18     void Terminate();
    19     // Returns the number of threads currently in the pool
    20     unsigned int GetThreadCnt();
    21 private:
    22     // Removes and returns the task at the head of the task queue
    23     CRunnable *GetTask();
    24     // Task-execution thread function (only declared here; no definition appears in the accompanying .cpp listing)
    25     static unsigned int WINAPI StaticThreadFunc(void * arg);
    26 private:
    27     // Worker class
    28     class CWorker : public CThread
    29     {
    30     public:  
    31         CWorker(CThreadPool *pThreadPool,CRunnable *pFirstTask = NULL);
    32         ~CWorker();  
    33         void Run();  
    34     private:
    35         CThreadPool * const m_pThreadPool;
    36         CRunnable * const m_pFirstTask;
    37         volatile bool m_bRun;  
    38     };
    39 
    40     typedef std::set<CWorker *> ThreadPool;  
    41     typedef std::list<CRunnable *> Tasks;  
    42     typedef Tasks::iterator TasksItr;  
    43     typedef ThreadPool::iterator ThreadPoolItr;
    44 
    45     CRITICAL_SECTION m_csTasksLock;  
    46     CRITICAL_SECTION m_csThreadPoolLock;  
    47 
    48     // Workers that are currently part of the pool
    49     ThreadPool m_ThreadPool;  
    50     // Retired workers waiting to be joined and deleted
    51     ThreadPool m_TrashThread;  
    52     // Task queue
    53     Tasks m_Tasks;
    54     // Whether the pool is running
    55     volatile bool m_bRun;  
    56     // Whether new tasks may be inserted
    57     volatile bool m_bEnableInsertTask;  
    58     // Minimum number of threads
    59     volatile unsigned int m_minThreads;  
    60     // Maximum number of threads
    61     volatile unsigned int m_maxThreads;  
    62     // Maximum number of pending (queued) tasks
    63     volatile unsigned int m_maxPendingTasks;  
    64 };
    65 
    66 #endif

    CThreadPool.cpp

    #include "CThreadPool.h"
    
    // Stores the owning pool and the optional first task, and marks the worker
    // as running.
    CThreadPool::CWorker::CWorker(CThreadPool *pThreadPool, CRunnable *pFirstTask)
        : m_pThreadPool(pThreadPool)
        , m_pFirstTask(pFirstTask)
        , m_bRun(true)
    {
    }
    
    // The destructor releases nothing; neither stored pointer is deleted here.
    CThreadPool::CWorker::~CWorker() {}
    
    void CThreadPool::CWorker::Run()
    {
        CRunnable * pTask = NULL;
        while(m_bRun)
        {
            // 从线程池的任务队列中取出一个任务
            pTask = m_pThreadPool->GetTask();
            // 如果没有取到任务
            if(NULL == pTask)
            {
                EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
                // 如果运转的线程数大于最小线程数,需要清除多余的线程
                if(m_pThreadPool->GetThreadCnt() > m_pThreadPool->m_minThreads)  
                {  
                    ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);  
                    if(itr != m_pThreadPool->m_ThreadPool.end())  
                    {  
                        m_pThreadPool->m_ThreadPool.erase(itr);  
                        m_pThreadPool->m_TrashThread.insert(this);  
                    }  
                    m_bRun = false;  
                }  
                else  
                {  
                    // 等待已经开始运行的线程结束
                    ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();  
                    while(itr != m_pThreadPool->m_TrashThread.end())  
                    {  
                        (*itr)->Join();  
                        delete (*itr);  
                        m_pThreadPool->m_TrashThread.erase(itr);  
                        itr = m_pThreadPool->m_TrashThread.begin();  
                    }  
                }  
                LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));  
                continue;  
            }
            else  
            {  
                pTask->Run();  
                pTask = NULL;  
            }  
        }
    }
    
    
    // Creates an idle pool: not running, not accepting tasks, all limits zero.
    // The limits are zero-initialized so that no member is read uninitialized
    // if Terminate() or the destructor runs before Initialize().
    CThreadPool::CThreadPool(void)
        : m_bRun(false)
        , m_bEnableInsertTask(false)
        , m_minThreads(0)
        , m_maxThreads(0)
        , m_maxPendingTasks(0)
    {
        InitializeCriticalSection(&m_csTasksLock);
        InitializeCriticalSection(&m_csThreadPoolLock);
    }
    
    // Shuts the pool down via Terminate(), then destroys both critical sections.
    CThreadPool::~CThreadPool(void)
    {
        Terminate();

        // The two locks are independent, so the destruction order is irrelevant.
        DeleteCriticalSection(&m_csThreadPoolLock);
        DeleteCriticalSection(&m_csTasksLock);
    }
    
    bool CThreadPool::Initialize(unsigned int minThreadCnt,unsigned int maxThreadCnt,unsigned int maxTaskQueueLength)
    {
        if(minThreadCnt == 0)  
        {  
            return false;  
        }  
        if(minThreadCnt > maxThreadCnt)  
        {  
            return false;  
        }  
        m_minThreads = minThreadCnt;  
        m_maxThreads = maxThreadCnt;  
        m_maxPendingTasks = maxTaskQueueLength;
        unsigned int i = m_ThreadPool.size();
        for(; i<minThreadCnt; i++)  
        {  
            //创建线程 minThreadCnt 个线程
            CWorker * pWorker = new CWorker(this);  
            if(NULL == pWorker)  
            {  
                return false;  
            }  
            EnterCriticalSection(&m_csThreadPoolLock);
            m_ThreadPool.insert(pWorker);
            LeaveCriticalSection(&m_csThreadPoolLock);
            pWorker->Start();
        }
        // 可以开始插入任务队列
        m_bRun = true;
        m_bEnableInsertTask = true;
        return true;
    }
    
    unsigned int CThreadPool::GetThreadCnt()
    {
        return m_ThreadPool.size();
    }
    
    // Removes and returns the task at the head of the queue, or NULL when the
    // queue is empty. The queue is accessed under m_csTasksLock.
    CRunnable * CThreadPool::GetTask()
    {
        CRunnable *pNext = NULL;

        EnterCriticalSection(&m_csTasksLock);
        const bool bHasPending = !m_Tasks.empty();
        if(bHasPending)
        {
            pNext = m_Tasks.front();
            m_Tasks.pop_front();
        }
        LeaveCriticalSection(&m_csTasksLock);

        return pNext;
    }
    
    // Submits a task to the pool.
    //
    // The task is appended to the queue while fewer than m_maxPendingTasks
    // tasks are pending. When the queue is full and the pool holds fewer than
    // m_maxThreads workers, a new worker is created with the task as its first
    // task. Otherwise the task is rejected.
    //
    // pRunnable: task to submit; NULL is rejected.
    // bRun:      accepted for interface compatibility; not used.
    // Returns true when the task was queued or handed to a new worker, false
    // when insertion is disabled, pRunnable is NULL, or both limits are reached.
    //
    // Each limit check and the matching insertion happen under the same lock,
    // so concurrent callers cannot exceed either limit.
    bool CThreadPool::AddTask( CRunnable *pRunnable, bool bRun /*= true*/ )
    {
        if(m_bEnableInsertTask == false || NULL == pRunnable)
        {
            return false;
        }

        // Queue path: test the bound and enqueue atomically.
        bool bQueued = false;
        EnterCriticalSection(&m_csTasksLock);
        if(m_Tasks.size() < m_maxPendingTasks)
        {
            m_Tasks.push_back(pRunnable);
            bQueued = true;
        }
        LeaveCriticalSection(&m_csTasksLock);
        if(bQueued)
        {
            return true;
        }

        // Queue is full: grow the pool if the thread limit allows it.
        CWorker * pWorker = NULL;
        EnterCriticalSection(&m_csThreadPoolLock);
        if(m_ThreadPool.size() < m_maxThreads)
        {
            pWorker = new CWorker(this, pRunnable);
            // NULL check kept for builds where operator new does not throw.
            if(NULL != pWorker)
            {
                m_ThreadPool.insert(pWorker);
            }
        }
        LeaveCriticalSection(&m_csThreadPoolLock);

        if(NULL == pWorker)
        {
            return false;
        }
        // Start outside the lock, as the worker takes the pool lock itself.
        pWorker->Start();
        return true;
    }
    
    void CThreadPool::Terminate()
    {
        m_bEnableInsertTask = false;  
        while(m_Tasks.size() > 0)  
        {  
            Sleep(1);  
        }
        m_bRun = false;  
        m_minThreads = 0;  
        m_maxThreads = 0;  
        m_maxPendingTasks = 0;  
        while(m_ThreadPool.size() > 0)  
        {  
            Sleep(1);  
        }
        EnterCriticalSection(&m_csThreadPoolLock);  
        ThreadPoolItr itr = m_TrashThread.begin();  
        while(itr != m_TrashThread.end())  
        {  
            (*itr)->Join();  
            delete (*itr);  
            m_TrashThread.erase(itr);  
            itr = m_TrashThread.begin();  
        }  
        LeaveCriticalSection(&m_csThreadPoolLock);
    }

    测试代码

    #include <iostream>
    #include <windows.h>
    #include <time.h>
    #include "CThread.h"
    #include "CThreadPool.h"
    using namespace std;
    
    // Test task: Run() calls Sleep(m_nt); the destructor prints the stored value.
    class R : public CRunnable
    {
    public:
        // Value passed to Sleep() by Run().
        int m_nt;

        R(int t) : m_nt(t) {}

        ~R() { cout<<"~R:"<<m_nt<<endl; }

        void Run() { Sleep(m_nt); }
    };
    
    // Benchmark driver: runs the same task n times, first with one CThread per
    // task, then through a CThreadPool, and prints the clock() ticks each took.
    // Returns -1 if the pool cannot be initialized, 0 otherwise.
    int main()
    {
        const int n = 100000;
        int i = 0;
        int m = 0;  // number of tasks the pool rejected
        // clock() returns clock_t, so keep the timestamps in that type.
        clock_t start = 0, end = 0;
        R r(1);
        /*
        // Single-threaded baseline
        start = clock();
        for(i=0;i < n;i++)
        {
            r.Run();
        }
        end = clock();
        cout<<"单线程用时:"<<end - start<<endl;
        */
        // One thread per task
        start = clock();
        CThread *ths = new CThread[n];
        for(i=0;i < n;i++)
        {
            ths[i].SetRunnable(&r);
            ths[i].Start();
        }
        for(i=0;i < n;i++)
        {
            ths[i].Join();
        }
        delete[] ths;
        end = clock();
        cout<<"多线程用时:"<<end - start<<endl;

        // Thread pool
        start = clock();
        CThreadPool *threadPool = new CThreadPool();
        if(threadPool->Initialize(200,500,100) == false)
        {
            cout<<"Initialize failed"<<endl;
            // Release the pool on the failure path as well.
            delete threadPool;
            return -1;
        }
        for(i=0;i < n;i++)
        {
            if( !threadPool->AddTask(&r) )
            {
                m++;
            }
        }
        threadPool->Terminate();
        delete threadPool;
        end = clock();
        if(m!=0)
        {
            cout<<m<<endl;
        }
        cout<<"线程池用时:"<<end - start<<endl;
        system("pause");
        return 0;
    }

    注意:要合理设置最小线程数,最大线程数,最大挂起任务数量,以便达到最优性能。

  • 相关阅读:
    高级同步器:交换器Exchanger
    高级同步器:同步屏障CyclicBarrier
    转:java中的定时任务
    spring事务配置,声明式事务管理和基于@Transactional注解的使用(转)
    深入研究Java类加载机制
    接口可以继承多个接口总结
    序列化和反序列化(转)
    tomcat常见错误及解决方案
    数据库连接池的实现及原理
    proxy是什么
  • 原文地址:https://www.cnblogs.com/tangxin-blog/p/4856329.html
Copyright © 2011-2022 走看看