线程池: 就是new一堆线程,当有任务到来时,抓一个线程去执行,执行完之后再丢回线程池。
省去了新建和注销线程的开销。
一、线程池工作分为以下几步:
(1)创建线程固定数目的线程(如:20个),并让线程挂起等待任务
(2)给某个线程设置任务
(3)激活该线程,让其执行任务
(4)线程执行任务完毕后,回收该线程
二、主要是依靠三对PV操作
线程池线程 |
任务线程 |
如上面结构所示:
①状态时,是线程池线程创建任务线程成功,任务线程在③处挂起;
②状态时,线程池线程已经运行了一个用于激活任务的V、P操作;任务线程也会运行到④,执行任务;
这里面,m_Cond_Run的目的是线程池告诉任务线程,需要启动;m_Cond_IsTaskRun是任务线程回复线程池线程说,任务正常启动;
⑤所在的地方是线程池发送了结束任务线程的指令,用P操作来等待任务线程结束;
m_Cond_IsRunning目的是任务线程,在将任务暂停并回收资源成功后,需要告诉线程池线程可以回收我了。
三、虽然过程不复杂,但是实现过程中的逻辑却不简单,以darwin的源码为例:
类结构如下:(伪代码)
class ThreadTask //是一个执行任务的线程类 { friend class ThreadPool; public: Int start(); //创建线程 static void _Entry(void *inThread); //该线程的回调函数 virtual void Entry(); Bool Active(); //激活该线程 private: Task *m_task; OSCond m_Cond_Run; //用于检测线程是否运行的条件变量 OSMutex m_Mutex_Run; //相对应的互斥量 OSCond m_Cond_IsTaskRun; //用于检测是否运行任务的条件变量 OSMutex m_Mutex_IsTaskRun; //相对应的互斥量 }; class Task { friend class ThreadTask; public: Bool RunTask(){ ThreadPool::RegistTask(this);} virtual int Run()=0; //最终执行任务的函数 }; class ThreadPool { friend class ThreadTask; public: ThreadPool(int n); //初始化 static void RegistTask(Task* ptask); //注册任务 private: List<ThreadTask*> m_useList; List<ThreadTask*> m_unUseList; };
类的实现如下:(伪代码)
class MyTask :public Task { int Run(){...}; //代码中,只要实现Run()函数,就可以了。 }; int main() { MyTask *mytask=new MyTask(); mytask->RunTask(); //调用RunTask()就可以是任务运行,但是一般通常把这个函数封装起来而不这样用 return 0; } //取一个线程执行任务 void ThreadPool::RegistTask(Task* ptask) { if(m_unUseList.empty()) { //如果没有空闲的线程,那么就创建一个线程 ThreadTask *pthread = new ThreadTask(); pthread->start(); m_useList.Push(pthread); } ThreadTask *pthread = m_unUseList.Pop(); p->m_task=ptask;//给线程添加任务 pthread->Active();//激活线程 } //初始化n个线程,并激活 void ThreadPool::ThreadPool(int n) { while(n--) { ThreadTask *pthread = new ThreadTask(); pthread->start(); m_useList.Push(pthread); } } //创建线程 Int ThreadTask::start() { //可以看出线程的回调函数是_Entry实际上是一个静态函数,目的是为了调用各自进程的Entry()函数 return pthread_create((pthread_t*)&m_ThreadID, &m_ThreadAttr, _Entry, (void*)this); } void ThreadTask::_Entry(void *inThread) { //调用当前进程对应的Entry()函数 inThread->Entry(); } void ThreadTask::Entry() { while (1) { //这一段相当于P(m_Cond_Run) m_Mutex_Run.Lock(); while(!m_busy) { m_Cond_Run.Wait(&m_Mutex_Run, 500); } m_Mutex_Run.Unlock(); //这一段相当于V(m_Cond_IsTaskRun) m_Mutex_IsTaskRun.Lock(); m_IsTaskRun = TRUE; m_Cond_IsTaskRun.Signal(); m_Mutex_IsTaskRun.Unlock(); //在这里执行Run()之后再把线程放回线程池的未用队列 if(m_Task != NULL) { m_Task->Run(); delete m_task; //在这一部分还有一段用于确认是否运行完毕的PV操作,暂且省略 //这个函数主要是把当前线程从已用队列挪到未用队列 ThreadPool::ReclaimThread(this); } //修改状态 m_Mutex_IsTaskRun.Lock(); m_IsTaskRun = FALSE; m_Mutex_IsTaskRun.Unlock(); } return NULL; } Bool ThreadTask::Active() { m_Mutex_Run.Lock(); if(!m_busy) { //这一段相当于V(m_Cond_Run) m_busy = TRUE; m_Cond_Run.Signal(); m_Mutex_Run.Unlock(); //这一段相当于P(m_Cond_IsTaskRun) m_Mutex_IsTaskRun.Lock(); while(!m_IsTaskRun) { m_Cond_IsTaskRun.Wait(&m_Mutex_IsTaskRun,100); } m_Mutex_IsTaskRun.Unlock(); return TRUE; } m_Mutex_Run.Unlock(); return TRUE; }