C++实现简单的线程池
线程池编程简介:
在我们的服务端的程序中运用了大量关于池的概念,线程池、连接池、内存池、对象池等等。使用池的概念后可以高效利用服务器端的资源,比如没有大量的线程在系统中进行上下文的切换,一个数据库连接池,也只需要维护一定里的连接,而不是占用很多数据库连接资源。同时它们也避免了一些耗时的操作,比如创建一个线程,申请一个数据库连接,而且可能就只使用那么一次,然后就立刻释放刚申请的资源,效率很低。
在我的上一篇blog中已经实现一个线程基类了,在这里我们只需要实现一个线程池类ThreadPool和该线程池调度的工作线程类WorkThread即可,而且WorkThread是继承自Thread类的。
实现思路:
一个简单的线程池的实现思路一般如下:
- 在ThreadPool中创建多个线程(WorkThreadk对象),每个线程均处于阻塞状态,等待任务的到来
- ThreadPool提供一个提交任务的接口,如post_job(ProcCallback func, void* data); post_job后会立即返回,不会阻塞
- ThreadPool维护一个空闲线程队列,当客户程序调用post_job()后,如果空闲队列中有空闲线程,则取出一个线程句柄,并设置任务再给出新任务通知事件即可,处理等待的线程捕捉到事件信号后便开始执行任务,执行完后将该线程句柄重新push到空闲线程队列中
- 该线程池采用回调函数方式
首先我们实现一个WorkThread类:
1 typedef void (APR_THREAD_FUNC *ProcCallBack)(void*); //回调函数指针
2 //由线程池调度的工作线程
3 class WorkThread : public Thread //Thread类的实现可参考我上一篇的blog: 《C++封装一个简单的线程类》
4 {
5 friend class ThreadPool;
6 public:
7 WorkThread(ThreadPool* pthr_pool)
8 {
9 thr_pool_ = pthr_pool;
10 cb_func_ = NULL;
11 param_ = NULL;
12 }
13 virtual ~WorkThread(){}
14 void set_job(ProcCallBack func, void* param)
15 {
16 cb_func_ = func;
17 param_ = param;
18 notify(); //通知有新的任务
19 }
20 //实现Thread的run方法,并调用用户指定的函数
21 virtual void run()
22 {
23 if (cb_func_)
24 cb_func_(param_);
25
26 //reset callback function pointer
27 cb_func_ = NULL;
28 param_ = NULL;
29
30 //执行完任务,将该线程句柄移到线程池空闲队列
31 thr_pool_->move_to_idle_que(this);
32 }
33
34 private:
35 ThreadPool* thr_pool_; //线程池指针
36 ProcCallBack cb_func_; //回调函数地址
37 void* param_; //回调函数参数
38 };
2 //由线程池调度的工作线程
3 class WorkThread : public Thread //Thread类的实现可参考我上一篇的blog: 《C++封装一个简单的线程类》
4 {
5 friend class ThreadPool;
6 public:
7 WorkThread(ThreadPool* pthr_pool)
8 {
9 thr_pool_ = pthr_pool;
10 cb_func_ = NULL;
11 param_ = NULL;
12 }
13 virtual ~WorkThread(){}
14 void set_job(ProcCallBack func, void* param)
15 {
16 cb_func_ = func;
17 param_ = param;
18 notify(); //通知有新的任务
19 }
20 //实现Thread的run方法,并调用用户指定的函数
21 virtual void run()
22 {
23 if (cb_func_)
24 cb_func_(param_);
25
26 //reset callback function pointer
27 cb_func_ = NULL;
28 param_ = NULL;
29
30 //执行完任务,将该线程句柄移到线程池空闲队列
31 thr_pool_->move_to_idle_que(this);
32 }
33
34 private:
35 ThreadPool* thr_pool_; //线程池指针
36 ProcCallBack cb_func_; //回调函数地址
37 void* param_; //回调函数参数
38 };
该WorkThread中,有一个回调函数指针和参数,当有新任务时,会在run()中被调用,执行完后会将该线程移动到空闲线程队列,等待下一次任务的提交。
ThreadPool类定义如下:
1 class ThreadPool
2 {
3 friend class WorkThread;
4 public:
5 ThreadPool();
6 virtual ~ThreadPool();
7 int start_thread_pool(size_t thread_num = 5); //启动thread_num个线程
8 int stop_thread_pool(); //线束线程池
9 void destroy(); //销毁线程池所申请的资源
10 void post_job(ProcCallBack func, void* data); //提交任务接口,传入回调函数地址和参数
11
12 protected:
13 WorkThread* get_idle_thread(); //从获得空闲队列中取得一个线程句柄
14 void append_idle_thread(WorkThread* pthread); //加入到thread_vec_和idl_que_中
15 void move_to_idle_que(WorkThread* idlethread); //将线程句柄加入到idle_que_中
16
17 private:
18 size_t thr_num_; //线程数目
19 vector<WorkThread*> thr_vec_; //线程句柄集合
20 BlockQueue<WorkThread*> idle_que_; //空闲线程队列
21
22 private:
23 // not implement
24 ThreadPool(const ThreadPool& );
25 ThreadPool& operator=(const ThreadPool& );
26 };
2 {
3 friend class WorkThread;
4 public:
5 ThreadPool();
6 virtual ~ThreadPool();
7 int start_thread_pool(size_t thread_num = 5); //启动thread_num个线程
8 int stop_thread_pool(); //线束线程池
9 void destroy(); //销毁线程池所申请的资源
10 void post_job(ProcCallBack func, void* data); //提交任务接口,传入回调函数地址和参数
11
12 protected:
13 WorkThread* get_idle_thread(); //从获得空闲队列中取得一个线程句柄
14 void append_idle_thread(WorkThread* pthread); //加入到thread_vec_和idl_que_中
15 void move_to_idle_que(WorkThread* idlethread); //将线程句柄加入到idle_que_中
16
17 private:
18 size_t thr_num_; //线程数目
19 vector<WorkThread*> thr_vec_; //线程句柄集合
20 BlockQueue<WorkThread*> idle_que_; //空闲线程队列
21
22 private:
23 // not implement
24 ThreadPool(const ThreadPool& );
25 ThreadPool& operator=(const ThreadPool& );
26 };
线程池实现的关键是如何创建多个线程,并且当任务来临时可以从线程池中取一个线程(也就是去得到其中一个线程的指针),然后提交任务并执行。还有一点就是当任务执行完后,应该将该线程句柄重新加入到空闲线程队列,所以我们将ThreadPool的指针传入给了WorkThread,thr_pool_最后可以调用move_to_idle_que(this)来将该线程句柄移到空闲队列中。
ThreadPool中一些关键代码的实现:
1 int ThreadPool::start_thread_pool(size_t thread_num)
2 {
3 assert(thread_num != 0);
4 thr_num_ = thread_num;
5 int ret = 0;
6 for (size_t i = 0; i < thr_num_; ++i)
7 {
8 WorkThread* pthr = new WorkThread(this);
9 pthr->set_thread_id(i);
10 if ((ret = pthr->start()) != 0)
11 {
12 printf("start_thread_pool: failed when create a work thread: %d ", i);
13 delete pthr;
14 return i;
15 }
16 append_idle_thread(pthr);
17 }
18 return thr_num_;
19 }
20 int ThreadPool::stop_thread_pool()
21 {
22 for (size_t i = 0; i < thr_vec_.size(); ++i)
23 {
24 WorkThread* pthr = thr_vec_[i];
25 pthr->join();
26 delete pthr;
27 }
28 thr_vec_.clear();
29 idle_que_.clear();
30 return 0;
31 }
32 void ThreadPool::destroy()
33 {
34 stop_thread_pool();
35 }
36 void ThreadPool::append_idle_thread(WorkThread* pthread)
37 {
38 thr_vec_.push_back(pthread);
39 idle_que_.push(pthread);
40 }
41 void ThreadPool::move_to_idle_que(WorkThread* idlethread)
42 {
43 idle_que_.push(idlethread);
44 }
45 WorkThread* ThreadPool::get_idle_thread()
46 {
47 WorkThread* pthr = NULL;
48 if (!idle_que_.empty())
49 pthr = idle_que_.take();
50 return pthr;
51 }
52 void ThreadPool::post_job(ProcCallBack func, void* data)
53 {
54 assert(func != NULL);
55 WorkThread* pthr = get_idle_thread();
56 while (pthr == NULL)
57 {
58 apr_sleep(500000);
59 pthr = get_idle_thread();
60 }
61 pthr->set_job(func, data);
62 }
2 {
3 assert(thread_num != 0);
4 thr_num_ = thread_num;
5 int ret = 0;
6 for (size_t i = 0; i < thr_num_; ++i)
7 {
8 WorkThread* pthr = new WorkThread(this);
9 pthr->set_thread_id(i);
10 if ((ret = pthr->start()) != 0)
11 {
12 printf("start_thread_pool: failed when create a work thread: %d ", i);
13 delete pthr;
14 return i;
15 }
16 append_idle_thread(pthr);
17 }
18 return thr_num_;
19 }
20 int ThreadPool::stop_thread_pool()
21 {
22 for (size_t i = 0; i < thr_vec_.size(); ++i)
23 {
24 WorkThread* pthr = thr_vec_[i];
25 pthr->join();
26 delete pthr;
27 }
28 thr_vec_.clear();
29 idle_que_.clear();
30 return 0;
31 }
32 void ThreadPool::destroy()
33 {
34 stop_thread_pool();
35 }
36 void ThreadPool::append_idle_thread(WorkThread* pthread)
37 {
38 thr_vec_.push_back(pthread);
39 idle_que_.push(pthread);
40 }
41 void ThreadPool::move_to_idle_que(WorkThread* idlethread)
42 {
43 idle_que_.push(idlethread);
44 }
45 WorkThread* ThreadPool::get_idle_thread()
46 {
47 WorkThread* pthr = NULL;
48 if (!idle_que_.empty())
49 pthr = idle_que_.take();
50 return pthr;
51 }
52 void ThreadPool::post_job(ProcCallBack func, void* data)
53 {
54 assert(func != NULL);
55 WorkThread* pthr = get_idle_thread();
56 while (pthr == NULL)
57 {
58 apr_sleep(500000);
59 pthr = get_idle_thread();
60 }
61 pthr->set_job(func, data);
62 }
ThreadPool中的BlockQueue<WorkThread*> 也就是一个线程安全的队列,即对std::deque做了一个包装,在插入和取出元素时加了一个读写锁。
使用示例:
//任务执行函数,必须是ProcCallback类型
void count(void* param)
{
// do some your work, like:
int* pi = static_cast<int*>(param);
int val = *pi + 1;
printf("val=%d
", val);
pelete pi;
}
//程序中使用如下:
ThreadPool* ptp = new ThreadPool();
ptp->start_thread_pool(3);
//启动3 个线程
ptp->post_job(count, new int(1));
//提交任务
ptp->post_job(count, new int(2));
ptp->post_job(count, new int(3));
//程序线束时
ptp->stop_thread_pool();
其实count()函数就是我们的业务实现代码,有任务时,可以提交给线程池去执行。
结尾:
其实实现一个线程池或其它什么池并不难,当然线程安全和效率还是要从多写代码的经验中获取。像这个线程池也就是基于预创多个建线程,保保存好它们的线程句柄,当有新任务时取一个线程执行即可,执行完后一定要归还到空闲线程队列中,当然我们可以在线程池中增加一个任务队列,因为当post_job()时,若当时没有空闲线程,有两种方案,一是等待有空闲线程,二是加入到任务队列,当WorkThread线程执行完一个任务后,从任务队列中取一个任务继续执行即可,不会阻塞在post_job()中。
另外,我们可以封装一些线程安全的队列和map什么的,这样在程序中就不用担心创建一个多线程共享的队列时,还必须创建一个锁,挺麻烦的,比如上面的BlockQueue<Type>直接拿来用就行了。