zoukankan      html  css  js  c++  java
  • Linux下通用线程池的创建与使用

    线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。

    在linux中,使用的是posix线程库,首先介绍几个常用的函数:

    1 线程的创建和取消函数

    pthread_create

    创建线程

    pthread_join

    合并线程

    pthread_cancel

    取消线程

    2 线程同步函数

    pthread_mutex_lock

    pthread_mutex_unlock

    pthread_cond_signal

    pthread_cond_wait

    关于函数的详细说明,参考man手册

    线程池的实现:

    线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。

    主要有两个类来实现,CTask,CThreadPool

    /**
    执行任务的类,设置任务数据并执行
    **/
    class CTask
    {
    protected:
     string m_strTaskName;  //任务的名称
     void* m_ptrData;       //要执行的任务的具体数据
    public:
     CTask(){}
     CTask(string taskName)
     {
      this->m_strTaskName = taskName;
      m_ptrData = NULL;
     }
     virtual int Run()= 0;
     void SetData(void* data);    //设置任务数据
    };

    任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。

    线程池类

    /**
    线程池
    **/
    class CThreadPool
    {
    private:
     vector<CTask*> m_vecTaskList;         //任务列表
     int m_iThreadNum;                            //线程池中启动的线程数          
     static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合
     static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合
     static pthread_mutex_t m_pthreadMutex;    //线程同步锁
     static pthread_cond_t m_pthreadCond;    //线程同步的条件变量
    protected:
     static void* ThreadFunc(void * threadData); //新线程的线程函数
     static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中
     static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去
     int Create();          //创建所有的线程
    public:
     CThreadPool(int threadNum);
     int AddTask(CTask *task);      //把任务添加到线程池中
     int StopAll();
    };

    当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。

    线程之间的同步用线程锁和条件变量。

    这个类的对外接口有两个:

    AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。

    StopAll函数停止所有的线程

    ************************************************

    代码:

    ××××××××××××××××××××CThread.h

    #ifndef __CTHREAD
    #define __CTHREAD
    #include <vector>
    #include <string>
    #include <pthread.h>

    using namespace std;

    /**
    执行任务的类,设置任务数据并执行
    **/
    class CTask
    {
    protected:
     string m_strTaskName;  //任务的名称
     void* m_ptrData;       //要执行的任务的具体数据
    public:
     CTask(){}
     CTask(string taskName)
     {
      this->m_strTaskName = taskName;
      m_ptrData = NULL;
     }
     virtual int Run()= 0;
     void SetData(void* data);    //设置任务数据
    };

    /**
    线程池
    **/
    class CThreadPool
    {
    private:
     vector<CTask*> m_vecTaskList;         //任务列表
     int m_iThreadNum;                            //线程池中启动的线程数          
     static vector<pthread_t> m_vecIdleThread;   //当前空闲的线程集合
     static vector<pthread_t> m_vecBusyThread;   //当前正在执行的线程集合
     static pthread_mutex_t m_pthreadMutex;    //线程同步锁
     static pthread_cond_t m_pthreadCond;    //线程同步的条件变量
    protected:
     static void* ThreadFunc(void * threadData); //新线程的线程函数
     static int MoveToIdle(pthread_t tid);   //线程执行结束后,把自己放入到空闲线程中
     static int MoveToBusy(pthread_t tid);   //移入到忙碌线程中去
     int Create();          //创建所有的线程
    public:
     CThreadPool(int threadNum);
     int AddTask(CTask *task);      //把任务添加到线程池中
     int StopAll();
    };

    #endif

    类的实现为:

    ××××××××××××××××××××CThread.cpp

    #include "CThread.h"
    #include <string>
    #include <iostream>

    using namespace std;

    void CTask::SetData(void * data)
    {
     m_ptrData = data;
    }

    vector<pthread_t> CThreadPool::m_vecBusyThread;
    vector<pthread_t> CThreadPool::m_vecIdleThread;
    pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;

    CThreadPool::CThreadPool(int threadNum)
    {
     this->m_iThreadNum = threadNum;
     Create();
    }
    int CThreadPool::MoveToIdle(pthread_t tid)
    {
     vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin();
     while(busyIter != m_vecBusyThread.end())
     {
      if(tid == *busyIter)
      {
       break;
      }
      busyIter++;
     }
     m_vecBusyThread.erase(busyIter);
     m_vecIdleThread.push_back(tid);
     return 0;
    }

    int CThreadPool::MoveToBusy(pthread_t tid)
    {
     vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin();
     while(idleIter != m_vecIdleThread.end())
     {
      if(tid == *idleIter)
      {
       break;
      }
      idleIter++;
     }
     m_vecIdleThread.erase(idleIter);
     m_vecBusyThread.push_back(tid);
     return 0;
    }
    void* CThreadPool::ThreadFunc(void * threadData)
    {
     pthread_t tid = pthread_self();
     while(1)
     {
      pthread_mutex_lock(&m_pthreadMutex);
      pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex);
      cout << "tid:" << tid << " run" << endl;
      //get task
      vector<CTask*>* taskList = (vector<CTask*>*)threadData;
      vector<CTask*>::iterator iter = taskList->begin();
      while(iter != taskList->end())
      {
       
       MoveToBusy(tid);
       break;
      }
      CTask* task = *iter;
      taskList->erase(iter);
      pthread_mutex_unlock(&m_pthreadMutex);
      cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl;
      cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl;
      //cout << "task to be run:" << taskList->size() << endl;
      task->Run();
      
      //cout << "CThread::thread work" << endl;
      cout << "tid:" << tid << " idle" << endl;
      
     }
     return (void*)0;
    }

    int CThreadPool::AddTask(CTask *task)
    {
     this->m_vecTaskList.push_back(task);
     pthread_cond_signal(&m_pthreadCond);
     return 0;
    }
    int CThreadPool::Create()
    {
     for(int i = 0; i < m_iThreadNum;i++)
     {
      pthread_t tid = 0;
      pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList);
      m_vecIdleThread.push_back(tid);
     }
     return 0;
    }

    int CThreadPool::StopAll()
    {
     vector<pthread_t>::iterator iter = m_vecIdleThread.begin();
     while(iter != m_vecIdleThread.end())
     {
      pthread_cancel(*iter);
      pthread_join(*iter,NULL);
      iter++;
     }

     iter = m_vecBusyThread.begin();
     while(iter != m_vecBusyThread.end())
     {
      pthread_cancel(*iter);
      pthread_join(*iter,NULL);
      iter++;
     }
     
     return 0;
    }

    简单示例:

    ××××××××test.cpp

    #include "CThread.h"
    #include <iostream>

    using namespace std;

    class CWorkTask: public CTask
    {
    public:
     CWorkTask()
     {}
     int Run()
     {
      cout << (char*)this->m_ptrData << endl;
      sleep(10);
      return 0;
     }
    };
    int main()
    {
     CWorkTask taskObj;
     char szTmp[] = "this is the first thread running,haha success";
     taskObj.SetData((void*)szTmp);
     CThreadPool threadPool(10);
     for(int i = 0;i < 11;i++)
     {
      threadPool.AddTask(&taskObj);
     }
     while(1)
     {
      sleep(120);
     }
     return 0;
    }

  • 相关阅读:
    给最小化托盘增加右键菜单
    (转)c#实现开机自启动
    Socket代码
    (转)C# Socket简单例子(服务器与客户端通信)
    (转)C# Socket异步通信
    (转)winform pictureBox后台显示图片
    验证DataGridView单元格的值
    批处理判断是否有.net环境
    Winform判断是否已启动
    linux 下 apache启动、停止、重启命令
  • 原文地址:https://www.cnblogs.com/skyofbitbit/p/3688917.html
Copyright © 2011-2022 走看看