zoukankan      html  css  js  c++  java
  • 一个简单的linux线程池(转-wangchenxicool)

    线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。

    在linux中,使用的是posix线程库,首先介绍几个常用的函数:

    1 线程的创建和取消函数

    pthread_create -- 创建线程

    pthread_join -- 合并线程

    pthread_cancel -- 取消线程

    2 线程同步函数

    pthread_mutex_lock

    pthread_mutex_unlock

    pthread_cond_signal

    pthread_cond_wait

    线程池的实现:

    线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。

    主要有两个类来实现,CTask,CThreadPool

    /**
    执行任务的类,设置任务数据并执行
    **/

    C代码
    class CTask   
    {  
    protected:  
            string m_strTaskName;  //任务的名称  
            void* m_ptrData;  //要执行的任务的具体数据  
    public:  
            CTask(){}  
            CTask(string taskName)  
            {  
                    this->m_strTaskName = taskName;  
                    m_ptrData = NULL;  
            }  
    virtual int Run()= 0;  
            void SetData(void* data);    //设置任务数据  
    };
     

    任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。

    线程池类

    /**
    线程池
    **/

    Java代码
    class CThreadPool   
    {  
    private:  
            vector<CTask*> m_vecTaskList;  //任务列表  
            int m_iThreadNum;  //线程池中启动的线程数             
            static vector<pthread_t> m_vecIdleThread;  //当前空闲的线程集合  
            static vector<pthread_t> m_vecBusyThread;  //当前正在执行的线程集合  
            static pthread_mutex_t m_pthreadMutex;  //线程同步锁  
            static pthread_cond_t m_pthreadCond;  //线程同步的条件变量  
    protected:  
            static void* ThreadFunc(void * threadData); //新线程的线程函数  
            static int MoveToIdle(pthread_t tid); //线程执行结束后,把自己放入到空闲线程中  
            static int MoveToBusy(pthread_t tid); //移入到忙碌线程中去  
            int Create(); //创建所有的线程  
    public:  
            CThreadPool(int threadNum);  
            int AddTask(CTask *task); //把任务添加到线程池中  
            int StopAll();  
    };
     

    当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。

    线程之间的同步用线程锁和条件变量。

    这个类的对外接口有两个:

    AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。

    StopAll函数停止所有的线程

    Cpp代码
     
    ************************************************   
      
    代码:   
      
    ××××××××××××××××××××CThread.h   
      
        
      
    #ifndef __CTHREAD   
    #define __CTHREAD   
    #include <vector>   
    #include <string>   
    #include <pthread.h>   
      
    using namespace std;   
      
    /**  
    执行任务的类,设置任务数据并执行  
    **/  
    class CTask   
    {   
    protected:   
            string m_strTaskName;  //任务的名称   
            void* m_ptrData;       //要执行的任务的具体数据   
    public:   
            CTask(){}   
            CTask(string taskName)   
            {   
                    this->m_strTaskName = taskName;   
                    m_ptrData = NULL;   
            }   
    virtual int Run()= 0;   
            void SetData(void* data);    //设置任务数据   
    };
       
      
    /**  
    线程池  
    **/  
    class CThreadPool   
    {   
    private:   
            vector<CTask*> m_vecTaskList;         //任务列表   
            int m_iThreadNum;                            //线程池中启动的线程数              
            static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合   
            static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合   
            static pthread_mutex_t m_pthreadMutex;    //线程同步锁   
            static pthread_cond_t m_pthreadCond;    //线程同步的条件变量   
    protected:   
            static void* ThreadFunc(void * threadData); //新线程的线程函数   
            static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中   
            static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去   
            int Create();          //创建所有的线程   
    public:   
            CThreadPool(int threadNum);   
            int AddTask(CTask *task);      //把任务添加到线程池中   
            int StopAll();   
    };   
      
    #endif   
      
     
    类的实现为:
    ××××××××××××××××××××CThread.cpp
    #include "CThread.h"
    #include <string>
    #include <iostream>
    using namespace std;
    void CTask::SetData(void * data)
    {
            m_ptrData = data;
    }
    vector<pthread_t> CThreadPool::m_vecBusyThread;
    vector<pthread_t> CThreadPool::m_vecIdleThread;
    pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
    CThreadPool::CThreadPool(int threadNum)
    {
            this->m_iThreadNum = threadNum;
            Create();
    }
     
    int CThreadPool::MoveToIdle(pthread_t tid)
    {
            vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
            while (busyIter != m_vecBusyThread.end())
            {
                    if(tid == *busyIter)
                    {
                            break;
                    }
                    busyIter++;
            }
            m_vecBusyThread.erase(busyIter);
            m_vecIdleThread.push_back(tid);
            return 0;
    }
     
    int CThreadPool::MoveToBusy(pthread_t tid)
    {
            vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
            while(idleIter != m_vecIdleThread.end())
            {
                    if(tid == *idleIter)
                    {
                            break;
                    }
                    idleIter++;
            }
            m_vecIdleThread.erase(idleIter);
            m_vecBusyThread.push_back(tid);
            return 0;
    }
     
    void* CThreadPool::ThreadFunc(void * threadData)
    {
            pthread_t tid = pthread_self();
            while (1)
            {
                    pthread_mutex_lock(&m_pthreadMutex);
                    pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
                    cout << "tid:" << tid << " run" << endl;
                    //get task
                    vector<CTask*>* taskList = (vector<CTask*>*)threadData;
                    vector<CTask*>::iterator iter = taskList->begin();
                    while (iter != taskList->end())
                    {
                            MoveToBusy(tid);
                            break;
                    }
                    CTask* task = *iter;
                    taskList->erase(iter);
                    pthread_mutex_unlock(&m_pthreadMutex);
                    cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;
                    cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;
                    //cout << "task to be run:" << taskList->size() << endl;
                    task->Run();
                    //cout << "CThread::thread work" << endl;
                    cout << "tid:" << tid << " idle" << endl;
            }
            return (void*)0;
    }
    int CThreadPool::AddTask(CTask *task)
    {
            this->m_vecTaskList.push_back(task);
            pthread_cond_signal(&m_pthreadCond);
            return 0;
    }
    int CThreadPool::Create()
    {
            for(int i = 0; i < m_iThreadNum; i++)
            {
                    pthread_t tid = 0;
                    pthread_create(&tid,NULL,ThreadFunc, &m_vecTaskList);
                    m_vecIdleThread.push_back(tid);
            }
            return 0;
    }
     
    int CThreadPool::StopAll()
    {
            vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
            while (iter != m_vecIdleThread.end())
            {
                    pthread_cancel(*iter);
                    pthread_join(*iter,NULL);
                    iter++;
            }
            iter = m_vecBusyThread.begin();
            while(iter != m_vecBusyThread.end())
            {
                    pthread_cancel(*iter);
                    pthread_join(*iter, NULL);
                    iter++;
            }
            return 0;
    }
    // 简单示例:
    ××××××××test.cpp
    #include "CThread.h"
    #include <iostream>
    using namespace std;
    class CWorkTask: public CTask
    {
    public:
            CWorkTask()
            {}
            int Run()
            {
                    cout << (char*)this->m_ptrData << endl;
                    sleep(10);
                    return 0;
            }
    };
     
    int main()
    {
            CWorkTask taskObj;
            char szTmp[] = "this is the first thread running,haha success";
            taskObj.SetData((void*)szTmp);
            CThreadPool threadPool(10);
            for(int i = 0; i < 11; i++)
            {
                    threadPool.AddTask(&taskObj);
            }
            while(1)
            {
                    sleep(120);
            }
            return 0;
    }
  • 相关阅读:
    获取文件扩展名的几个函数
    Window下,在TEMP路径下生成一个临时文件名
    字符串中的TOUPPER函数
    字符串中的TRIM操作
    BIN转换成HEX格式及HEX转换成BIN的两个函数接口
    GMSSL在Window下的编译
    VS2012下自定义打开文件对话框
    Daliy Algorithm (graph,思维)-- day 59
    Daliy Algorithm (贪心,gcd)-- day 58
    图论--Floyd(弗洛伊德)算法
  • 原文地址:https://www.cnblogs.com/sanchrist/p/3573429.html
Copyright © 2011-2022 走看看