zoukankan      html  css  js  c++  java
  • 线程池的设计实现

    线程池: 就是new一堆线程,当有任务到来时,抓一个线程去执行,执行完之后再丢回线程池。

    省去了新建和注销线程的开销。

    一、线程池工作分为以下几步:

    (1)创建线程固定数目的线程(如:20个),并让线程挂起等待任务
    (2)给某个线程设置任务
    (3)激活该线程,让其执行任务
    (4)线程执行任务完毕后,回收该线程

    二、主要是依靠三对PV操作

    线程池线程
    {

    //取用线程
    V(m_Cond_Run)
    P(m_Cond_IsTaskRun)

    ...
    //回收线程
    P(m_Cond_IsRunning)⑤
    }

    任务线程
    {
    //取用线程
    P(m_Cond_Run);③
    V(m_Cond_IsTaskRun)

    ...
    //回收线程
    V(m_Cond_IsRunning)
    }

     

     

     

     

     

     

     

     如上面结构所示:

    ①状态时,是线程池线程创建任务线程成功,任务线程在③处挂起;
    ②状态时,线程池线程已经运行了一个用于激活任务的V、P操作;任务线程也会运行到④,执行任务;
        这里面,m_Cond_Run的目的是线程池告诉任务线程,需要启动;m_Cond_IsTaskRun是任务线程回复线程池线程说,任务正常启动;
    ⑤所在的地方是线程池发送了结束任务线程的指令,用P操作来等待任务线程结束;
         m_Cond_IsRunning目的是任务线程,在将任务暂停并回收资源成功后,需要告诉线程池线程可以回收我了。

    三、虽然过程不复杂,但是实现过程中的逻辑却不简单,以darwin的源码为例:

    类结构如下:(伪代码)

    class ThreadTask //是一个执行任务的线程类
    {
    friend class ThreadPool;
    public:
        Int start();       //创建线程
    static void _Entry(void *inThread);   //该线程的回调函数
    virtual void Entry();
    Bool Active();     //激活该线程
    
    private:
    Task *m_task;
    
    OSCond     m_Cond_Run;    //用于检测线程是否运行的条件变量
    OSMutex     m_Mutex_Run;   //相对应的互斥量
    
    OSCond     m_Cond_IsTaskRun;  //用于检测是否运行任务的条件变量
    OSMutex     m_Mutex_IsTaskRun; //相对应的互斥量
    
    };
    
    class Task
    {
    friend class ThreadTask;
    public:
        Bool RunTask(){ ThreadPool::RegistTask(this);}
    virtual int Run()=0; //最终执行任务的函数
    };
    
    class ThreadPool
    {
    friend class ThreadTask;
    public:
        ThreadPool(int n);   //初始化
    static  void RegistTask(Task* ptask); //注册任务
    private:
        List<ThreadTask*> m_useList;
        List<ThreadTask*> m_unUseList;
    };

    类的实现如下:(伪代码)

    class MyTask :public Task
    {
        int Run(){...};   //代码中,只要实现Run()函数,就可以了。
    };
    int main()
    {
        MyTask *mytask=new MyTask();
        mytask->RunTask();   //调用RunTask()就可以是任务运行,但是一般通常把这个函数封装起来而不这样用
        return 0;
    }
    
    //取一个线程执行任务
    void ThreadPool::RegistTask(Task* ptask)
    {
        if(m_unUseList.empty())
        {
            //如果没有空闲的线程,那么就创建一个线程
            ThreadTask *pthread = new ThreadTask();
            pthread->start();
            m_useList.Push(pthread);
        }
        ThreadTask *pthread = m_unUseList.Pop();
        p->m_task=ptask;//给线程添加任务
        pthread->Active();//激活线程
    }
    //初始化n个线程,并激活
    void ThreadPool::ThreadPool(int n)
    {
        while(n--)
        {
            ThreadTask *pthread = new ThreadTask();
            pthread->start();
            m_useList.Push(pthread);
        }
    }
    //创建线程
    Int ThreadTask::start()
    {
        //可以看出线程的回调函数是_Entry实际上是一个静态函数,目的是为了调用各自进程的Entry()函数
        return pthread_create((pthread_t*)&m_ThreadID, &m_ThreadAttr, _Entry, (void*)this);
    }
    void ThreadTask::_Entry(void *inThread)
    {
        //调用当前进程对应的Entry()函数
        inThread->Entry();
    }
    void ThreadTask::Entry()
    {
        while (1)
        {
            //这一段相当于P(m_Cond_Run)
            m_Mutex_Run.Lock();
            while(!m_busy)
            {
                m_Cond_Run.Wait(&m_Mutex_Run, 500);
            }
            m_Mutex_Run.Unlock();
    
    
            //这一段相当于V(m_Cond_IsTaskRun)
            m_Mutex_IsTaskRun.Lock();
            m_IsTaskRun = TRUE;
            m_Cond_IsTaskRun.Signal();
            m_Mutex_IsTaskRun.Unlock();
    
            //在这里执行Run()之后再把线程放回线程池的未用队列
            if(m_Task != NULL)
            {
                m_Task->Run();
                delete m_task;
                //在这一部分还有一段用于确认是否运行完毕的PV操作,暂且省略
                
                //这个函数主要是把当前线程从已用队列挪到未用队列
                ThreadPool::ReclaimThread(this);
            }
            //修改状态
            m_Mutex_IsTaskRun.Lock();
            m_IsTaskRun = FALSE;
            m_Mutex_IsTaskRun.Unlock();
        }
    
        return NULL;
    }
    Bool ThreadTask::Active()
    {
        m_Mutex_Run.Lock();
        if(!m_busy)
        {
            //这一段相当于V(m_Cond_Run)
            m_busy = TRUE;
            m_Cond_Run.Signal();
            m_Mutex_Run.Unlock();
    
            //这一段相当于P(m_Cond_IsTaskRun)
            m_Mutex_IsTaskRun.Lock();
            while(!m_IsTaskRun)
            {
                m_Cond_IsTaskRun.Wait(&m_Mutex_IsTaskRun,100);
            }
            m_Mutex_IsTaskRun.Unlock();
            return TRUE;
        }
        m_Mutex_Run.Unlock();
        return TRUE;
    }

     

  • 相关阅读:
    Docker实战--部署简单nodejs应用
    VMWare下ubuntu无法全屏的问题解决
    CentOS 7安装Docker
    修改maven本地仓库的默认地址
    spring cloud 集成 swagger2 构建Restful APIS 说明文档
    使用Redis的INCR、Hsetnx、Hincrby的命令生成序列号
    NetMQ(四): 推拉模式 Push-Pull
    NetMQ(三): 发布订阅模式 Publisher-Subscriber
    NetMQ(二): 请求响应模式 Request-Reply
    NetMQ(一):zeromq简介
  • 原文地址:https://www.cnblogs.com/bugutian/p/5211004.html
Copyright © 2011-2022 走看看