zoukankan      html  css  js  c++  java
  • 【C/C++开发】C++实现简单的线程池

    C++实现简单的线程池

    线程池编程简介:

        在我们的服务端的程序中运用了大量关于池的概念,线程池、连接池、内存池、对象池等等。使用池的概念后可以高效利用服务器端的资源,比如没有大量的线程在系统中进行上下文的切换,一个数据库连接池,也只需要维护一定里的连接,而不是占用很多数据库连接资源。同时它们也避免了一些耗时的操作,比如创建一个线程,申请一个数据库连接,而且可能就只使用那么一次,然后就立刻释放刚申请的资源,效率很低。

        在我的上一篇blog中已经实现一个线程基类了,在这里我们只需要实现一个线程池类ThreadPool和该线程池调度的工作线程类WorkThread即可,而且WorkThread是继承自Thread类的。

     

    实现思路:

        一个简单的线程池的实现思路一般如下:

    1. ThreadPool中创建多个线程(WorkThreadk对象),每个线程均处于阻塞状态,等待任务的到来
    1. ThreadPool提供一个提交任务的接口,如post_job(ProcCallback func, void* data); post_job后会立即返回,不会阻塞
    2. ThreadPool维护一个空闲线程队列,当客户程序调用post_job()后,如果空闲队列中有空闲线程,则取出一个线程句柄,并设置任务再给出新任务通知事件即可,处理等待的线程捕捉到事件信号后便开始执行任务,执行完后将该线程句柄重新push到空闲线程队列中
    3. 该线程池采用回调函数方式

    首先我们实现一个WorkThread类:

     1 typedef void (APR_THREAD_FUNC *ProcCallBack)(void*);        //回调函数指针
     2 //由线程池调度的工作线程
     3 class WorkThread : public Thread      //Thread类的实现可参考我上一篇的blog: 《C++封装一个简单的线程类》
     4 {
     5     friend class ThreadPool;
     6 public:
     7     WorkThread(ThreadPool* pthr_pool)
     8     {
     9         thr_pool_    = pthr_pool;
    10         cb_func_    = NULL;
    11         param_        = NULL;
    12     }
    13     virtual ~WorkThread(){}
    14     void    set_job(ProcCallBack func, void* param)
    15     {
    16         cb_func_    = func;
    17         param_        = param;
    18         notify();            //通知有新的任务
    19     }
    20     //实现Thread的run方法,并调用用户指定的函数
    21     virtual void run()
    22     {
    23         if (cb_func_)
    24             cb_func_(param_);
    25     
    26         //reset callback function pointer
    27         cb_func_    = NULL;
    28         param_        = NULL;
    29     
    30         //执行完任务,将该线程句柄移到线程池空闲队列
    31         thr_pool_->move_to_idle_que(this);
    32     }
    33     
    34 private:
    35     ThreadPool*    thr_pool_;        //线程池指针
    36     ProcCallBack    cb_func_;        //回调函数地址
    37     void*             param_;           //回调函数参数
    38 };

    WorkThread中,有一个回调函数指针和参数,当有新任务时,会在run()中被调用,执行完后会将该线程移动到空闲线程队列,等待下一次任务的提交。

    ThreadPool类定义如下:

     1 class ThreadPool
     2 {
     3     friend class WorkThread;
     4 public:
     5     ThreadPool();
     6     virtual ~ThreadPool();
     7     int    start_thread_pool(size_t thread_num = 5);        //启动thread_num个线程
     8     int    stop_thread_pool();                                      //线束线程池
     9     void    destroy();                                                //销毁线程池所申请的资源
    10     void    post_job(ProcCallBack func, void* data);        //提交任务接口,传入回调函数地址和参数
    11 
    12 protected:
    13     WorkThread*    get_idle_thread();                              //从获得空闲队列中取得一个线程句柄
    14     void        append_idle_thread(WorkThread* pthread);    //加入到thread_vec_和idl_que_中
    15     void        move_to_idle_que(WorkThread* idlethread);    //将线程句柄加入到idle_que_中
    16 
    17 private:
    18     size_t                thr_num_;                      //线程数目
    19     vector<WorkThread*>        thr_vec_;        //线程句柄集合
    20     BlockQueue<WorkThread*>    idle_que_;     //空闲线程队列
    21 
    22 private:
    23     // not implement
    24     ThreadPool(const ThreadPool& );
    25     ThreadPool&    operator=(const ThreadPool& );
    26 };
          线程池实现的关键是如何创建多个线程,并且当任务来临时可以从线程池中取一个线程(也就是去得到其中一个线程的指针),然后提交任务并执行。还有一点就是当任务执行完后,应该将该线程句柄重新加入到空闲线程队列,所以我们将ThreadPool的指针传入给了WorkThread,thr_pool_最后可以调用move_to_idle_que(this)来将该线程句柄移到空闲队列中。
    ThreadPool中一些关键代码的实现:
     1 int ThreadPool::start_thread_pool(size_t thread_num)
     2 {
     3     assert(thread_num != 0);
     4     thr_num_    = thread_num;
     5     int    ret        = 0;
     6     for (size_t i = 0; i < thr_num_; ++i)
     7     {
     8         WorkThread*    pthr = new WorkThread(this);
     9         pthr->set_thread_id(i);
    10         if ((ret = pthr->start()) != 0)
    11         {
    12             printf("start_thread_pool: failed when create a work thread: %d ", i);
    13             delete pthr;
    14             return i;
    15         }
    16         append_idle_thread(pthr);        
    17     }
    18     return thr_num_;
    19 }
    20 int ThreadPool::stop_thread_pool()
    21 {
    22     for (size_t i = 0; i < thr_vec_.size(); ++i)
    23     {
    24         WorkThread* pthr = thr_vec_[i];
    25         pthr->join();
    26         delete pthr;
    27     }
    28     thr_vec_.clear();
    29     idle_que_.clear();
    30     return 0;
    31 }
    32 void ThreadPool::destroy()
    33 {
    34     stop_thread_pool();
    35 }
    36 void ThreadPool::append_idle_thread(WorkThread* pthread)
    37 {
    38     thr_vec_.push_back(pthread);
    39     idle_que_.push(pthread);
    40 }
    41 void ThreadPool::move_to_idle_que(WorkThread* idlethread)
    42 {
    43     idle_que_.push(idlethread);
    44 }
    45 WorkThread* ThreadPool::get_idle_thread()
    46 {
    47     WorkThread*    pthr = NULL;
    48     if (!idle_que_.empty())
    49         pthr = idle_que_.take();
    50     return pthr;
    51 }
    52 void ThreadPool::post_job(ProcCallBack func, void* data)
    53 {
    54     assert(func != NULL);
    55     WorkThread* pthr = get_idle_thread();
    56     while (pthr == NULL)
    57     {
    58         apr_sleep(500000);
    59         pthr = get_idle_thread();
    60     }
    61     pthr->set_job(func, data);
    62 }
    ThreadPool中的BlockQueue<WorkThread*> 也就是一个线程安全的队列,即对std::deque做了一个包装,在插入和取出元素时加了一个读写锁。


    使用示例:
    //任务执行函数,必须是ProcCallback类型
    void count(void* param)
    {
    // do some your work, like: 
    int* pi = static_cast<int*>(param);
    int val = *pi + 1;
    printf("val=%d ", val);
    pelete pi;
    }
    //程序中使用如下:
    ThreadPool* ptp = new ThreadPool();
    ptp->start_thread_pool(3); //启动3 个线程
    ptp->post_job(count, new int(1)); //提交任务
    ptp->post_job(count, new int(2));
    ptp->post_job(count, new int(3));
    //程序线束时
    ptp->stop_thread_pool();
    其实count()函数就是我们的业务实现代码,有任务时,可以提交给线程池去执行。
    结尾:
        其实实现一个线程池或其它什么池并不难,当然线程安全和效率还是要从多写代码的经验中获取。像这个线程池也就是基于预创多个建线程,保保存好它们的线程句柄,当有新任务时取一个线程执行即可,执行完后一定要归还到空闲线程队列中,当然我们可以在线程池中增加一个任务队列,因为当post_job()时,若当时没有空闲线程,有两种方案,一是等待有空闲线程,二是加入到任务队列,当WorkThread线程执行完一个任务后,从任务队列中取一个任务继续执行即可,不会阻塞在post_job()中。
      另外,我们可以封装一些线程安全的队列和map什么的,这样在程序中就不用担心创建一个多线程共享的队列时,还必须创建一个锁,挺麻烦的,比如上面的BlockQueue<Type>直接拿来用就行了。
  • 相关阅读:
    Python3 爬取验证代理
    Python每天学一点之Threading和queue
    Python每天学一点之argparse
    [安恒月赛]反序列化字符逃逸
    $AFO$
    洛谷$P3647 [APIO2014]$连珠线 换根$dp$
    线性基学习笔记
    $vjudge CSP-S$专题专练题解
    $POJ2942 Knights of the Round Table$ 图论
    $tarjan$简要学习笔记
  • 原文地址:https://www.cnblogs.com/huty/p/8517387.html
Copyright © 2011-2022 走看看