zoukankan      html  css  js  c++  java
  • 可伸缩多线程任务队列

      在我们的工作中,我们经常需要异步执行一些任务,下面介绍的这个可伸缩多线程队列,可满足我们的需求。

      出自:http://www.codeproject.com/Articles/4148/Multithreaded-Job-Queue,主要有以下几个功能:

        1、任务队列是多线程,许多任务可以异步进行,任务队列使用线程池来执行任务。

        2、任务队列支持优先级,优先级高的任务优先执行(即使是后来添加的)

        3、任务队列可以被暂停,但是用户还是可以添加任务,当任务队列被唤醒时,任务可以继续执行下去

        4、在运行过程中,任务队列使用的线程池,用户可以自行增加和减少

      大体框架主要由3个类构成

        1、CJob,任务类,用户需要从该类派生来实现自身需要完成的任务

        2、CJobExecuter,任务执行类,任务均由该类来调用执行,每一个类相当于对应一个线程

        3、CMThreadedJobQ,多线程任务队列,添加任务已经任务的分发均由该类完成,该类维护一个任务队列和一个完成队列的线程池。

      类图如下:

      该例子中,CJobExecuter和CMThreadJobQ这两个类的调用关系是非常值得我们学习的,同时,CJob作为一个基类,子类派生可以实现不同的任务,可扩展性也不错。源代码解析如下:

      Job.h文件:

    class CJob
    {
    public:
        CJob();
        virtual ~CJob();
        
        BOOL m_Completed;         //任务是否完成:TRUE 完成,FALSE 未完成
        static long lastUsedID;   //最后的ID
        
        //================================================================================================
        //函数名:                  setPriority
        //函数描述:                设置任务优先级
        //输入:                    [in] priority 优先级别
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void setPriority(int priority);
    
        //================================================================================================
        //函数名:                  getPriority
        //函数描述:                返回任务优先级
        //输入:                    无
        //输出:                    无
        //返回:                    任务优先级
        //================================================================================================
        int getPriority();
        
        //================================================================================================
        //函数名:                  getID
        //函数描述:                返回任务ID
        //输入:                    无
        //输出:                    无
        //返回:                    任务ID
        //================================================================================================
        long getID();
        
        //================================================================================================
        //函数名:                  setAutoDelete
        //函数描述:                设置完成任务后是否删除任务
        //输入:                    [in] autoDeleteFlag
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void setAutoDelete(BOOL autoDeleteFlag = TRUE);
    
        //================================================================================================
        //函数名:                  AutoDelete
        //函数描述:                返回删除任务标记
        //输入:                    无
        //输出:                    无
        //返回:                    任务标记
        //================================================================================================
        BOOL AutoDelete();
    
        //================================================================================================
        //函数名:                  execute
        //函数描述:                任务真正工作的函数,纯虚函数,需要子类化实现
        //输入:                    无
        //输出:                    无
        //返回:                    任务ID
        //================================================================================================
        virtual void execute() = 0;    
    private:
        long m_ID;               //任务ID
        BOOL m_autoDeleteFlag;   //是否自动删除任务标记,TRUE 删除,FALSE 不删除,默认为TRUE
        int m_priority;          //任务优先级,默认为5
    
    };

      Job.cpp文件:

    long CJob::lastUsedID = 0;
    
    CJob::CJob()
    {
        this->m_ID = InterlockedIncrement(&lastUsedID);
        this->m_autoDeleteFlag = TRUE;
        this->m_priority = 5;
        this->m_Completed= FALSE;
    }
    
    CJob::~CJob()
    {
    }
    
    BOOL CJob::AutoDelete()
    {
        return m_autoDeleteFlag;
    }
    
    void CJob::setAutoDelete(BOOL autoDeleteFlag)
    {
        m_autoDeleteFlag = autoDeleteFlag;
    }
    
    long CJob::getID()
    {
        return this->m_ID;
    }
    
    int CJob::getPriority()
    {
        return this->m_priority;    
    }
    
    void CJob::setPriority(int priority)
    {
        this->m_priority = priority;
    }

      JobExecuter.h文件:

    //一个对象对应一个线程,执行任务Job
    class CJobExecuter
    {
    public:
        CJobExecuter(CMThreadedJobQ *pJobQ);
        virtual ~CJobExecuter();
        
        //================================================================================================
        //函数名:                  stop
        //函数描述:                停止执行任务
        //输入:                    无
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void stop();
        
        //================================================================================================
        //函数名:                  execute
        //函数描述:                执行一个任务
        //输入:                    [in] pJob 任务指针
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void execute(CJob* pJob);
        
        static UINT ThreadFunction(LPVOID pParam); //线程函数
        
        CMThreadedJobQ* m_pJobQ;                   //指向线程任务队列指针
        CJob* m_pJob2Do;                           //指向正在执行任务的指针
        int m_flag;                                //线程执行标记
        CWinThread* m_pExecuterThread;             //线程标识符
    };

      JobExecuter.cpp文件:

    #define STOP_WORKING -1
    #define KEEP_WORKING  0
    
    CJobExecuter::CJobExecuter(CMThreadedJobQ *pJobQ)
    {
        this->m_pJobQ= pJobQ;
        this->m_pExecuterThread= AfxBeginThread(ThreadFunction,this);
        this->m_pJob2Do = NULL;
        this->m_flag = KEEP_WORKING;
    }
    
    CJobExecuter::~CJobExecuter()
    {
        if(this->m_pExecuterThread!= NULL )    
        {
            this->m_pExecuterThread->ExitInstance();
            delete m_pExecuterThread;    
        }
    }
    
    UINT CJobExecuter::ThreadFunction(LPVOID pParam)
    {    
        CJobExecuter *pExecuter = (CJobExecuter *)pParam;
        pExecuter->m_flag = 1;
        ::Sleep(1);
        CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);        
        while(pExecuter->m_flag !=STOP_WORKING )
        {
            if(pExecuter->m_pJob2Do!=  NULL)
            {
                pExecuter->m_pJob2Do->execute();
                pExecuter->m_pJob2Do->m_Completed = TRUE;    
                if(pExecuter->m_pJob2Do->AutoDelete())
                    delete pExecuter->m_pJob2Do;
                pExecuter->m_pJob2Do = NULL;
            }
    
            if(pExecuter->m_pJobQ == NULL) break;
            
            CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);        
            singleLock.Lock();
            if(pExecuter->m_pJobQ->getNoOfExecuter() > pExecuter->m_pJobQ->getMaxNoOfExecuter()) //CJobExecuter个数大于最大值,自动销毁
            {
                pExecuter->stop();    
                singleLock.Unlock();    
            }
            else
            {
                pExecuter->m_pJobQ->addFreeJobExecuter(pExecuter);      //完成任务后,添加到CMThreadedJobQ的空闲队列中
                singleLock.Unlock();    
                pExecuter->m_pJobQ->m_pObserverThread->ResumeThread();        
                pExecuter->m_pExecuterThread->SuspendThread();        
            }                
        }
        
        if(pExecuter->m_pJobQ != NULL)
        {
            pExecuter->m_pJobQ->deleteJobExecuter(pExecuter);
        }
        else
        {
            delete pExecuter;
        }
    
        return 0;
    }
    
    void CJobExecuter::execute(CJob* pJob)
    {
        this->m_pJob2Do = pJob;
        ::Sleep(0);
        this->m_pExecuterThread->ResumeThread();
    }
    
    void CJobExecuter::stop()
    {
        this->m_flag = STOP_WORKING;
        this->m_pExecuterThread->ResumeThread();
    }

      MThreadedJobQ.h文件:

    typedef CTypedPtrList< CPtrList ,CJob*>CJobQList;
    
    //线程池任务队列
    class CMThreadedJobQ
    {
    
    public:
        typedef struct THNODE
        {
            CJobExecuter* pExecuter;
            THNODE * pNext ;
        } THNODE;
        
        CMThreadedJobQ();
        virtual ~CMThreadedJobQ();
    
        //================================================================================================
        //函数名:                  deleteJobExecuter
        //函数描述:                删除一个JobExecuter对象
        //输入:                    [in] pEx
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void deleteJobExecuter(CJobExecuter *pEx);
        
        //================================================================================================
        //函数名:                  setMaxNoOfExecuter
        //函数描述:                设置CJobExecuter的个数
        //输入:                    [in] value
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void setMaxNoOfExecuter(int value);
    
        //================================================================================================
        //函数名:                  addJobExecuter
        //函数描述:                添加一个CJobExecuter
        //输入:                    [in] pEx
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void addJobExecuter(CJobExecuter *pEx);
        
        //================================================================================================
        //函数名:                  getJobExecuter
        //函数描述:                返回一个CJobExecuter
        //输入:                    无
        //输出:                    无
        //返回:                    处理任务的指针
        //================================================================================================
        CJobExecuter* getJobExecuter();
    
        //================================================================================================
        //函数名:                  addFreeJobExecuter
        //函数描述:                添加一个CJobExecuter
        //输入:                    [in] pEx
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void addFreeJobExecuter(CJobExecuter *pEx);
    
        //================================================================================================
        //函数名:                  addJob
        //函数描述:                添加一个任务
        //输入:                    [in] pJob
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void addJob(CJob *pJob);
        
        //================================================================================================
        //函数名:                  getMaxNoOfExecuter
        //函数描述:                获取CJobExecuter个数的最大值
        //输入:                    无
        //输出:                    无
        //返回:                    无
        //================================================================================================
        int getMaxNoOfExecuter();
        
        //================================================================================================
        //函数名:                  getNoOfExecuter
        //函数描述:                获取当前CJobExecuter的个数
        //输入:                    无
        //输出:                    无
        //返回:                    无
        //================================================================================================
        int getNoOfExecuter();
    
        static UINT JobObserverThreadFunction(LPVOID);
    
        //================================================================================================
        //函数名:                  pause
        //函数描述:                挂起JobObserverThread线程
        //输入:                    无
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void pause();
    
        //================================================================================================
        //函数名:                  resume
        //函数描述:                唤醒JobObserverThread线程
        //输入:                    无
        //输出:                    无
        //返回:                    无
        //================================================================================================
        void resume();    
            
        CWinThread* m_pObserverThread; //向空闲的executer线程添加任务的线程
        CCriticalSection m_cs;         //关键代码段,用于互斥
        CJobQList m_jobQList;          //任务队列
    private :
        BOOL m_pause;                  //JobObserverThread线程运行标记
        int m_MaxNoOfExecuter;         //CJobExecuter最大个数
        int m_NoOfExecuter;            //当前CJobExecuter个数
        THNODE* m_pFreeEList;          //维护空闲处理任务线程的队列
        THNODE* m_pAllEList;           //维护所有处理任务线程的队列
    };

      MThreadedJobQ.cpp文件:

    CMThreadedJobQ::CMThreadedJobQ()
    {
        m_MaxNoOfExecuter = 2;
        m_pause = FALSE;
        m_pObserverThread = AfxBeginThread(JobObserverThreadFunction,this);
        m_pFreeEList =NULL;
        m_NoOfExecuter =0;
        m_pAllEList = NULL;
    }
    
    CMThreadedJobQ::~CMThreadedJobQ()
    {
        THNODE* pTempNode;
        while (m_pAllEList != NULL) 
        {    
            pTempNode = m_pAllEList->pNext;
            delete m_pAllEList->pExecuter;        
            delete m_pAllEList;        
            m_pAllEList = pTempNode;    
        }    
    
        while (m_pFreeEList != NULL) 
        {    pTempNode = m_pFreeEList->pNext;        
            delete m_pFreeEList;        
            m_pFreeEList = pTempNode;    
        }    
    
        m_pObserverThread->ExitInstance();    
        delete m_pObserverThread;
    }
    
    
    void CMThreadedJobQ::pause()
    {
        this->m_pause = TRUE;
    }
    
    void CMThreadedJobQ::resume()
    {
        this->m_pause = FALSE;
        this->m_pObserverThread->ResumeThread();
    }
    
    UINT CMThreadedJobQ::JobObserverThreadFunction(LPVOID pParam)
    {
        CMThreadedJobQ *pMTJQ = (CMThreadedJobQ *)pParam;
        CJobExecuter *pJExecuter;
    
        while(TRUE)
        {
            Sleep(100);
            if(pMTJQ->m_pause != TRUE)
            {
                while(!pMTJQ->m_jobQList.IsEmpty() )
                {
                    pJExecuter = pMTJQ->getJobExecuter();
                    if( pJExecuter!=NULL)
                    {                
                        pMTJQ->m_cs.Lock();
                        pJExecuter->execute(pMTJQ->m_jobQList.GetHead());
                        pMTJQ->m_jobQList.RemoveHead();
                        AfxGetApp()->m_pMainWnd->PostMessage(REFRESH_LIST);
                        pMTJQ->m_cs.Unlock();
                    }
                    else
                    {
                        break;
                    }
                    if(pMTJQ->m_pause == TRUE)
                        break;
                }
            }
            pMTJQ->m_pObserverThread->SuspendThread();
        }
        return 0;
    }
    
    int CMThreadedJobQ::getNoOfExecuter()
    {
        return this->m_NoOfExecuter;
    }
    
    int CMThreadedJobQ::getMaxNoOfExecuter()
    {
        return this->m_MaxNoOfExecuter;
    }
    
    void CMThreadedJobQ::addJob(CJob *pJob)
    {
        CJob * pTempJob;
        CSingleLock sLock(&this->m_cs);
        sLock.Lock();    
        POSITION pos,lastPos;
        pos = this->m_jobQList.GetHeadPosition();    
        lastPos = pos;
        if(pos != NULL)
            pTempJob =this->m_jobQList.GetHead();
        while(pos != NULL )
        {        
            if( pJob->getPriority() > pTempJob->getPriority())
                break;
            lastPos = pos;
            pTempJob =     this->m_jobQList.GetNext(pos);        
        }    
        if(pos == NULL)    
            this->m_jobQList.AddTail(pJob);
        else
            this->m_jobQList.InsertBefore(lastPos,pJob);
        this->m_pObserverThread->ResumeThread();
        sLock.Unlock();
    }
    
    void CMThreadedJobQ::addFreeJobExecuter(CJobExecuter *pEx)
    {
        m_cs.Lock();
        THNODE* node = new THNODE;
        node->pExecuter = pEx;
        node->pNext = this->m_pFreeEList;
        this->m_pFreeEList = node;
        m_cs.Unlock();
    }
    
    CJobExecuter* CMThreadedJobQ::getJobExecuter()
    {
        THNODE *pTemp;
        CJobExecuter *pEx=NULL;
        m_cs.Lock();
    
        if(this->m_pFreeEList != NULL)  //有空闲CJobExecuter,就返回
        {
            pTemp = this->m_pFreeEList;
            this->m_pFreeEList = this->m_pFreeEList->pNext;
            pEx = pTemp->pExecuter;
            delete pTemp ;
            m_cs.Unlock();
            return pEx;
        }
    
        if(this->m_NoOfExecuter < this->m_MaxNoOfExecuter) //没有空闲CJobExecuter,并且当前CJobExecuter小于最大值,就生成一个新的CJobExecuter
        {
            pEx =  new CJobExecuter(this);
            this->addJobExecuter(pEx);
            this->m_NoOfExecuter++;
            m_cs.Unlock();
            return pEx;
        }
        m_cs.Unlock();
        return NULL;
    }
    
    void CMThreadedJobQ::addJobExecuter(CJobExecuter *pEx)
    {
        m_cs.Lock();
        THNODE* node = new THNODE;
        node->pExecuter= pEx;
        node->pNext = this->m_pAllEList;
        this->m_pAllEList = node;
        m_cs.Unlock();
    }
    
    void CMThreadedJobQ::setMaxNoOfExecuter(int value)
    {
        this->m_cs.Lock();
        if(value >1 && value <11)
            this->m_MaxNoOfExecuter = value;
        m_pObserverThread->ResumeThread();
        this->m_cs.Unlock();
    }
    
    void CMThreadedJobQ::deleteJobExecuter(CJobExecuter *pEx)
    {
        THNODE* pNode,*pNodeP;
        CSingleLock singleLock(&m_cs);    
        singleLock.Lock();    
        if(this->m_pAllEList != NULL)
        {
            pNode = this->m_pAllEList;
            if(pNode->pExecuter == pEx )    
            {
              this->m_pAllEList = pNode->pNext;
              delete pNode;          
            }
            else
            {
                pNodeP =pNode;
                pNode  = pNode->pNext ;            
                while(pNode != NULL )
                {
                    if(pNode->pExecuter== pEx ) break;
                    pNodeP = pNode;
                    pNode  = pNode->pNext ;            
                }
                if(pNode!= NULL)
                {
                    pNodeP->pNext = pNode->pNext;
                    delete pNode;
                }
            }
        }
        this->m_NoOfExecuter--;
        singleLock.Unlock();
        pEx->stop();
        Sleep(1);
        delete pEx;
    }

      以上,就是该可伸缩多线程任务的主体框架,当我们工作需要实现类似这样的需要:异步执行多个不同的任务时,这个例子就是一个很好的参考例子,我研究这些代码只是为了让我在遇到这种问题的时候,可以有一个思路去思考,而不至于无从下手,仅此而已。

  • 相关阅读:
    1144 The Missing Number (20分)
    1145 Hashing
    1146 Topological Order (25分)
    1147 Heaps (30分)
    1148 Werewolf
    1149 Dangerous Goods Packaging (25分)
    TypeReference
    Supervisor安装与配置()二
    谷粒商城ES调用(十九)
    Found interface org.elasticsearch.common.bytes.BytesReference, but class was expected
  • 原文地址:https://www.cnblogs.com/venow/p/2808392.html
Copyright © 2011-2022 走看看