#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <pthread.h>

#include <cstddef>
#include <string>
#include <vector>

/**
 * Abstract unit of work that a CThreadPool worker thread executes.
 *
 * Derived classes implement Run(). The payload attached through SetData()
 * is kept as an opaque pointer; this class never frees it.
 */
class CTask
{
protected:
    std::string m_strTaskName;  // name of the task
    void* m_ptrData;            // payload the task operates on (may be NULL)
public:
    // Creates an unnamed task with no payload attached.
    CTask() : m_ptrData(NULL) {}

    // Creates a task called `taskName` with no payload attached.
    CTask(const std::string& taskName)
        : m_strTaskName(taskName), m_ptrData(NULL)
    {
    }

    // Tasks are handled through CTask pointers, so destruction through a
    // base pointer has to reach the derived destructor.
    virtual ~CTask() {}

    // Performs the work; the meaning of the return value is up to the
    // derived class.
    virtual int Run() = 0;

    // Attaches the payload that Run() will operate on.
    void SetData(void* data);
};

/**
 * Pool of POSIX threads that execute queued CTask objects.
 *
 * The idle/busy thread sets, the mutex and the condition variable are
 * static members and are therefore shared by every CThreadPool instance;
 * only the task queue and the thread count are per instance.
 */
class CThreadPool
{
private:
    std::vector<CTask*> m_vecTaskList;              // queued tasks
    int m_iThreadNum;                               // number of threads the pool starts
    static std::vector<pthread_t> m_vecIdleThread;  // threads currently idle
    static std::vector<pthread_t> m_vecBusyThread;  // threads currently running a task
    static pthread_mutex_t m_pthreadMutex;          // lock used for thread synchronisation
    static pthread_cond_t m_pthreadCond;            // condition variable used for thread synchronisation
protected:
    static void* ThreadFunc(void* threadData);  // entry point of every worker thread
    static int MoveToIdle(pthread_t tid);       // put a thread into the idle set once its task has finished
    static int MoveToBusy(pthread_t tid);       // put a thread into the busy set
    int Create();                               // create all the threads
public:
    // Stores `threadNum` and starts the worker threads.
    CThreadPool(int threadNum);

    // Adds a task to the pool's queue.
    int AddTask(CTask* task);

    // Cancels and joins the threads recorded in the idle and busy sets.
    int StopAll();
};

#endif  // THREAD_POOL_H
1 #include <string> 2 #include <iostream> 3 4 using namespace std; 5 6 #include "thread_pool.h" 7 void CTask::SetData(void * data) 8 { 9 m_ptrData = data; 10 } 11 12 vector<pthread_t> CThreadPool::m_vecBusyThread; 13 vector<pthread_t> CThreadPool::m_vecIdleThread; 14 pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER; 15 pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER; 16 17 CThreadPool::CThreadPool(int threadNum) 18 { 19 this->m_iThreadNum = threadNum; 20 Create(); 21 } 22 int CThreadPool::MoveToIdle(pthread_t tid) 23 { 24 vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin(); 25 while(busyIter != m_vecBusyThread.end()) 26 { 27 if(tid == *busyIter) 28 { 29 break; 30 } 31 busyIter++; 32 } 33 m_vecBusyThread.erase(busyIter); 34 m_vecIdleThread.push_back(tid); 35 return 0; 36 } 37 38 int CThreadPool::MoveToBusy(pthread_t tid) 39 { 40 vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin(); 41 while(idleIter != m_vecIdleThread.end()) 42 { 43 if(tid == *idleIter) 44 { 45 break; 46 } 47 idleIter++; 48 } 49 m_vecIdleThread.erase(idleIter); 50 m_vecBusyThread.push_back(tid); 51 return 0; 52 } 53 void* CThreadPool::ThreadFunc(void * threadData) 54 { 55 pthread_t tid = pthread_self(); 56 while(1) 57 { 58 pthread_mutex_lock(&m_pthreadMutex); 59 pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex); 60 cout << "tid:" << tid << " run" << endl; 61 //get task 62 vector<CTask*>* taskList = (vector<CTask*>*)threadData; 63 vector<CTask*>::iterator iter = taskList->begin(); 64 while(iter != taskList->end()) 65 { 66 67 MoveToBusy(tid); 68 break; 69 } 70 CTask* task = *iter; 71 taskList->erase(iter); 72 pthread_mutex_unlock(&m_pthreadMutex); 73 cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl; 74 cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl; 75 //cout << "task to be run:" << taskList->size() << endl; 76 task->Run(); 77 78 //cout << "CThread::thread work" << endl; 79 cout 
<< "tid:" << tid << " idle" << endl; 80 81 } 82 return (void*)0; 83 } 84 85 int CThreadPool::AddTask(CTask *task) 86 { 87 this->m_vecTaskList.push_back(task); 88 pthread_cond_signal(&m_pthreadCond); 89 return 0; 90 } 91 int CThreadPool::Create() 92 { 93 for(int i = 0; i < m_iThreadNum;i++) 94 { 95 pthread_t tid = 0; 96 pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList); 97 m_vecIdleThread.push_back(tid); 98 } 99 return 0; 100 } 101 102 int CThreadPool::StopAll() 103 { 104 vector<pthread_t>::iterator iter = m_vecIdleThread.begin(); 105 while(iter != m_vecIdleThread.end()) 106 { 107 pthread_cancel(*iter); 108 pthread_join(*iter,NULL); 109 iter++; 110 } 111 112 iter = m_vecBusyThread.begin(); 113 while(iter != m_vecBusyThread.end()) 114 { 115 pthread_cancel(*iter); 116 pthread_join(*iter,NULL); 117 iter++; 118 } 119 120 return 0; 121 }
1 #include <string> 2 #include <iostream> 3 using namespace std; 4 #include "thread_pool.h" 5 #include <unistd.h> 6 7 class CWorkTask: public CTask 8 { 9 public: 10 CWorkTask() 11 {} 12 int Run() 13 { 14 cout << (char*)this->m_ptrData << endl; 15 sleep(10); 16 return 0; 17 } 18 }; 19 20 int main() 21 { 22 CWorkTask taskObj; 23 char szTmp[] = "this is the first thread running,haha success"; 24 taskObj.SetData((void*)szTmp); 25 CThreadPool threadPool(10); 26 sleep(1); 27 for(int i = 0;i < 11;i++) 28 { 29 threadPool.AddTask(&taskObj); 30 } 31 while(1) 32 { 33 sleep(120); 34 } 35 return 0; 36 }