zoukankan      html  css  js  c++  java
  • 面向对象的线程池Threadpool的封装

        线程池是一种多线程处理形式,预先创建好一定数量的线程,将其保存于一个容器中(如vector), 处理过程中将任务添加到队列,然后从容器中取出线程后自动启动这些任务,具体实现如下。

      以下是UML图,展示了类与类之间的大致关系,其中NonCopyable.h未给出。关于类之间的关系的表示,请参见

    博客:http://www.cnblogs.com/liuling/archive/2013/05/03/classrelation.html



    以下对各个类进行解释,给出代码并在注释中会说明每个函数的作用。

    文件清单:

        下面的代码是头文件和实现分开,每一个头文件对应一个实现文件(.cpp)。Noncopyable.h 与test.cpp除外。所以代码经过调试,在Linux下可执行。

        分享一个教训,我调了好久才发现这个低级错误,在编译时候记得是:g++ *.cpp -lpthread.h

    1、Noncopyable类:

        Nocopyable,顾名思义-不可复制。继承自Noncopyable的派生类,不可被复制,我们的策略是将其复制构造函数以及赋值构造函数放到私有区域中。见代码如下:

    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;">/*************************************************************************
    	> File Name: Noncopyable.h
    	> Author: HOUJUN
    	> Mail:june506@163.com 
    	> Created Time: Tue 25 Aug 2015 11:06:51 PM HKT
     ************************************************************************/
    
    #ifndef _NONCOPYABLE_H
    #define _NONCOPYABLE_H
    
    class Noncopyable
    {
    	protected:
    		Noncopyable(){}
    		~Noncopyable(){}
    	private:
    		Noncopyable(const Noncopyable & rhs);
    		Noncopyable & operator=(const Noncopyable &rhs);
    };
    #endif</span></span></span>


    2、Threadpool类:

        一个线程池维护有一个任务缓冲器Buffer(内部有队列queue实现)和存储线程地址的vector<Thread*>。对外提供有getTask()与addTask()方法,分别用于向任务队列中取任务和向任务队列中添加任务。

    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;">/*************************************************************************
    	> File Name: Threadpool.h
    	> Created Time: Tue 25 Aug 2015 11:39:48 PM HKT
     ************************************************************************/
    
    /*************************************************************/
    //	1、所谓“线程池”,就是预先把线程创建好,省去响应每次响应客户端时
    //创建线程时间上的开销。线程的创建在start()函数中执行。stop()函数
    //负责线程的回收。
    //
    //	2、线程池中有一个保存线程地址的数组,以及一个保存任务的队列。因此
    //私有成员中用到两个整型来分别表示任务队列的大小,以及创建线程的数目
    //
    //	3、线程池提供一个addTask()方法,向任务缓存中添加任务,同时
    //提供一个getTask()方法,从任务缓存中取任务。
    //
    //	4、threadFunc()函数是线程中将要执行的函数。
    //
    //	
    /*************************************************************/
    
    #ifndef _THREADPOOL_H
    #define _THREADPOOL_H
    
    #include "Buffer.h"
    #include <vector>
    class Task;			//前向声明,将在实现文件中给出头文件
    class Thread;
    
    class Threadpool
    {
    	public:
    		Threadpool(int bufsize,int threadNum);
    		~Threadpool();
    		void start();
    		void stop();
    		void addTask(Task *task);
    		Task* getTask();
    
    		void threadFunc();
    
    	private:
    		int size_;
    		Buffer buffer_;
    		int threadNum_;
    		std::vector<Thread *> vecThreads_;
    		bool isExit_;
    };
    #endif</span></span></span>
    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;">/*************************************************************************
    	> File Name: Threadpool.cpp
    	> Created Time: Tue 25 Aug 2015 11:59:03 PM HKT
     ************************************************************************/
    
    #include "Threadpool.h"
    #include "Thread.h"
    #include "MyPoolThread.h"
    #include "Task.h"
    
    Threadpool::Threadpool(int bufsize,int threadNum)
    	:size_(bufsize),
    	buffer_(size_),			//任务缓存的初始化
    	threadNum_(threadNum),
    	vecThreads_(threadNum_),
    	isExit_(false)
    {}
    
    Threadpool::~Threadpool()
    {
    	stop();
    }
    
    
    void Threadpool::start()
    {
    	for(int idx = 0;idx!=threadNum_;idx++)
    	{
    		Thread *pthread = new MyPoolThread(*this);
    		vecThreads_.push_back(pthread);
    		pthread->start();
    	}
    
    }
    
    
    void Threadpool::stop()
    {
    	if(isExit_)
    	{
    		isExit_=true;
    		std::vector<Thread*>::iterator iter;
    		for(iter = vecThreads_.begin();iter!=vecThreads_.end();iter++)
    		{
    			(*iter)->join();
    			delete *iter;		//释放iter所指向的空间
    		}
    		vecThreads_.clear();
    	}
    }
    
    void Threadpool::addTask(Task* task)
    {
    	buffer_.push(task);
    }
    
    Task* Threadpool::getTask()
    {
    	return buffer_.pop();
    }
    
    void Threadpool::threadFunc()
    {
    	while(!isExit_)
    	{
    		Task *task = getTask();
    		if(task != NULL)
    			task->process();
    	}
    }
    </span></span></span>


    3、Buffer类:

        Buffer是一个任务缓冲区,即用来存取任务。由于Buffer是共享资源,因此要用互斥锁与条件变量来进行同步。

    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;">/*************************************************************************
    	> File Name: Buffer.h
    	> Created Time: Tue 25 Aug 2015 10:09:19 PM HKT
     ************************************************************************/
    
    #ifndef __BUFFER_H
    #define __BUFFER_H
    
    #include "MutexLock.h"
    #include "Condition.h"
    #include <queue>
    
    class Task; //前向声明
    class Buffer
    {
    	public:
    		Buffer(int size);
    		void push(Task* task);
    		Task* pop();
    		bool empty();
    		bool full();
    
    	private:
    		MutexLock mutex_;
    		Condition notfull_;
    		Condition notempty_;
    		int size_;
    		std::queue<Task*> que_;
    };
    #endif</span></span></span>
    <span style="font-size:18px;"><span style="font-size:18px;"><pre name="code" class="cpp"><span style="font-size:18px;">/*************************************************************************
    	> File Name: Buffer.cpp 
    	> Created Time: Tue 25 Aug 2015 10:14:41 PM HKT
     ************************************************************************/
    
    #include "Buffer.h"
    #include "Task.h"
    
    /************************************************/
    //构造函数
    //由于Buffer 与MutexLock,Condition是组合关系,因此
    //Buffer负责他们的初始化工作
    //mutex_()为对象初始化
    /************************************************/
    Buffer::Buffer(int size)
    	:mutex_(),
    	notfull_(mutex_),
    	notempty_(mutex_),
    	size_(size)
    {}
    
    
    //任务缓存的判空
    bool Buffer::empty()
    {
    	return (que_.size()==0);
    }
    
    //任务缓存的判满
    bool Buffer::full()
    {
    	return (que_.size()==size_);
    }
    
    
    /********************************************************/
    //1、先加锁
    //2、如果满,睡眠等待不满
    //3、当满足不满(即有空位)的条件,唤醒阻塞等待添加任务的线程
    //4、向队列添加任务
    //5、通知等到非空(即有任务)的条件,唤醒等待取任务的线程
    /*********************************************************/
    void Buffer::push(Task* task)
    {
    	MutexLockGuard guard(mutex_);
    	while(full())
    		notfull_.wait();
    	que_.push(task);
    	notempty_.notify();
    }
    
    
    /********************************************************/
    //1、先加锁
    //2、如果空,睡眠等待非空
    //3、通知等到非空(即有任务可执行)的条件,唤醒等待取任务的线程
    //4、从队列取任务,返回任务的地址
    //5、当满足不满(即有空位)的条件,唤醒阻塞等待添加任务的线程
    /*********************************************************/
    Task* Buffer::pop()
    {
    	MutexLockGuard guard(mutex_);
    	while(empty())
    		notempty_.wait();
    	Task* task = que_.front();
    	que_.pop();
    	notfull_.notify();
    	return task;
    }</span></span></span>

    4、MutexLock类:

        当对临界区数据进行操作的时候,要进行同步,否则后果不可预见。要用到互斥锁mutex

    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;"><span style="font-family:SimSun;font-size:18px;">/*************************************************************************
    	> File Name: MutexLock.h 
    	> Created Time: Tue 25 Aug 2015 10:49:24 PM HKT
     ************************************************************************/
    
    #ifndef __MUTEXLOCK_H
    #define __MUTEXLOCK_H
    
    #include "Noncopyable.h"	//条件变量为系统资源类,禁止复制
    #include <pthread.h>		//互斥量定义在线程头文件中
    
    /******************************************/
    //互斥锁无非就是进行加锁与解锁,因此除了提供
    //一个构造函数与一个析构函数外,还有一个加锁
    //和一个解锁函数。同时,还有一个用于获取当前
    //互斥量的一个指针。
    /******************************************/
    class MutexLock : private Noncopyable
    {
    	public:
    		MutexLock();
    		~MutexLock();
    		void lock();
    		void unlock();
    		pthread_mutex_t * getMutexPtr();
    	private:
    		pthread_mutex_t mutex_;
    };
    
    
    /********************************************/
    //基于MutexLock类的一个操作类,私有成员是mutex_
    //在创建MutexGuard对象的时候进行加锁,因此我们
    //可以不去直接操作MutexLock的lock()函数。
    //在析构对象的时候解锁_
    /********************************************/
    class MutexLockGuard
    {
    	public:
    		MutexLockGuard(MutexLock &mutex)
    			:mutex_(mutex)
    		{
    			mutex_.lock();
    		}
    
    		~MutexLockGuard()
    		{
    			mutex_.unlock();
    		}
    	private:
    		MutexLock & mutex_;
    };
    #endif</span></span></span></span>
    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;">/*************************************************************************
    	> File Name: MutexLock.cpp
    	> Created Time: Tue 25 Aug 2015 11:11:36 PM HKT
     ************************************************************************/
    
    #include "MutexLock.h"
    MutexLock::MutexLock()
    {
    	pthread_mutex_init(&mutex_,NULL);
    }
    
    void MutexLock::lock()
    {
    	pthread_mutex_lock(&mutex_);
    }
    
    void MutexLock::unlock()
    {
    	pthread_mutex_unlock(&mutex_);
    }
    
    pthread_mutex_t * MutexLock::getMutexPtr()
    {
    	return &mutex_;
    }
    
    MutexLock::~MutexLock()
    {
    	pthread_mutex_destroy(&mutex_);
    }</span></span></span>


    5、Condition类:

        通常情况下,条件变量与互斥量总会在同步中同时出现,两者相互配合,完成同步工作。

    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;"><span style="font-family:SimSun;font-size:18px;">/*************************************************************************
    	> File Name: Condition.h 
    	> Created Time: Tue 25 Aug 2015 11:18:30 PM HKT
     ************************************************************************/
    
    #ifndef _CONDITION_H
    #define _CONDITION_H
    
    #include "Noncopyable.h"
    #include <pthread.h>
    
    class MutexLock; //前向声明
    class Condition : private Noncopyable
    {
    	public:
    		Condition(MutexLock & mutex);
    		void wait();
    		void notify();
    		void notifyall();
    		~Condition();
    	private:
    		pthread_cond_t cond_;
    		MutexLock & mutex_;
    };
    #endif</span></span>
    
    </span></span>
    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;"><span style="font-family:SimSun;font-size:18px;">/*************************************************************************
    	> File Name: Condition.cpp 
    	> Created Time: Tue 25 Aug 2015 11:24:58 PM HKT
     ************************************************************************/
    
    #include "MutexLock.h"
    #include "Condition.h"
    
    /***************************************/
    //由于Condition中的wait()函数要使用到
    //MutexLock对象,因此在此要传入一个MutexLock
    //对象,对成员函数进行初始化
    //
    //以下函数体中的的系统调用,要求我们熟悉
    //pthread_cond_t的API使用方法
    //
    /***************************************/
    Condition::Condition(MutexLock & mutex) //Condition依赖MutexLock
    	:mutex_(mutex)
    {
    	pthread_cond_init(&cond_,NULL);
    }
    
    Condition::~Condition()
    {
    	pthread_cond_destroy(&cond_);
    }
    
    void Condition::wait()
    {
    	pthread_cond_wait(&cond_,mutex_.getMutexPtr());
    }
    
    void Condition::notify()
    {
    	pthread_cond_signal(&cond_);
    }
    
    void Condition::notifyall()
    {
    	pthread_cond_broadcast(&cond_);
    }</span></span></span></span>


    6、Thread类

        Thread类是一个虚类,其中有一个虚函数run()。

    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;">/*************************************************************************
    	> File Name: Thread.h
    	> Created Time: Wed 26 Aug 2015 11:22:08 AM HKT
     ************************************************************************/
    
    #ifndef _THREAD_H
    #define _THREAD_H
    #include "Noncopyable.h"
    #include <pthread.h>
    class Thread : private Noncopyable
    {
    	public:
    		Thread()
    			:pthId_(0),
    			isRunning_(false)
    		{}
    		void start();
    		void join();
    		virtual void run()=0; 
    		~Thread();
    		static void *runInThread(void *arg);
    	private:
    		pthread_t pthId_;
    		bool isRunning_;
    };
    #endif</span></span></span>
    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;"><span style="font-family:SimSun;font-size:18px;">/*************************************************************************
    	> File Name: Thread.cpp 
    	> Created Time: Wed 26 Aug 2015 11:26:43 AM HKT
     ************************************************************************/
    
    #include "Thread.h"
    
    /************************************/
    //调用start()创建线程,调用phtread_create()
    //设置运行标志位true
    /************************************/
    void Thread::start()
    {
    	pthread_create(&pthId_,NULL,runInThread,this);
    	isRunning_=true;
    }
    
    /************************************/
    //调用join()回收线程
    //设置运行标志位为false
    //
    //注意:创建者调用join()函数回收子线程,
    //当子线程没有运行结束,则创建者会阻塞
    //等待子线程结束。
    /************************************/
    void Thread::join()
    {
    	pthread_join(pthId_,NULL);
    	isRunning_=false;
    }
    
    
    /*************************************/
    //析构函数
    //
    //但是我们并不希望主线程吊死在一棵树上,
    //它还有其它任务,所以:
    //	pthread_detach(pthId),将线程状态设置
    //为detached状态,当线程运行结束时候自动
    //释放资源。(非阻塞,可立即返回)
    /*************************************/
    Thread::~Thread()
    {
    	if(isRunning_)
    	{
    		pthread_detach(pthId_);
    		isRunning_=false;
    	}
    }
    
    
    /********************************************/
    //arg是在创建线程时候传入的this,再此将void类型
    //的arg强转成Thread类型。
    /********************************************/
    void *Thread::runInThread(void *arg)
    {
    	Thread *pThread = static_cast<Thread*>(arg);
    	pThread->run();
    	return NULL;
    }
    </span></span></span></span>


    7、MyPoolThread类

        MyPoolThread类是Thread类的实现类。

    <span style="font-size:18px;">/*************************************************************************
    	> File Name: MyPoolThread.h
    	> Created Time: Wed 26 Aug 2015 03:45:46 PM HKT
     ************************************************************************/
    
    #ifndef _MYPOOLTHREAD_H
    #define _MYPOOLTHREAD_H
    
    /***************************************/
    //1、MyPoolThread继承自Thread,拥有Thread
    //的所有非私有成员及函数。
    //
    //2、之所以要传入线程池的引用,是因为
    //Thread中的run()方法要执行线程池中的
    //threadFunc()方法
    /***************************************/
    #include "Thread.h"
    class Threadpool;
    class MyPoolThread : public Thread
    {
    	public:
    		MyPoolThread(Threadpool &threadpool);
    		void run();
    	private:
    		Threadpool &threadpool_;
    };
    #endif</span>
    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;">/*************************************************************************
    	> File Name: MyPoolThread.cpp
    	> Created Time: Wed 26 Aug 2015 03:51:05 PM HKT
     ************************************************************************/
    
    #include "MyPoolThread.h"
    #include "Threadpool.h"   //在.h文件中进行前向声明
    MyPoolThread::MyPoolThread(Threadpool & threadpool)
    	:threadpool_(threadpool)
    {}
    
    void MyPoolThread::run()
    {
    	threadpool_.threadFunc();
    }</span></span></span>


    8、Task类

        任务是用户让线程池做的事情,它将作为一个参数传给线程池,让线程池将它添加到任务队列中,等待工作线程将其完成。以下的任务封装了一个方法,该方法是产生随机数

    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;">/*************************************************************************
    	> File Name: Task.h
    	> Created Time: Wed 26 Aug 2015 03:32:53 PM HKT
     ************************************************************************/
    
    #ifndef _TASK_H
    #define _TASK_H
    
    
    /***********************************/
    //Task就是一个执行的任务
    /***********************************/
    //Task接口类
    class Task
    {
    	public:
    		virtual void process()=0;
    };
    
    //Task的实现类
    class MyTask :public Task
    {
    	public:
    		void process();
    };
    #endif
    </span></span></span>
    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;">/*************************************************************************
    	> File Name: Task.cpp
    	> Created Time: Wed 26 Aug 2015 03:35:45 PM HKT
     ************************************************************************/
    
    #include "Task.h"
    #include <unistd.h>
    #include <stdlib.h>
    #include <time.h>
    #include <iostream>
    
    void MyTask::process()
    {
    	srand(time(NULL));
    	int num = rand()%100;
    	std::cout<<"product a number:"<<num<<std::endl;
    	sleep(2);
    }</span></span></span>


    9、Test类:

        至此,Threadpool类的封装已经完成,接下来让我们来使用它。

    <span style="font-size:18px;"><span style="font-size:18px;"><span style="font-size:18px;">/*************************************************************************
    	> File Name: test.cpp
    	> Author: HOUJUN
    	> Mail:june506@163.com 
    	> Created Time: Wed 26 Aug 2015 03:55:24 PM HKT
     ************************************************************************/
    
    #include "Threadpool.h"
    #include "Task.h"
    #include <unistd.h>
    
    int main()
    {
    	Threadpool threadpool(5,4);
    	threadpool.start();
    	Task *ptask = new MyTask;
    	while(1)
    	{
    		threadpool.addTask(ptask);
    		sleep(1);
    	}
    	threadpool.stop();
    
    	return 0;
    }</span></span></span>


    运行结果:



       以上就是我对Threadpool封装的总结,分享的同时有希望各位大神指点,找出错误,共同进步。大笑




    版权声明:本文为博主原创文章,未经博主允许不得转载。

  • 相关阅读:
    零基础入门学习Python(11)--列表:一个打了激素的数组(2)
    零基础入门学习Python(10)--列表:一个打了激素的数组
    零基础入门学习Python(9)--了不起的分支和循环3
    零基础入门学习Python(8)--了不起的分支和循环2
    零基础入门学习Python(7)--了不起的分支和循环1
    标量子查询中有ROWNUM=1怎么改?
    零基础入门学习Python(6)--Python之常用操作符
    一次ORA-01555问题分析,及SQL优化。
    零基础入门学习Python(5)--闲聊之Python的数据类型
    Python内置函数(60)——compile
  • 原文地址:https://www.cnblogs.com/houjun/p/4802193.html
Copyright © 2011-2022 走看看