zoukankan      html  css  js  c++  java
  • 可伸缩多线程任务队列(转载)

    出自:http://blog.csdn.net/tianmohust/article/details/9335243

    在我们的工作中,我们经常需要异步执行一些任务,下面介绍的这个可伸缩多线程队列,可满足我们的需求。

      出自:http://www.codeproject.com/Articles/4148/Multithreaded-Job-Queue,主要有以下几个功能:

        1、任务队列是多线程,许多任务可以异步进行,任务队列使用线程池来执行任务。

        2、任务队列支持优先级,优先级高的任务优先执行(即使是后来添加的)

        3、任务队列可以被暂停,但是用户还是可以添加任务,当任务队列被唤醒时,任务可以继续执行下去

        4、在运行过程中,任务队列使用的线程池,用户可以自行增加和减少

      大体框架主要由3个类构成

        1、CJob,任务类,用户需要从该类派生来实现自身需要完成的任务

        2、CJobExecuter,任务执行类,任务均由该类来调用执行,每一个类相当于对应一个线程

        3、CMThreadedJobQ,多线程任务队列,添加任务已经任务的分发均由该类完成,该类维护一个任务队列和一个完成队列的线程池。

      类图如下:

      该例子中,CJobExecuter和CMThreadJobQ这两个类的调用关系是非常值得我们学习的,同时,CJob作为一个基类,子类派生可以实现不同的任务,可扩展性也不错。源代码解析如下:

    Job.h文件:

     

     1 class CJob
     2 {
     3 public:
     4     CJob();
     5     virtual ~CJob();    
     6     BOOL m_Completed;         //任务是否完成:TRUE 完成,FALSE 未完成
     7     static long lastUsedID;   //最后的ID    
     8     //================================================================================================
     9     //函数名:                  setPriority
    10     //函数描述:                设置任务优先级
    11     //输入:                    [in] priority 优先级别
    12     //输出:                    无
    13     //返回:                    无
    14     //================================================================================================
    15     void setPriority(int priority);
    16     //================================================================================================
    17     //函数名:                  getPriority
    18     //函数描述:                返回任务优先级
    19     //输入:                    无
    20     //输出:                    无
    21     //返回:                    任务优先级
    22     //================================================================================================
    23     int getPriority();    
    24     //================================================================================================
    25     //函数名:                  getID
    26     //函数描述:                返回任务ID
    27     //输入:                    无
    28     //输出:                    无
    29     //返回:                    任务ID
    30     //================================================================================================
    31     long getID();    
    32     //================================================================================================
    33     //函数名:                  setAutoDelete
    34     //函数描述:                设置完成任务后是否删除任务
    35     //输入:                    [in] autoDeleteFlag
    36     //输出:                    无
    37     //返回:                    无
    38     //================================================================================================
    39     void setAutoDelete(BOOL autoDeleteFlag = TRUE);
    40     //================================================================================================
    41     //函数名:                  AutoDelete
    42     //函数描述:                返回删除任务标记
    43     //输入:                    无
    44     //输出:                    无
    45     //返回:                    任务标记
    46     //================================================================================================
    47     BOOL AutoDelete();
    48     //================================================================================================
    49     //函数名:                  execute
    50     //函数描述:                任务真正工作的函数,纯虚函数,需要子类化实现
    51     //输入:                    无
    52     //输出:                    无
    53     //返回:                    任务ID
    54     //================================================================================================
    55     virtual void execute() = 0;    
    56 private:
    57     long m_ID;               //任务ID
    58     BOOL m_autoDeleteFlag;   //是否自动删除任务标记,TRUE 删除,FALSE 不删除,默认为TRUE
    59     int m_priority;          //任务优先级,默认为5
    60 };

     

    Job.cpp文件:

     1 long CJob::lastUsedID = 0;
     2 
     3 CJob::CJob()
     4 {
     5     this->m_ID = InterlockedIncrement(&lastUsedID);
     6     this->m_autoDeleteFlag = TRUE;
     7     this->m_priority = 5;
     8     this->m_Completed= FALSE;
     9 }
    10 CJob::~CJob()
    11 {
    12 }
    13 BOOL CJob::AutoDelete()
    14 {
    15     return m_autoDeleteFlag;
    16 }
    17 void CJob::setAutoDelete(BOOL autoDeleteFlag)
    18 {
    19     m_autoDeleteFlag = autoDeleteFlag;
    20 }
    21 long CJob::getID()
    22 {
    23     return this->m_ID;
    24 }
    25 int CJob::getPriority()
    26 {
    27     return this->m_priority;    
    28 }
    29 void CJob::setPriority(int priority)
    30 {
    31     this->m_priority = priority;
    32 }

    JobExecuter.h文件:

     1 //一个对象对应一个线程,执行任务Job
     2 class CJobExecuter
     3 {
     4 public:
     5     CJobExecuter(CMThreadedJobQ *pJobQ);
     6     virtual ~CJobExecuter();    
     7     //================================================================================================
     8     //函数名:                  stop
     9     //函数描述:                停止执行任务
    10     //输入:                    无
    11     //输出:                    无
    12     //返回:                    无
    13     //================================================================================================
    14     void stop();    
    15     //================================================================================================
    16     //函数名:                  execute
    17     //函数描述:                执行一个任务
    18     //输入:                    [in] pJob 任务指针
    19     //输出:                    无
    20     //返回:                    无
    21     //================================================================================================
    22     void execute(CJob* pJob);    
    23     static UINT ThreadFunction(LPVOID pParam); //线程函数    
    24     CMThreadedJobQ* m_pJobQ;                   //指向线程任务队列指针
    25     CJob* m_pJob2Do;                           //指向正在执行任务的指针
    26     int m_flag;                                //线程执行标记
    27     CWinThread* m_pExecuterThread;             //线程标识符
    28 };

    JobExecuter.cpp文件:

     1 #define STOP_WORKING -1
     2 #define KEEP_WORKING  0
     3 CJobExecuter::CJobExecuter(CMThreadedJobQ *pJobQ)
     4 {
     5     this->m_pJobQ= pJobQ;
     6     this->m_pExecuterThread= AfxBeginThread(ThreadFunction,this);
     7     this->m_pJob2Do = NULL;
     8     this->m_flag = KEEP_WORKING;
     9 }
    10 CJobExecuter::~CJobExecuter()
    11 {
    12     if(this->m_pExecuterThread!= NULL )    
    13     {
    14         this->m_pExecuterThread->ExitInstance();
    15         delete m_pExecuterThread;    
    16     }
    17 }
    18 UINT CJobExecuter::ThreadFunction(LPVOID pParam)
    19 {    
    20     CJobExecuter *pExecuter = (CJobExecuter *)pParam;
    21     pExecuter->m_flag = 1;
    22     ::Sleep(1);
    23     CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);        
    24     while(pExecuter->m_flag !=STOP_WORKING )
    25     {
    26         if(pExecuter->m_pJob2Do!=  NULL)
    27         {
    28             pExecuter->m_pJob2Do->execute();
    29             pExecuter->m_pJob2Do->m_Completed = TRUE;    
    30             if(pExecuter->m_pJob2Do->AutoDelete())
    31                 delete pExecuter->m_pJob2Do;
    32             pExecuter->m_pJob2Do = NULL;
    33         }
    34 
    35         if(pExecuter->m_pJobQ == NULL) break;        
    36         CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);        
    37         singleLock.Lock();
    38         if(pExecuter->m_pJobQ->getNoOfExecuter() > pExecuter->m_pJobQ->getMaxNoOfExecuter()) //CJobExecuter个数大于最大值,自动销毁
    39         {
    40             pExecuter->stop();    
    41             singleLock.Unlock();    
    42         }
    43         else
    44         {
    45             pExecuter->m_pJobQ->addFreeJobExecuter(pExecuter);      //完成任务后,添加到CMThreadedJobQ的空闲队列中
    46             singleLock.Unlock();    
    47             pExecuter->m_pJobQ->m_pObserverThread->ResumeThread();        
    48             pExecuter->m_pExecuterThread->SuspendThread();        
    49         }                
    50     }    
    51     if(pExecuter->m_pJobQ != NULL)
    52     {
    53         pExecuter->m_pJobQ->deleteJobExecuter(pExecuter);
    54     }
    55     else
    56     {
    57         delete pExecuter;
    58     }
    59     return 0;
    60 }
    61 void CJobExecuter::execute(CJob* pJob)
    62 {
    63     this->m_pJob2Do = pJob;
    64     ::Sleep(0);
    65     this->m_pExecuterThread->ResumeThread();
    66 }
    67 void CJobExecuter::stop()
    68 {
    69     this->m_flag = STOP_WORKING;
    70     this->m_pExecuterThread->ResumeThread();
    71 }

    MThreadedJobQ.h文件

      1 typedef CTypedPtrList< CPtrList ,CJob*>CJobQList;
      2 //线程池任务队列
      3 class CMThreadedJobQ
      4 {
      5 public:
      6     typedef struct THNODE
      7     {
      8         CJobExecuter* pExecuter;
      9         THNODE * pNext ;
     10     } THNODE;
     11     
     12     CMThreadedJobQ();
     13     virtual ~CMThreadedJobQ();
     14     //================================================================================================
     15     //函数名:                  deleteJobExecuter
     16     //函数描述:                删除一个JobExecuter对象
     17     //输入:                    [in] pEx
     18     //输出:                    无
     19     //返回:                    无
     20     //================================================================================================
     21     void deleteJobExecuter(CJobExecuter *pEx);    
     22     //================================================================================================
     23     //函数名:                  setMaxNoOfExecuter
     24     //函数描述:                设置CJobExecuter的个数
     25     //输入:                    [in] value
     26     //输出:                    无
     27     //返回:                    无
     28     //================================================================================================
     29     void setMaxNoOfExecuter(int value);
     30     //================================================================================================
     31     //函数名:                  addJobExecuter
     32     //函数描述:                添加一个CJobExecuter
     33     //输入:                    [in] pEx
     34     //输出:                    无
     35     //返回:                    无
     36     //================================================================================================
     37     void addJobExecuter(CJobExecuter *pEx);    
     38     //================================================================================================
     39     //函数名:                  getJobExecuter
     40     //函数描述:                返回一个CJobExecuter
     41     //输入:                    无
     42     //输出:                    无
     43     //返回:                    处理任务的指针
     44     //================================================================================================
     45     CJobExecuter* getJobExecuter();
     46     //================================================================================================
     47     //函数名:                  addFreeJobExecuter
     48     //函数描述:                添加一个CJobExecuter
     49     //输入:                    [in] pEx
     50     //输出:                    无
     51     //返回:                    无
     52     //================================================================================================
     53     void addFreeJobExecuter(CJobExecuter *pEx);
     54     //================================================================================================
     55     //函数名:                  addJob
     56     //函数描述:                添加一个任务
     57     //输入:                    [in] pJob
     58     //输出:                    无
     59     //返回:                    无
     60     //================================================================================================
     61     void addJob(CJob *pJob);    
     62     //================================================================================================
     63     //函数名:                  getMaxNoOfExecuter
     64     //函数描述:                获取CJobExecuter个数的最大值
     65     //输入:                    无
     66     //输出:                    无
     67     //返回:                    无
     68     //================================================================================================
     69     int getMaxNoOfExecuter();    
     70     //================================================================================================
     71     //函数名:                  getNoOfExecuter
     72     //函数描述:                获取当前CJobExecuter的个数
     73     //输入:                    无
     74     //输出:                    无
     75     //返回:                    无
     76     //================================================================================================
     77     int getNoOfExecuter();
     78     static UINT JobObserverThreadFunction(LPVOID);
     79     //================================================================================================
     80     //函数名:                  pause
     81     //函数描述:                挂起JobObserverThread线程
     82     //输入:                    无
     83     //输出:                    无
     84     //返回:                    无
     85     //================================================================================================
     86     void pause();
     87     //================================================================================================
     88     //函数名:                  resume
     89     //函数描述:                唤醒JobObserverThread线程
     90     //输入:                    无
     91     //输出:                    无
     92     //返回:                    无
     93     //================================================================================================
     94     void resume();            
     95     CWinThread* m_pObserverThread; //向空闲的executer线程添加任务的线程
     96     CCriticalSection m_cs;         //关键代码段,用于互斥
     97     CJobQList m_jobQList;          //任务队列
     98 private :
     99     BOOL m_pause;                  //JobObserverThread线程运行标记
    100     int m_MaxNoOfExecuter;         //CJobExecuter最大个数
    101     int m_NoOfExecuter;            //当前CJobExecuter个数
    102     THNODE* m_pFreeEList;          //维护空闲处理任务线程的队列
    103     THNODE* m_pAllEList;           //维护所有处理任务线程的队列
    104 };

    MThreadedJobQ.cpp文件:

      1 CMThreadedJobQ::CMThreadedJobQ()
      2 {
      3     m_MaxNoOfExecuter = 2;
      4     m_pause = FALSE;
      5     m_pObserverThread = AfxBeginThread(JobObserverThreadFunction,this);
      6     m_pFreeEList =NULL;
      7     m_NoOfExecuter =0;
      8     m_pAllEList = NULL;
      9 }
     10 CMThreadedJobQ::~CMThreadedJobQ()
     11 {
     12     THNODE* pTempNode;
     13     while (m_pAllEList != NULL) 
     14     {    
     15         pTempNode = m_pAllEList->pNext;
     16         delete m_pAllEList->pExecuter;        
     17         delete m_pAllEList;        
     18         m_pAllEList = pTempNode;    
     19     }  
     20     while (m_pFreeEList != NULL) 
     21     {    pTempNode = m_pFreeEList->pNext;        
     22         delete m_pFreeEList;        
     23         m_pFreeEList = pTempNode;    
     24     }    
     25 
     26     m_pObserverThread->ExitInstance();    
     27     delete m_pObserverThread;
     28 }
     29 void CMThreadedJobQ::pause()
     30 {
     31     this->m_pause = TRUE;
     32 }
     33 void CMThreadedJobQ::resume()
     34 {
     35     this->m_pause = FALSE;
     36     this->m_pObserverThread->ResumeThread();
     37 }
     38 UINT CMThreadedJobQ::JobObserverThreadFunction(LPVOID pParam)
     39 {
     40     CMThreadedJobQ *pMTJQ = (CMThreadedJobQ *)pParam;
     41     CJobExecuter *pJExecuter;
     42     while(TRUE)
     43     {
     44         Sleep(100);
     45         if(pMTJQ->m_pause != TRUE)
     46         {
     47             while(!pMTJQ->m_jobQList.IsEmpty() )
     48             {
     49                 pJExecuter = pMTJQ->getJobExecuter();
     50                 if( pJExecuter!=NULL)
     51                 {                
     52                     pMTJQ->m_cs.Lock();
     53                     pJExecuter->execute(pMTJQ->m_jobQList.GetHead());
     54                     pMTJQ->m_jobQList.RemoveHead();
     55                     AfxGetApp()->m_pMainWnd->PostMessage(REFRESH_LIST);
     56                     pMTJQ->m_cs.Unlock();
     57                 }
     58                 else
     59                 {
     60                     break;
     61                 }
     62                 if(pMTJQ->m_pause == TRUE)
     63                     break;
     64             }
     65         }
     66         pMTJQ->m_pObserverThread->SuspendThread();
     67     }
     68     return 0;
     69 }
     70 int CMThreadedJobQ::getNoOfExecuter()
     71 {
     72     return this->m_NoOfExecuter;
     73 }
     74 
     75 int CMThreadedJobQ::getMaxNoOfExecuter()
     76 {
     77     return this->m_MaxNoOfExecuter;
     78 }
     79 void CMThreadedJobQ::addJob(CJob *pJob)
     80 {
     81     CJob * pTempJob;
     82     CSingleLock sLock(&this->m_cs);
     83     sLock.Lock();    
     84     POSITION pos,lastPos;
     85     pos = this->m_jobQList.GetHeadPosition();    
     86     lastPos = pos;
     87     if(pos != NULL)
     88         pTempJob =this->m_jobQList.GetHead();
     89     while(pos != NULL )
     90     {        
     91         if( pJob->getPriority() > pTempJob->getPriority())
     92             break;
     93         lastPos = pos;
     94         pTempJob =     this->m_jobQList.GetNext(pos);        
     95     }    
     96     if(pos == NULL)    
     97         this->m_jobQList.AddTail(pJob);
     98     else
     99         this->m_jobQList.InsertBefore(lastPos,pJob);
    100     this->m_pObserverThread->ResumeThread();
    101     sLock.Unlock();
    102 }
    103 void CMThreadedJobQ::addFreeJobExecuter(CJobExecuter *pEx)
    104 {
    105     m_cs.Lock();
    106     THNODE* node = new THNODE;
    107     node->pExecuter = pEx;
    108     node->pNext = this->m_pFreeEList;
    109     this->m_pFreeEList = node;
    110     m_cs.Unlock();
    111 }
    112 CJobExecuter* CMThreadedJobQ::getJobExecuter()
    113 {
    114     THNODE *pTemp;
    115     CJobExecuter *pEx=NULL;
    116     m_cs.Lock();
    117     if(this->m_pFreeEList != NULL)  //有空闲CJobExecuter,就返回
    118     {
    119         pTemp = this->m_pFreeEList;
    120         this->m_pFreeEList = this->m_pFreeEList->pNext;
    121         pEx = pTemp->pExecuter;
    122         delete pTemp ;
    123         m_cs.Unlock();
    124         return pEx;
    125     }
    126     if(this->m_NoOfExecuter < this->m_MaxNoOfExecuter) //没有空闲CJobExecuter,并且当前CJobExecuter小于最大值,就生成一个新的CJobExecuter
    127     {
    128         pEx =  new CJobExecuter(this);
    129         this->addJobExecuter(pEx);
    130         this->m_NoOfExecuter++;
    131         m_cs.Unlock();
    132         return pEx;
    133     }
    134     m_cs.Unlock();
    135     return NULL;
    136 }
    137 void CMThreadedJobQ::addJobExecuter(CJobExecuter *pEx)
    138 {
    139     m_cs.Lock();
    140     THNODE* node = new THNODE;
    141     node->pExecuter= pEx;
    142     node->pNext = this->m_pAllEList;
    143     this->m_pAllEList = node;
    144     m_cs.Unlock();
    145 }
    146 void CMThreadedJobQ::setMaxNoOfExecuter(int value)
    147 {
    148     this->m_cs.Lock();
    149     if(value >1 && value <11)
    150         this->m_MaxNoOfExecuter = value;
    151     m_pObserverThread->ResumeThread();
    152     this->m_cs.Unlock();
    153 }
    154 void CMThreadedJobQ::deleteJobExecuter(CJobExecuter *pEx)
    155 {
    156     THNODE* pNode,*pNodeP;
    157     CSingleLock singleLock(&m_cs);    
    158     singleLock.Lock();    
    159     if(this->m_pAllEList != NULL)
    160     {
    161         pNode = this->m_pAllEList;
    162         if(pNode->pExecuter == pEx )    
    163         {
    164           this->m_pAllEList = pNode->pNext;
    165           delete pNode;          
    166         }
    167         else
    168         {
    169             pNodeP =pNode;
    170             pNode  = pNode->pNext ;            
    171             while(pNode != NULL )
    172             {
    173                 if(pNode->pExecuter== pEx ) break;
    174                 pNodeP = pNode;
    175                 pNode  = pNode->pNext ;            
    176             }
    177             if(pNode!= NULL)
    178             {
    179                 pNodeP->pNext = pNode->pNext;
    180                 delete pNode;
    181             }
    182         }
    183     }
    184     this->m_NoOfExecuter--;
    185     singleLock.Unlock();
    186     pEx->stop();
    187     Sleep(1);
    188     delete pEx;
    189 }

     

  • 相关阅读:
    未来的计划和考虑
    jquery 常用的方法
    对于页面动态加载的元素事件无效的解决方案
    Myeclipse8.5中svn插件安装方法总结
    JS读RSS
    JAVA的RSS处理
    环境:win7+ie8 IE8的F12不起作用,原因如下:
    关闭和释放JDBC
    关于Eclipse无法生成class文件的问题
    JavaScript跨域总结与解决办法
  • 原文地址:https://www.cnblogs.com/chechen/p/3849398.html
Copyright © 2011-2022 走看看