zoukankan      html  css  js  c++  java
  • MIT 2012分布式课程基础源码解析-线程池实现

    主要内容

    1. ScopedLock
    2. 队列实现
    3. 线程池实现

    在正式讲解线程池实现之前,先讲解两个有用的工具类:

    • ScopedLock
    • fifo队列

    ScopedLock:

    ScopedLock是局域锁的实现(我也不知道叫什么,姑且这么说吧),它使用了C++中RAII(Resource acquisition is initialization资源获取即初始化),这种技巧实现的锁可在代码块开始处初始化锁,在代码块结束处释放锁,可省去try catch这样的语句,具体实现如下:

    struct ScopedLock {
        private:
            pthread_mutex_t *m_;
        public:
            ScopedLock(pthread_mutex_t *m): m_(m) {
                VERIFY(pthread_mutex_lock(m_)==0);
            }
            ~ScopedLock() {
                VERIFY(pthread_mutex_unlock(m_)==0);
            }
    };

    其中宏VERIFY定义如下,这时贯穿于整个项目并最常用的一个宏:

    #ifdef NDEBUG
    #define VERIFY(expr) do { if (!(expr)) abort(); } while (0)
    #else
    #define VERIFY(expr) assert(expr)
    #endif

    ScopedLock的使用方法如下:

    void func() {
       ......
       {
            ScopedLock lock(&m_);
           ......  
       }  
      ......
    }

    在代码块{}定义一个lock变量即可,在}处便能自动调用析构函数,从而自动释放锁,这种技巧在《c++必知必会》中也是强烈推荐的一种技巧。

    fifo队列实现

    代码中的fifo队列实现了简单生产者消费者模型,提供了阻塞非阻塞选项,实现代码在fifo.h文件下,我们首先看看类定义:

    template<class T>
    class fifo {
        public:
            fifo(int m=0);  //默认limit为了,即代表队列大小无限制
            ~fifo();
                    //默认实现为阻塞队列
            bool enq(T, bool blocking=true); //入队,默认为阻塞
            void deq(T *);  //出队
            bool size();     //队列大小
    
        private:
            std::list<T> q_;  //队列
            pthread_mutex_t m_; //互斥量保护队列
            pthread_cond_t non_empty_c_; // q went non-empty
            pthread_cond_t has_space_c_; // q is not longer overfull
            unsigned int max_; //maximum capacity of the queue, block enq threads if exceeds this limit
    };
    View Code

    该实现仅是对C++ STL的list进行了简单的封装,在这基础上增加了些条件变量控制,我们主要看enq和deq的实现。

    template<class T> bool
    fifo<T>::enq(T e, bool blocking)
    {
        //使用了局域锁,在函数返回之前便会释放锁
        ScopedLock ml(&m_);
        while (1) {
            //当limit = 0(即代表大小无限制时),插入元素
            //或当队列大小小于max_时,插入元素
            if (!max_ || q_.size() < max_) {
                q_.push_back(e);
                break;
            }
            if (blocking)  //若为阻塞队列且队列中没有空间容纳新元素,则等待消费者取走元素
                VERIFY(pthread_cond_wait(&has_space_c_, &m_) == 0);
            else  //若不是阻塞队列,当队列中没有空间可容纳新元素时,立即返回false
                return false;
        }
        //现在队列中至少有一个元素,通知其它等待在该条件变量上的线程(生产者)
        VERIFY(pthread_cond_signal(&non_empty_c_) == 0);
        return true;
    }
    
    template<class T> void
    fifo<T>::deq(T *e)
    {
        ScopedLock ml(&m_);
    
        while(1) {
            //当队列为空时,等待
            if(q_.empty()){
                VERIFY (pthread_cond_wait(&non_empty_c_, &m_) == 0);
            } else { //否则,取出头部元素,注意函数中传入的参数是个指针
                *e = q_.front();
                q_.pop_front();
                if (max_ && q_.size() < max_) { //通知其它线程队列中现在至少有一个空位
                    VERIFY(pthread_cond_signal(&has_space_c_)==0);
                }
                break;
            }
        }
        return;
    }
    View Code

    这样一个简单的fifo队列便实现了,它也是后面线程池实现的一个重要环节。

    线程池实现

    线程池实现中技巧性要求有点高,其中涉及函数指针、类指针、函数对象以及回调等技巧。首先我们来看它的定义,见thr_pool.h文件:

    class ThrPool {
    
    
        public:
            struct job_t {
                void *(*f)(void *); //function point
                void *a; //function arguments
            };
    
            ThrPool(int sz, bool blocking=true); //默认使用阻塞队列
            ~ThrPool();
            //添加工作,其中第一个参数是一个类指针,第二个参数是一个类函数,其参数是一个类型A,
            //第三个参数是第二个类函数指针的参数类型变量
            template<class C, class A> bool addObjJob(C *o, void (C::*m)(A), A a);
            void waitDone();
            //获得Job
            bool takeJob(job_t *j);
    
        private:
            pthread_attr_t attr_;
            int nthreads_;  //线程数目
            bool blockadd_;
    
    
            fifo<job_t> jobq_;  //job队列
            std::vector<pthread_t> th_;  //线程数组
            //私有类,供addObjJob内部调用
            bool addJob(void *(*f)(void *), void *a);
    };
    
        template <class C, class A> bool 
    ThrPool::addObjJob(C *o, void (C::*m)(A), A a)
    {
        //内部类,隐藏了实现细节
        class objfunc_wrapper {
            public:
                C *o; //类指针,也即第一个变量
                void (C::*m)(A a); //类函数
                A a; 
                static void *func(void *vvv) {
                    //将vvv转换为objfunc_wrapper类
                    objfunc_wrapper *x = (objfunc_wrapper*)vvv;
                    //将转换后的各变量赋值给本类中的各变量
                    C *o = x->o;
                    void (C::*m)(A ) = x->m;
                    A a = x->a;
                    (o->*m)(a);  //执行函数,回调的执行
                    delete x;   //
                    return 0;
                }
        };
    
        objfunc_wrapper *x = new objfunc_wrapper;
        x->o = o;
        x->m = m;
        x->a = a;
        //添加工作回调函数
        return addJob(&objfunc_wrapper::func, (void *)x);
    }
    View Code

    看上面的实现特别是addObjJob,确认令人惊叹(大神忽略),这样工作类即可很容易的添加进去,使用线程池时也会更加方便,仅需实现相应的工作类及工作类的回调函数即可。接下来我们看thr_pool.cc文件中的具体实现:

    //线程执行方法,while循环中获取队列中的工作,因为队列默认是阻塞队列
    //线程在没获取到工作时,将阻塞在相应的条件变量上
    static void *
    do_worker(void *arg)
    {
        ThrPool *tp = (ThrPool *)arg; //将this转换为ThrPool指针
        while (1) {
            ThrPool::job_t j;
            if (!tp->takeJob(&j))
                break; //die
    
            (void)(j.f)(j.a); //执行工作
        }
        pthread_exit(NULL);
    }
    
    //if blocking, then addJob() blocks when queue is full
    //otherwise, addJob() simply returns false when queue is full
    ThrPool::ThrPool(int sz, bool blocking)
    : nthreads_(sz),blockadd_(blocking),jobq_(100*sz) 
    {
        pthread_attr_init(&attr_);
        pthread_attr_setstacksize(&attr_, 128<<10);
    
        for (int i = 0; i < sz; i++) {
            pthread_t t;
            //注意这里函数是do_worker,添加的参数为this,这样在do_worker函数中方便取出更多的信息
            VERIFY(pthread_create(&t, &attr_, do_worker, (void *)this) ==0);
            th_.push_back(t);
        }
    }
    
    //IMPORTANT: this function can be called only when no external thread 
    //will ever use this thread pool again or is currently blocking on it
    ThrPool::~ThrPool()
    {
        for (int i = 0; i < nthreads_; i++) {
            job_t j;
            j.f = (void *(*)(void *))NULL; //poison pill to tell worker threads to exit
            jobq_.enq(j);
        }
    
        for (int i = 0; i < nthreads_; i++) {
            VERIFY(pthread_join(th_[i], NULL)==0);
        }
    
        VERIFY(pthread_attr_destroy(&attr_)==0);
    }
    
    //添加工作的私有类,初始化job_t类,并添加到队列中
    bool 
    ThrPool::addJob(void *(*f)(void *), void *a)
    {
        job_t j;
        j.f = f;
        j.a = a;
    
        return jobq_.enq(j,blockadd_);
    }
    
    //获取队列中的工作回调,注意传入的是指针
    bool 
    ThrPool::takeJob(job_t *j)
    {
        jobq_.deq(j);
        return (j->f!=NULL);
    }
    View Code

    该线程池的实现确认令人咋舌,很巧妙的将回调类转换成了内部的job_t类,也不失为一个很好的c++学习案例。

    使用该线程池很简单,只需定义好相应的事件回调类,然后初始化线程池,再将回调类添加(addObjJob)到线程池中即可

  • 相关阅读:
    rkhunter和chkrootkit
    Chkrootkit安装配置教程 – Linux后门入侵检测
    安装asterisk以及asterisk-gui
    职场最让人鄙视哪种招聘面试老板
    谷歌为何大举收购机器人公司?
    evercookie
    美科学家发现量子纠缠幽灵与宇宙虫洞有关
    Storm-YARN
    Twitter开源Summingbird:近原生编码下整合批处理与流处理
    基于keepalived的redis通信链接数测试
  • 原文地址:https://www.cnblogs.com/fwensen/p/5774876.html
Copyright © 2011-2022 走看看