zoukankan      html  css  js  c++  java
  • 利用ACE 自己实现的线程池

    1: 线程池组件的配置文件:

    [log]
    ;DEBUG = 0
    ;INFO  = 1
    ;WARN  = 2
    ;FAULT = 3
    level=0
    
    ;SCREENOUT = 0
    ;FILEOUT   = 1
    ;BOTH      = 2
    director = 2
    
    ;TEST = 0
    ;RUN  = 1
    mode = 0
    
    ;ONE  = 0x00
    ;FOUR = 0x01
    split = 0
    
    ;AUTODEL   = 0x00
    ;MANUALDEL = 0x01
    manager=0
    
    [threadpool]
    minthread = 10
    maxthread = 25
    maxjobsize = 100000
    keepalive = 1


    2:线程池的代码:

    #pragma once
    
    #include "ace/Task.h"
    #include "ace/Synch.h"
    
    class CManager;
    
    // One worker of the thread pool. open() activates a single thread that runs
    // svc(); svc() takes message blocks from this task's queue, runs the CJob
    // stored in each block and then hands the worker back to its CManager.
    class CWorker : public ACE_Task<ACE_MT_SYNCH>
    {
    public:	
    	CWorker(void);
    	// pmanager: the manager this worker reports back to (stored, not owned here).
    	CWorker(CManager* pmanager);
    	~CWorker(void);
    public:
    	// NOTE(review): open()/close() take no arguments here; presumably they hide
    	// rather than override the ACE_Task_Base hooks of the same name - confirm.
    	virtual int open();
    	virtual int svc();
    	virtual int close();	
    	// Logs thread id, idle time and invocation count through CLogger.
    	void output();	
    	// Records "now" (seconds) as the start of the current idle period.
    	void setidletimepoint();
    	// Returns timervalid (set to true by handle_timeout when the worker retires).
    	bool gettimervalid();
    	// Schedules the keep-alive timer on the global reactor; returns the timer id.
    	int addtimer();
    	// Cancels the timer identified by m_timerid.
    	int removetimer();
    	void increaseinvokenumber();
    	int getinvokenumber();
    	// Timer callback: retires this worker when the pool is above its core size.
    	int handle_timeout(const ACE_Time_Value &tv, const void *arg);
    private:
    	// Thread id array handed to activate() in open().
    	ACE_thread_t  threads[0x01];
    	CManager* m_pmanager;
    public:
    	// Value of time(NULL), truncated to int, at the last setidletimepoint() call.
    	int m_idletimepoint;
    	// Seconds elapsed since m_idletimepoint.
    	int getidletimelength();
    	// threads[0] converted to int.
    	int getthreadid();
    	// Id returned by the last schedule_timer() call; -1 after construction.
    	int m_timerid;
    	// Incremented by increaseinvokenumber().
    	int m_invokenumber;
    	// Number of live CWorker objects (incremented in constructors, decremented in the destructor).
    	static int m_count;
    	// True once handle_timeout() has started tearing this worker down.
    	bool timervalid;
    };
    
    
    
    
    #include "Manager.h"
    #include "Worker.h"
    #include "Job.h"
    #include "ace/Reactor.h"
    #include "Logger.h"
    #include "ace/OS_NS_time.h"
    #include "ace/OS_NS_unistd.h"
    
    // Count of live CWorker objects; bumped by the constructors and decremented
    // by the destructor. NOTE(review): updated without any lock - confirm that
    // workers are only created and destroyed from threads that cannot overlap.
    int CWorker::m_count = 0;  
    
    // Default constructor. Puts every member into the same defined state as the
    // CManager* constructor (with no manager attached) so that the getters and
    // removetimer() never read indeterminate values.
    CWorker::CWorker(void)
    	: m_pmanager(NULL)
    	, m_idletimepoint(0)
    	, m_timerid(-1)
    	, m_invokenumber(0)
    	, timervalid(false)
    {
    	threads[0] = ACE_thread_t();
    	m_count++;
    }
    
    // Creates a worker bound to `pmanager`.
    // The debug line is emitted before m_count is incremented, so it shows the
    // number of workers that existed before this one.
    CWorker::CWorker(CManager* pmanager)
    {	
    	// The format string ends in "\n"; the listing had lost the escape and split
    	// the literal across two lines, which does not compile.
    	// NOTE(review): "%08x" is given a pointer - confirm what CLogger expects
    	// on 64-bit targets.
    	CLogger::createinstance()->logdebugmsg("(+%d)worker address=%08x\n", m_count, this);
    	m_count++;
    	m_pmanager = pmanager;
    	m_idletimepoint = 0;	// defined value until setidletimepoint() is first called
    	m_timerid = -1;
    	m_invokenumber = 0;	
    	timervalid = false;
    }
    
    // Decrements the live-worker count and logs the remaining count together
    // with this object's address.
    CWorker::~CWorker(void)
    {
    	m_count--;
    	// Restored the "\n" escape that the listing had turned into a raw line break.
    	CLogger::createinstance()->logdebugmsg("(-%d)worker address=%08x\n", m_count, this);
    }
    
    // Activates exactly one joinable thread for this task and asks ACE to store
    // its id in threads[0]. Returns whatever activate() returns.
    int CWorker::open()
    {
    	const long spawn_flags = THR_NEW_LWP | THR_CANCEL_ENABLE | THR_JOINABLE;
    	const int thread_total = 1;
    	return activate(spawn_flags,
    	                thread_total,
    	                0,
    	                ACE_DEFAULT_THREAD_PRIORITY,
    	                -1,
    	                this,
    	                0,
    	                0,
    	                0,
    	                threads);
    }
    
    // Thread body of the worker. Only the thread whose id was stored in
    // threads[0] processes jobs; any other caller returns immediately.
    // Loop: block on getq(); when getq() reports -1 (e.g. the queue was
    // deactivated by handle_timeout) leave the loop, otherwise run the CJob held
    // in the block, release the block and hand this worker back to the manager.
    // Always returns 0.
    int CWorker::svc()
    {
    	if (ACE_Thread::self() != threads[0])
    	{
    		return 0;
    	}

    	while (true)
    	{
    		// Initialised so a failing getq() never leaves an indeterminate pointer.
    		ACE_Message_Block *mb = 0;
    		if (this->getq(mb) == -1)
    		{
    			break;
    		}

    		// The block's buffer holds a byte copy of the job (see CManager::submitnormaljob).
    		CJob *pjob = (CJob*)mb->base();
    		pjob->run();
    		mb->release();
    		m_pmanager->put(this);
    	}
    	return 0;
    }
    
    // No per-worker cleanup is performed here; reports success.
    int CWorker::close()
    {
    	const int status = 0;
    	return status;
    }
    
    // Stamps the start of an idle period with the current time in seconds
    // (narrowed to int, as the member is an int).
    void CWorker::setidletimepoint()
    {
    	const time_t now = time(NULL);
    	m_idletimepoint = static_cast<int>(now);
    }
    
    // Seconds elapsed since the last setidletimepoint() call.
    int CWorker::getidletimelength()
    {
    	const time_t now = time(NULL);
    	return static_cast<int>(now - m_idletimepoint);
    }
    
    int CWorker::getthreadid()
    {
    	return (int)threads[0];
    }
    
    void CWorker::output()
    {
    	CLogger::createinstance()->logfaultmsg("id=%05d idletime=%d invoke=%d
    ", getthreadid(), getidletimelength(), getinvokenumber());
    }
    
    int CWorker::addtimer()
    {
    	ACE_Time_Value tv(m_pmanager->getkeepalivetime(), 0);
    	m_timerid = ACE_Reactor::instance()->schedule_timer(this, NULL, tv);
    	return m_timerid;
    }
    
    // Cancels this worker's pending keep-alive timer, if any.
    // Returns the result of ACE_Reactor::cancel_timer(), or 0 when no timer id
    // is recorded.
    //
    // The id is reset to -1 after cancelling. CWorkerlist::get() and
    // CWorkerlist::put() both call this for the same worker, and without the
    // reset the second call would cancel the old id again.
    // NOTE(review): presumably the reactor may reuse ids, in which case the old
    // id could by then belong to another worker's timer - confirm.
    int CWorker::removetimer()
    {
    	if (m_timerid == -1)
    	{
    		return 0;
    	}
    	const int cancelled = ACE_Reactor::instance()->cancel_timer(m_timerid);
    	m_timerid = -1;
    	return cancelled;
    }
    
    
    bool CWorker::gettimervalid()
    {
    	return timervalid;
    }
    
    // Keep-alive timer callback. `tv` and `arg` are not used.
    // If the manager currently reports more threads than its core pool size,
    // this worker retires itself; otherwise nothing happens. Returns 0 either way.
    int CWorker::handle_timeout(const ACE_Time_Value &tv, const void *arg)
    {
    	if (m_pmanager->getthreadnumber() > m_pmanager->getcorepoolsize())
    	{	
    		// Mark the worker as retiring so CWorkerlist::put() will not re-queue it.
    		timervalid = true;
    		m_pmanager->reducethreadnumber();	
    		// Deactivating the queue makes the blocked getq() in svc() return -1,
    		// which ends the worker thread's loop.
    		this->msg_queue()->deactivate();
    		// Block until the worker thread has exited.
    		wait();
    		// Remove this worker from whichever list still holds it.
    		m_pmanager->recyclebin(this);
    		// NOTE(review): the object destroys itself here; nothing below may touch
    		// members, and the caller must not use the handler after this returns - confirm.
    		delete this;
    	}
    	return 0;
    }
    
    // Counts one more job handed to this worker.
    void  CWorker::increaseinvokenumber()
    {
    	m_invokenumber += 1;
    }
    
    int  CWorker::getinvokenumber()
    {
    	return m_invokenumber;
    }
    	
    


     

    #pragma once
    
    #include "list"
    #include "ace/Synch.h"
    
    using namespace std;
    
    class CWorker;
    class CManager;
    
    // Holds the pool's workers in two lists (idle and busy), both guarded by
    // one mutex, plus a condition variable that get() waits on while no idle
    // worker is available.
    class CWorkerlist
    {
    	typedef list<CWorker*> IDLEWORKERLIST;
    	typedef list<CWorker*> BUSYWORKERLIST;
    public:
    	CWorkerlist(void);
    	~CWorkerlist(void);
    private:
    	IDLEWORKERLIST m_idleworkerlist;
    	BUSYWORKERLIST m_busyworkerlist;
    private:
    	// Guards both lists.
    	ACE_Thread_Mutex m_mutex;
    	// Condition bound to m_mutex; allocated in the constructor, freed in the destructor.
    	ACE_Condition<ACE_Thread_Mutex> *m_pcondition;
    public:
    	// Blocks until an idle worker exists, moves it to the busy list and returns it.
    	CWorker* get();
    	// Returns a worker to the idle list (unless it is retiring) and restarts its idle timer.
    	void put(CWorker* pworker);
    	// Removes a worker from whichever list holds it; returns a code telling where it was found.
    	int recyclebin(CWorker* pworker);
    	// Writes the current list sizes to *idle and *busy.
    	void getsize(int* idle, int* busy);
    	// Logs every worker in both lists and the totals.
    	void output();
    };
    
    
    #include "Workerlist.h"
    #include "Worker.h"
    #include "Manager.h"
    #include "logger.h"
    
    
    // Allocates the condition variable and binds it to m_mutex (m_mutex is
    // declared before m_pcondition, so it is already constructed here).
    CWorkerlist::CWorkerlist(void)
    	: m_pcondition(new ACE_Condition<ACE_Thread_Mutex>(m_mutex))
    {
    }
    
    
    // Frees the condition variable allocated by the constructor.
    CWorkerlist::~CWorkerlist(void)
    {
    	ACE_Condition<ACE_Thread_Mutex> *condition = m_pcondition;
    	delete condition;
    }
    
    
    // Blocks until at least one idle worker exists, then moves the first idle
    // worker to the busy list and returns it. Before the move, the worker's
    // idle timer is cancelled and its invocation counter is incremented.
    CWorker* CWorkerlist::get()
    {
    	m_mutex.acquire();

    	while (m_idleworkerlist.empty())
    	{
    		m_pcondition->signal();
    		m_pcondition->wait();
    	}

    	CWorker* const chosen = m_idleworkerlist.front();
    	// Cancel the idle timer.
    	chosen->removetimer();
    	chosen->increaseinvokenumber();

    	m_idleworkerlist.pop_front();
    	m_busyworkerlist.push_back(chosen);

    	m_mutex.release();
    	return chosen;
    }
    
    
    // Removes `pworker` from the idle list or, failing that, from the busy list.
    // Returns IN_IDLE_QUEUE or IN_BUSY_QUEUE to tell where it was found, or
    // NOT_FOUND when neither list contains it. The whole search runs under m_mutex.
    //
    // Fix: a worker found in the busy list is now erased from the busy list.
    // The previous code passed the busy-list iterator to m_idleworkerlist.erase(),
    // which is undefined behavior and left the worker in the busy list.
    int CWorkerlist::recyclebin(CWorker* pworker)
    {

    	// Kept as macros because code later in the translation unit may rely on them.
    	#define NOT_FOUND          0x00
    	#define IN_IDLE_QUEUE      0x01
    	#define IN_BUSY_QUEUE      0x02

    	int result = NOT_FOUND;

    	m_mutex.acquire();

    	typedef list<CWorker*>::iterator ITERATOR;
    	ITERATOR LI;

    	for (LI = m_idleworkerlist.begin(); LI != m_idleworkerlist.end(); ++LI)
    	{
    		if ((*LI) == pworker)
    		{
    			result = IN_IDLE_QUEUE;
    			m_idleworkerlist.erase(LI);
    			break;	// LI is invalid after erase; stop iterating
    		}
    	}

    	if (result == NOT_FOUND)
    	{
    		for (LI = m_busyworkerlist.begin(); LI != m_busyworkerlist.end(); ++LI)
    		{
    			if ((*LI) == pworker)
    			{
    				result = IN_BUSY_QUEUE;
    				m_busyworkerlist.erase(LI);
    				break;
    			}
    		}
    	}

    	m_mutex.release();
    	return result;
    }
    
    
    
    // Hands a worker back after it finished a job (or right after creation).
    // If the worker is in the busy list its timer is cancelled and, unless it
    // is already retiring, it is taken out of the busy list. A worker that is
    // not retiring then gets a fresh idle timestamp and timer, is appended to
    // the idle list, and one waiter in get() is signalled.
    void CWorkerlist::put(CWorker* pworker)
    {
    	m_mutex.acquire();

    	list<CWorker*>::iterator pos = m_busyworkerlist.begin();
    	while (pos != m_busyworkerlist.end() && *pos != pworker)
    	{
    		++pos;
    	}

    	if (pos != m_busyworkerlist.end())
    	{
    		pworker->removetimer();
    		// If the idle timer has already fired, the worker is on its way to
    		// being deleted, so it is left in the busy list.
    		if (!pworker->gettimervalid())
    		{
    			m_busyworkerlist.erase(pos);
    		}
    	}

    	if (!pworker->gettimervalid())
    	{
    		// Start the idle timer.
    		pworker->setidletimepoint();
    		pworker->addtimer();

    		m_idleworkerlist.push_back(pworker);
    		m_pcondition->signal();
    	}

    	m_mutex.release();
    }
    
    
    // Reports the current number of idle and busy workers, read under m_mutex.
    // Both out-pointers must be valid.
    void CWorkerlist::getsize(int *idle, int *busy)
    {
    	m_mutex.acquire();
    	const int idle_count = static_cast<int>(m_idleworkerlist.size());
    	const int busy_count = static_cast<int>(m_busyworkerlist.size());
    	*idle = idle_count;
    	*busy = busy_count;
    	m_mutex.release();
    }
    
    
    // Logs every idle worker, then every busy worker, then the two totals.
    // The lists are read under m_mutex. Each format string ends in "\n"; the
    // listing had lost those escapes and split the literals across lines.
    void CWorkerlist::output()
    {
    	m_mutex.acquire();

    	typedef list<CWorker*>::iterator ITERATOR;
    	ITERATOR LI;

    	CLogger::createinstance()->logfaultmsg("idle thread information as follows\n");
    	int idle = 0;
    	for (LI = m_idleworkerlist.begin(); LI != m_idleworkerlist.end(); ++LI)
    	{
    		idle++;
    		(*LI)->output();
    	}

    	CLogger::createinstance()->logfaultmsg("busy thread information as follows\n");
    	int busy = 0;
    	for (LI = m_busyworkerlist.begin(); LI != m_busyworkerlist.end(); ++LI)
    	{
    		busy++;
    		(*LI)->output();
    	}

    	CLogger::createinstance()->logfaultmsg("idle=%d busy=%d\n", idle, busy);

    	m_mutex.release();
    }
    
    


     

    #pragma once
    
    #include "ace/Task.h"
    #include "ace/Synch.h"
    #include "Job.h"
    #include "Workerlist.h"
    
    class CWorker;
    
    // Front end of the thread pool. Jobs submitted through submitnormaljob()
    // are queued on this task's message queue; one manager thread dispatches
    // them to workers and a second one serves console commands (see svc()).
    class CManager : public ACE_Task<ACE_MT_SYNCH>
    {
    public:
    	CManager(void);
    	CManager(int corepoolsize, int maxpoolsize, int maxjobsize, int keepalivetime);
    	~CManager(void);
    public:
    	virtual int open();
    	virtual int svc();
    	virtual int close();
    private:
    	// Thread ids filled in by activate() in open(): [0] dispatcher, [1] console.
    	ACE_thread_t  threads[0x02];
    	// Idle/busy worker lists.
    	CWorkerlist m_workerqueue;
    	// Taken by addthreadnumber()/reducethreadnumber() around m_threadnumber.
    	ACE_Thread_Mutex m_mutex;
    	// Worker count above which an idle worker retires when its timer fires.
    	int m_corepoolsize;
    	// Upper bound on the number of workers submitnormaljob() will create.
    	int m_maxpoolsize;
    	// Upper bound on queued jobs; further submissions are rejected.
    	int m_maxjobsize;
    	// Idle time in seconds used for each worker's timer.
    	int m_keepalivetime;
    	// Current number of workers.
    	int m_threadnumber;
    	// true while the pool accepts jobs; stop() sets it to false.
    	bool m_done;
    public:
    	int getcorepoolsize();
    	int getmaxpoolsize();
    	int getmaxjobsize();
    	void setcorepoolsize(int corepoolsize);
    	void setmaxpoolsize(int maxpoolsize);
    	void setkeepalivetime(int keepalivetime);
    	void setmaxjobsize(int maxjobsize);
    	// NOTE(review): no definition of setthreadcount() is visible in this listing.
    	void setthreadcount(int threadcount);
    	// Number of jobs currently waiting in the message queue.
    	int  getjobqueuesize();
    	void outputjobqueue();
    	void outputworkerqueue();
    	void addthreadnumber();
    	void reducethreadnumber();
    	int getthreadnumber();
    public:
    	// Copies `size` bytes of *pjob into a message block and queues it; 0 on success, -1 on rejection.
    	int  submitnormaljob(const CJob* pjob, int size);	
    	// Hands a worker back to the worker list.
    	int  put(CWorker* pworker);
    	// Removes a worker from the worker list.
    	int  recyclebin(CWorker* pworker);
    	int  getkeepalivetime();
    	// Stops accepting new jobs.
    	int  stop();
    };
    
    
    #include "Manager.h"
    #include "Worker.h"
    #include "Logger.h"
    
    
    // Default configuration: 5 core workers, at most 25 workers, at most 50000
    // queued jobs, 10 seconds keep-alive. The pool starts in the accepting
    // state (m_done == true) with no workers.
    CManager::CManager(void)
    	: m_corepoolsize(5)
    	, m_maxpoolsize(25)
    	, m_maxjobsize(50000)
    	, m_keepalivetime(10)
    	, m_threadnumber(0)
    	, m_done(true)
    {
    }
    
    // Creates a manager with explicit limits.
    //   corepoolsize  - worker count above which idle workers retire
    //   maxpoolsize   - maximum number of workers
    //   maxjobsize    - maximum number of queued jobs
    //   keepalivetime - idle seconds before a surplus worker retires
    // The pool starts in the accepting state (m_done == true) with no workers.
    //
    // The default arguments that used to sit on this out-of-class definition
    // were removed: they turned this constructor into a second default
    // constructor next to CManager(void), which the language forbids and which
    // makes `CManager m;` ambiguous. CManager(void) already supplies the same
    // defaults.
    CManager::CManager(int corepoolsize, int maxpoolsize, int maxjobsize, int keepalivetime)
    {
    	setcorepoolsize(corepoolsize);
    	setmaxpoolsize(maxpoolsize);
    	setmaxjobsize(maxjobsize);
    	setkeepalivetime(keepalivetime);
    	m_done = true;
    	m_threadnumber = 0;
    }
    
    
    // Nothing is released explicitly here.
    // NOTE(review): workers created in submitnormaljob() are deleted only from
    // CWorker::handle_timeout() in the visible code - confirm that workers still
    // alive at shutdown are cleaned up elsewhere.
    CManager::~CManager(void)
    {
    }
    
    // Activates the manager's two joinable threads (job dispatcher and console
    // command reader) and asks ACE to store their ids in threads[0..1].
    // Returns whatever activate() returns.
    int CManager::open()
    {
    	const long spawn_flags = THR_NEW_LWP | THR_CANCEL_ENABLE | THR_JOINABLE;
    	const int thread_total = 2;
    	return activate(spawn_flags,
    	                thread_total,
    	                0,
    	                ACE_DEFAULT_THREAD_PRIORITY,
    	                -1,
    	                this,
    	                0,
    	                0,
    	                0,
    	                threads);
    }
    
    int CManager::svc()
    {
    	if (ACE_Thread::self() == threads[0])
    	{
    		CLogger::createinstance()->loginfomsg("starts the thread of processing job and threadid=%d
    ", threads[0]);
    		while(true)
    		{	
    			ACE_Message_Block *mb_job;	
    			getq(mb_job);
    			if (mb_job->msg_type() == ACE_Message_Block::MB_DATA)
    			{
    				CWorker* pworker = m_workerqueue.get();
    				pworker->putq(mb_job);
    			}
    		}
    	}
    	else if(ACE_Thread::self() == threads[1])
    	{
    		CLogger::createinstance()->loginfomsg("starts the thread of processing command from keyboard and threadid=%d
    ", threads[2]);
    		while(true)
    		{
    			int command = 0;
    			cin>>command;
    			if (command == 0)
    			{
    				printf("0: help
    ");
    				printf("1: thread info
    ");
    				printf("2: job infomation
    ");
    				printf("3: stop
    ");
    			}
    			else if(command == 1)
    			{
    				printf("total=%d
    ", getthreadnumber());
    				printf("corepoolsize=%d
    ", getcorepoolsize());
    				printf("maxpoolsize=%d
    ", getmaxpoolsize());
    				printf("keepalivetime=%d
    ", getkeepalivetime());
    				m_workerqueue.output();
    			}
    			else if(command == 2)
    			{
    				printf("maximum=%d
    ", getmaxjobsize());
    				printf("currentsize=%d
    ", getjobqueuesize());
    			}
    			else if(command == 3)
    			{
    				stop();
    				ACE_OS::sleep(10);
    				CLogger::createinstance()->loginfomsg("the thread of processing command from keyboard and threadid=%d
    ", threads[2]);
    				break;
    			}
    		}
    	}
    	return 0;
    }
    
    // No manager-level cleanup is performed here; reports success.
    int CManager::close()
    {
    	const int status = 0;
    	return status;
    }
    
    int CManager::getcorepoolsize()
    {
    	return m_corepoolsize;
    }
    
    int CManager::getmaxpoolsize()
    {
    	return m_maxpoolsize;
    }
    
    int CManager::getkeepalivetime()
    {
    	return m_keepalivetime;
    }
    
    int CManager::getmaxjobsize()
    { 
    	return m_maxjobsize;
    }
    
    void CManager::setcorepoolsize(int corepoolsize)
    {
    	m_corepoolsize = corepoolsize;
    }
    
    void CManager::setmaxpoolsize(int maxpoolsize)
    {
    	m_maxpoolsize = maxpoolsize;
    }
    
    void CManager::setkeepalivetime(int keepalivetime)
    {
    	m_keepalivetime = keepalivetime;
    }
    
    void CManager::setmaxjobsize(int maxjobsize)
    {
    	m_maxjobsize = maxjobsize;
    }
    
    
    // Number of message blocks currently waiting in the manager's queue.
    int CManager::getjobqueuesize()
    {
    	const int pending = msg_queue_->message_count();
    	return pending;
    }
    
    
    int CManager::submitnormaljob(const CJob* pjob, int size)
    {	
    	int result = -1;
    	if (!m_done) 
    	{
    		CLogger::createinstance()->logfaultmsg("discard the job because of the threadpool has stopped to work
    ");
    		return result;
    	}
    	if (getjobqueuesize() >= getmaxjobsize())
    	{
    		CLogger::createinstance()->logfaultmsg("discard the job because of the jobqueue is full
    ");
    		return result;
    	}
    
    	ACE_Message_Block *mbjob = new ACE_Message_Block(size, ACE_Message_Block::MB_DATA);
    	mbjob->copy((char*)pjob, size);
    	putq(mbjob);
    
    	int idle = 0;
    	int busy = 0;
    	m_workerqueue.getsize(&idle, &busy);
    
    	if (idle == 0 && getthreadnumber() < getmaxpoolsize())
    	{
    		CWorker* pworker = new CWorker(this);
    		addthreadnumber();
    		pworker->open();
    		put(pworker);
    	}
    	result = 0;
    	return result;
    }
    
    // Hands `pworker` back to the worker list. Always returns 0.
    int CManager::put(CWorker* pworker)
    {
    	CWorkerlist &workers = m_workerqueue;
    	workers.put(pworker);
    	return 0;
    }
    
    // Removes `pworker` from the worker list and returns the list's result code.
    int  CManager::recyclebin(CWorker* pworker)
    {
    	const int where_found = m_workerqueue.recyclebin(pworker);
    	return where_found;
    }
    
    // Walks the queued job blocks starting at the head of the manager's queue.
    // No output is produced yet: each job is only located, not printed.
    //
    // Fix: the result of peek_dequeue_head() is now checked and the pointer is
    // initialised, so a failed peek no longer leads to dereferencing an
    // indeterminate pointer.
    // NOTE(review): the blocks are walked without holding the queue's lock and
    // the dispatcher thread may dequeue them concurrently - confirm this is only
    // called while the queue is quiescent.
    void CManager::outputjobqueue()
    {
    	ACE_Message_Block *mb = 0;
    	if (msg_queue_->peek_dequeue_head(mb) == -1)
    	{
    		return;
    	}
    	for (; mb != 0; mb = mb->next())
    	{
    		CJob* pjob = (CJob*)mb->base();
    		(void)pjob;	// TODO: print the job once CJob offers something to show
    	}
    }
    
    void CManager::outputworkerqueue()
    {
    	m_workerqueue.output();
    }
    
    // Puts the pool into the "not accepting jobs" state by clearing m_done.
    // Always returns 1.
    int CManager::stop()
    {
    	m_done = false;
    	return 1;
    }
    
    // Adds one to the worker count while holding m_mutex.
    void CManager::addthreadnumber()
    {
    	m_mutex.acquire();
    	m_threadnumber += 1;
    	m_mutex.release();
    }
    
    // Subtracts one from the worker count while holding m_mutex.
    void CManager::reducethreadnumber()
    {
    	m_mutex.acquire();
    	m_threadnumber -= 1;
    	m_mutex.release();
    }
    
    int CManager::getthreadnumber()
    {
    	return m_threadnumber;
    }


     

  • 相关阅读:
    MinGW离线包下载地址
    词法分析器--DFA(c++实现)
    linux下shell统计文件目录下所有代码行数
    四则运算表达式
    BliBli抢楼全攻略
    python 电影下载链接爬虫
    in, out, ref
    联合查询
    SQL语句大全
    LINQ
  • 原文地址:https://www.cnblogs.com/keanuyaoo/p/3268667.html
Copyright © 2011-2022 走看看