zoukankan      html  css  js  c++  java
  • 一个简单的linux线程池(转-wangchenxicool)

    线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。

    在linux中,使用的是posix线程库,首先介绍几个常用的函数:

    1 线程的创建和取消函数

    pthread_create -- 创建线程

    pthread_join -- 合并线程

    pthread_cancel -- 取消线程

    2 线程同步函数

    pthread_mutex_lock

    pthread_mutex_unlock

    pthread_cond_signal

    pthread_cond_wait

    线程池的实现:

    线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。

    主要有两个类来实现,CTask,CThreadPool

    /**
    执行任务的类,设置任务数据并执行
    **/

    C代码
    class CTask   
    {  
    protected:  
            string m_strTaskName;  //任务的名称  
            void* m_ptrData;  //要执行的任务的具体数据  
    public:  
            CTask(){}  
            CTask(string taskName)  
            {  
                    this->m_strTaskName = taskName;  
                    m_ptrData = NULL;  
            }  
    virtual int Run()= 0;  
            void SetData(void* data);    //设置任务数据  
    };
     

    任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。

    线程池类

    /**
    线程池
    **/

    Java代码
    class CThreadPool   
    {  
    private:  
            vector<CTask*> m_vecTaskList;  //任务列表  
            int m_iThreadNum;  //线程池中启动的线程数             
            static vector<pthread_t> m_vecIdleThread;  //当前空闲的线程集合  
            static vector<pthread_t> m_vecBusyThread;  //当前正在执行的线程集合  
            static pthread_mutex_t m_pthreadMutex;  //线程同步锁  
            static pthread_cond_t m_pthreadCond;  //线程同步的条件变量  
    protected:  
            static void* ThreadFunc(void * threadData); //新线程的线程函数  
            static int MoveToIdle(pthread_t tid); //线程执行结束后,把自己放入到空闲线程中  
            static int MoveToBusy(pthread_t tid); //移入到忙碌线程中去  
            int Create(); //创建所有的线程  
    public:  
            CThreadPool(int threadNum);  
            int AddTask(CTask *task); //把任务添加到线程池中  
            int StopAll();  
    };
     

    当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。

    线程之间的同步用线程锁和条件变量。

    这个类的对外接口有两个:

    AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。

    StopAll函数停止所有的线程

    Cpp代码
     
    ************************************************   
      
    代码:   
      
    ××××××××××××××××××××CThread.h   
      
        
      
    #ifndef __CTHREAD   
    #define __CTHREAD   
    #include <vector>   
    #include <string>   
    #include <pthread.h>   
      
    using namespace std;   
      
    /**  
    执行任务的类,设置任务数据并执行  
    **/  
    class CTask   
    {   
    protected:   
            string m_strTaskName;  //任务的名称   
            void* m_ptrData;       //要执行的任务的具体数据   
    public:   
            CTask(){}   
            CTask(string taskName)   
            {   
                    this->m_strTaskName = taskName;   
                    m_ptrData = NULL;   
            }   
    virtual int Run()= 0;   
            void SetData(void* data);    //设置任务数据   
    };
       
      
    /**  
    线程池  
    **/  
    class CThreadPool   
    {   
    private:   
            vector<CTask*> m_vecTaskList;         //任务列表   
            int m_iThreadNum;                            //线程池中启动的线程数              
            static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合   
            static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合   
            static pthread_mutex_t m_pthreadMutex;    //线程同步锁   
            static pthread_cond_t m_pthreadCond;    //线程同步的条件变量   
    protected:   
            static void* ThreadFunc(void * threadData); //新线程的线程函数   
            static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中   
            static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去   
            int Create();          //创建所有的线程   
    public:   
            CThreadPool(int threadNum);   
            int AddTask(CTask *task);      //把任务添加到线程池中   
            int StopAll();   
    };   
      
    #endif   
      
     
    类的实现为:
    ××××××××××××××××××××CThread.cpp
    #include "CThread.h"
    #include <string>
    #include <iostream>
    using namespace std;
    void CTask::SetData(void * data)
    {
            m_ptrData = data;
    }
    vector<pthread_t> CThreadPool::m_vecBusyThread;
    vector<pthread_t> CThreadPool::m_vecIdleThread;
    pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
    CThreadPool::CThreadPool(int threadNum)
    {
            this->m_iThreadNum = threadNum;
            Create();
    }
     
    int CThreadPool::MoveToIdle(pthread_t tid)
    {
            vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
            while (busyIter != m_vecBusyThread.end())
            {
                    if(tid == *busyIter)
                    {
                            break;
                    }
                    busyIter++;
            }
            m_vecBusyThread.erase(busyIter);
            m_vecIdleThread.push_back(tid);
            return 0;
    }
     
    int CThreadPool::MoveToBusy(pthread_t tid)
    {
            vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
            while(idleIter != m_vecIdleThread.end())
            {
                    if(tid == *idleIter)
                    {
                            break;
                    }
                    idleIter++;
            }
            m_vecIdleThread.erase(idleIter);
            m_vecBusyThread.push_back(tid);
            return 0;
    }
     
    void* CThreadPool::ThreadFunc(void * threadData)
    {
            pthread_t tid = pthread_self();
            while (1)
            {
                    pthread_mutex_lock(&m_pthreadMutex);
                    pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
                    cout << "tid:" << tid << " run" << endl;
                    //get task
                    vector<CTask*>* taskList = (vector<CTask*>*)threadData;
                    vector<CTask*>::iterator iter = taskList->begin();
                    while (iter != taskList->end())
                    {
                            MoveToBusy(tid);
                            break;
                    }
                    CTask* task = *iter;
                    taskList->erase(iter);
                    pthread_mutex_unlock(&m_pthreadMutex);
                    cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;
                    cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;
                    //cout << "task to be run:" << taskList->size() << endl;
                    task->Run();
                    //cout << "CThread::thread work" << endl;
                    cout << "tid:" << tid << " idle" << endl;
            }
            return (void*)0;
    }
    int CThreadPool::AddTask(CTask *task)
    {
            this->m_vecTaskList.push_back(task);
            pthread_cond_signal(&m_pthreadCond);
            return 0;
    }
    int CThreadPool::Create()
    {
            for(int i = 0; i < m_iThreadNum; i++)
            {
                    pthread_t tid = 0;
                    pthread_create(&tid,NULL,ThreadFunc, &m_vecTaskList);
                    m_vecIdleThread.push_back(tid);
            }
            return 0;
    }
     
    int CThreadPool::StopAll()
    {
            vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
            while (iter != m_vecIdleThread.end())
            {
                    pthread_cancel(*iter);
                    pthread_join(*iter,NULL);
                    iter++;
            }
            iter = m_vecBusyThread.begin();
            while(iter != m_vecBusyThread.end())
            {
                    pthread_cancel(*iter);
                    pthread_join(*iter, NULL);
                    iter++;
            }
            return 0;
    }
    // 简单示例:
    ××××××××test.cpp
    #include "CThread.h"
    #include <iostream>
    using namespace std;
    class CWorkTask: public CTask
    {
    public:
            CWorkTask()
            {}
            int Run()
            {
                    cout << (char*)this->m_ptrData << endl;
                    sleep(10);
                    return 0;
            }
    };
     
    int main()
    {
            CWorkTask taskObj;
            char szTmp[] = "this is the first thread running,haha success";
            taskObj.SetData((void*)szTmp);
            CThreadPool threadPool(10);
            for(int i = 0; i < 11; i++)
            {
                    threadPool.AddTask(&taskObj);
            }
            while(1)
            {
                    sleep(120);
            }
            return 0;
    }
  • 相关阅读:
    POJ 1469 COURSES 二分图最大匹配
    POJ 1325 Machine Schedule 二分图最大匹配
    USACO Humble Numbers DP?
    SGU 194 Reactor Cooling 带容量上下限制的网络流
    POJ 3084 Panic Room 求最小割
    ZOJ 2587 Unique Attack 判断最小割是否唯一
    Poj 1815 Friendship 枚举+求最小割
    POJ 3308 Paratroopers 最小点权覆盖 求最小割
    1227. Rally Championship
    Etaoin Shrdlu
  • 原文地址:https://www.cnblogs.com/sanchrist/p/3573429.html
Copyright © 2011-2022 走看看