zoukankan      html  css  js  c++  java
  • C++实现一个多线程同步方式的协同工作程序示例

    多线程并发程序与协同程序其实是不同的概念。多线程并发是多个执行序同时运行,而协同程序是多个执行序列相互协作,同一时刻只有一个执行序列。今天想到的是将两者结合起来,拿现实生活中的例子来说,假设一个班级有100个学生,一个老师要批改100个学生的作业,有时老师太忙或者赶时间会叫几个同学帮忙批改,等所有同学都批改完后都交到老师手中,老师在下次上课的时候将作业本一起发给班上的学生。。。。其实在并发编程的时候也可以借鉴这一个思想和模式,特别是网络服务器开发的过程中,并发与协同经常出现,于是今天写了一个简单的程序模拟了这种情形,当然这个程序本身并没有任何意义,只是记录下这种思想,个人一直都觉得,程序开发中,思想是最为重要的,用什么语言来实现只是表现上不同,今天记录下来,日后的开发过程中,在适当地方以此思想为基础,根据项目需要进行拓展!

      1 //--------------------------------------------------------------
      2   开发工具:Visual Studio 2012
      3 //---------------------------------------------------------------
      4 //C++
      5 #include <iostream>
      6 #include <memory>
      7 #include <thread>
      8 #include <mutex>
      9 #include <condition_variable>
     10 #include <queue>
     11 #include <vector>
     12 
     13 using namespace std;
     14 
     15 //windows
     16 #include <windows.h>
     17 
     18 
     19 /************************************************
     20     [示例]实现一个多线程方式下的协同工作程序
     21 
     22     当一个线程(相对的主线程)在完成一个任务的时
     23     候,有时候为了提高效率,可以充分利用多核CPU的
     24     优势可以将手中的任务分成多个部分,分发给比较
     25     空闲的辅助线程来帮助处理,并且主线程要等待所
     26     有的辅助线程都处理完成后,对所有任务进行一次
     27     汇总,才能进行下一步操作,此时就需要一个同步的
     28     多线程协同工作类。
     29 *************************************************/
     30 
     31 
     32 //定义一个求累积和的任务类
     33 class CSumTask
     34 {
     35 public:
     36     CSumTask(double dStart,double dEnd);
     37     ~CSumTask();
     38     double DoTask();
     39     double GetResult();
     40 private:
     41     double m_dMin;
     42     double m_dMax;
     43     double m_dResult;
     44 };
     45 
     46 CSumTask::CSumTask(double dStart,double dEnd):m_dMin(dStart),m_dMax(dEnd),m_dResult(0.0)
     47 {
     48 
     49 }
     50 CSumTask::~CSumTask()
     51 {
     52 
     53 }
     54 double CSumTask::DoTask()
     55 {
     56     
     57     for(double dNum = m_dMin;dNum <= m_dMax;++dNum)
     58     {
     59         m_dResult += dNum;
     60     }
     61     return m_dResult;
     62 }
     63 
     64 double CSumTask::GetResult()
     65 {
     66     return m_dResult;
     67 }
     68 
     69 
     70 //定义一个任务管理者
     71 class CTaskManager
     72 {
     73 public:
     74     CTaskManager();
     75     ~CTaskManager();
     76     size_t Size();
     77     void AddTask(const std::shared_ptr<CSumTask> TaskPtr);
     78     std::shared_ptr<CSumTask> PopTask();
     79 protected:
     80     std::queue<std::shared_ptr<CSumTask>> m_queTask;
     81 };
     82 
     83 CTaskManager::CTaskManager()
     84 {
     85 
     86 }
     87 
     88 CTaskManager::~CTaskManager()
     89 {
     90 
     91 }
     92 
     93 size_t CTaskManager::Size()
     94 {
     95     return m_queTask.size();
     96 }
     97 
     98 void CTaskManager::AddTask(const std::shared_ptr<CSumTask> TaskPtr)
     99 {
    100     m_queTask.push(std::move(TaskPtr));
    101 }
    102 
    103 std::shared_ptr<CSumTask> CTaskManager::PopTask()
    104 {
    105     std::shared_ptr<CSumTask> tmPtr = m_queTask.front();
    106     m_queTask.pop();
    107     return tmPtr;
    108 }
    109 
    110 
    111 //协同工作线程管理类,负责创建协同工作线程并接受来自主线程委托的任务进行处理
    112 class CWorkThreadManager
    113 {
    114 public:
    115     CWorkThreadManager(unsigned int uiThreadSum );
    116     ~CWorkThreadManager();
    117     bool AcceptTask(std::shared_ptr<CSumTask> TaskPtr);
    118     bool StopAll(bool bStop);
    119     unsigned int ThreadNum();
    120 protected:
    121     std::queue<std::shared_ptr<CSumTask>> m_queTask;
    122     std::mutex m_muTask;
    123     int m_iWorkingThread;
    124     int m_iWorkThreadSum;
    125     std::vector<std::shared_ptr<std::thread>> m_vecWorkers;
    126 
    127     void WorkThread(int iWorkerID);
    128     bool m_bStop;
    129     std::condition_variable_any m_condPop;
    130     std::condition_variable_any m_stopVar;
    131 };
    132 
    133 CWorkThreadManager::~CWorkThreadManager()
    134 {
    135 
    136 }
    137 unsigned int CWorkThreadManager::ThreadNum()
    138 {
    139     return m_iWorkThreadSum;
    140 }
    141 
    142 CWorkThreadManager::CWorkThreadManager(unsigned int uiThreadSum ):m_bStop(false),m_iWorkingThread(0),m_iWorkThreadSum(uiThreadSum)
    143 {
    144     //创建工作线程
    145     for(int i = 0; i < m_iWorkThreadSum;++i)
    146     {
    147         std::shared_ptr<std::thread> WorkPtr(new std::thread(&CWorkThreadManager::WorkThread,this,i+1)); 
    148         m_vecWorkers.push_back(WorkPtr);
    149     }
    150     
    151 }
    152 
    153 bool CWorkThreadManager::AcceptTask(std::shared_ptr<CSumTask> TaskPtr)
    154 {
    155     std::unique_lock<std::mutex>    muLock(m_muTask);
    156     if(m_iWorkingThread >= m_iWorkThreadSum)
    157     {
    158         return false;            //当前已没有多余的空闲的线程处理任务
    159     }
    160     m_queTask.push(TaskPtr);
    161     m_condPop.notify_all();
    162     return true;
    163 }
    164 
    165  void CWorkThreadManager::WorkThread(int iWorkerID)
    166  {
    167      while(!m_bStop)
    168      {
    169          std::shared_ptr<CSumTask> TaskPtr;
    170          bool bDoTask = false;
    171          {
    172             std::unique_lock<std::mutex>    muLock(m_muTask);
    173             while(m_queTask.empty() && !m_bStop)
    174             {
    175                 m_condPop.wait(m_muTask);
    176             }
    177             if(!m_queTask.empty())
    178             {
    179                 TaskPtr = m_queTask.front();
    180                 m_queTask.pop();
    181                 m_iWorkingThread++;
    182                 bDoTask = true;
    183             }
    184              
    185          }
    186         //处理任务
    187          if(bDoTask)
    188          {
    189              TaskPtr->DoTask();
    190              {
    191                  std::unique_lock<std::mutex>    muLock(m_muTask);
    192                  m_iWorkingThread--;
    193                  cout<<">>>DoTask in thread ["<<iWorkerID<<"]
    ";
    194              }
    195          }
    196          m_stopVar.notify_all();
    197      }
    198  }
    199 
    200  bool CWorkThreadManager::StopAll(bool bStop)
    201  {
    202      {
    203          std::unique_lock<std::mutex>    muLock(m_muTask);
    204          while(m_queTask.size()>0 || m_iWorkingThread>0)
    205          {
    206              m_stopVar.wait(m_muTask);
    207              cout<<">>>Waiting finish....
    ";
    208          }
    209         cout<<">>>All task finished!
    ";
    210         
    211      }
    212 
    213      m_bStop = true;
    214      m_condPop.notify_all();
    215      //等待所有线程关闭
    216      for(std::vector<std::shared_ptr<std::thread>>::iterator itTask = m_vecWorkers.begin();itTask != m_vecWorkers.end();++itTask)
    217      {
    218         (*itTask)->join();
    219      }
    220      return true;
    221  }
    222 
    223 
    224  /**************************************
    225   [示例程序说明]
    226 
    227       每个任务对象表示求1+2+....+1000的累
    228   积和,现在有2000个这样的任务,需要将每个
    229   任务进行计算,然后将所有的结果汇总求和。
    230       利用多线程协同工作类对象辅助完成每
    231   个任务结果计算,主线程等待所有结果完成
    232   后将所有结果汇总求和。
    233  ****************************************/
    234 
    235 
    236 int main(int arg,char *arv[])
    237 {
    238 
    239     std::cout.sync_with_stdio(true);
    240     CTaskManager TaskMgr;
    241     CWorkThreadManager WorkerMgr(5);
    242     std::vector<std::shared_ptr<CSumTask>> vecResultTask;
    243 
    244     for(int i = 0; i < 2000; ++i)
    245     {
    246         std::shared_ptr<CSumTask> TaskPtr(new CSumTask(1.0,1000.0));
    247         TaskMgr.AddTask(TaskPtr);
    248         vecResultTask.push_back(TaskPtr);
    249     }
    250 
    251     //
    252     DWORD dStartTime = ::GetTickCount();
    253     while(TaskMgr.Size()>0)
    254     {
    255         std::shared_ptr<CSumTask> WorkPtr = TaskMgr.PopTask();
    256         if(!WorkerMgr.AcceptTask(WorkPtr))
    257         {
    258             //辅助线程此刻处于忙碌状态(没有空闲帮忙),自己处理该任务
    259             WorkPtr->DoTask();
    260             cout<<">>>DoTask in thread [0]
    ";
    261         }
    262     }
    263     WorkerMgr.StopAll(true);                    //等待所有的任务完成
    264 
    265     //对所有结果求和
    266     double dSumResult = 0.0;
    267     for(std::vector<std::shared_ptr<CSumTask>>::iterator itTask = vecResultTask.begin();itTask != vecResultTask.end();++itTask)
    268     {
    269         dSumResult += (*itTask)->GetResult();
    270     }
    271 
    272     DWORD dEndTime = ::GetTickCount();
    273     cout<<"
    [Status]"<<endl;
    274     cout<<"	Every task result:"<<vecResultTask[0]->GetResult()<<endl;
    275     cout<<"	Task num:"<<vecResultTask.size()<<endl;
    276     cout<<"	All result sum:"<<dSumResult;
    277     cout<<"	Cast to int,result:"<<static_cast<long long>(dSumResult)<<endl;
    278     cout<<"	Workthread num:"<<WorkerMgr.ThreadNum()<<endl;
    279     cout<<"	Time of used:"<<dEndTime-dStartTime<<" ms"<<endl;
    280     getchar();
    281     return 0;
    282 }

     

  • 相关阅读:
    垃圾回收机制_合集
    线程_同步应用
    动态给类的实例对象 或 类 添加属性
    【Python】画一个心形
    【JS】网站运行时间
    【Python】random库
    【HTML】iframe嵌套界面自适应,可高度自由收缩
    【HTML】三种方法使HTML单页面输入密码才能访问
    维护
    投喂记录
  • 原文地址:https://www.cnblogs.com/knight-monkey/p/Monkey_study_blog_201507092040.html
Copyright © 2011-2022 走看看