zoukankan      html  css  js  c++  java
  • BOOST 线程完全攻略

     

     
    1. // controlled_module_ex.hpp : controlled_module类的扩展
    2. // 增强线程之间消息通讯
    3. // 增加线程安全启动和安全关闭功能
    4. // 增加定时器功能
    5. #pragma once
    6. #include <boost/shared_ptr.hpp>
    7. #include <boost/any.hpp>
    8. #include "controlled_module.hpp"
    9. struct _command
    10. {
    11.     typedef boost::shared_ptr<_command> CCmdPtr;
    12.     unsigned int nCmd;
    13.     boost::any anyParam;
    14. };
    15. struct _wait_command
    16. {       
    17.     boost::any par;
    18.     unsigned int command;
    19.     void * event;
    20.     boost::shared_ptr<boost::any> resp;
    21. };
    22. class controlled_module_ex;
    23. struct _notify
    24. {
    25.     controlled_module_ex * sender;
    26.     int id;
    27.     boost::any par;
    28. };
    29. #define BM_RESERVE 1000
    30. #define BM_RING_START BM_RESERVE+1
    31. #define BM_RING_STOP BM_RESERVE+2
    32. #define BM_RING_SETTIME  BM_RESERVE+3
    33. #define BM_RING_SETPARENT BM_RESERVE+4
    34. #define BM_RING_CYCLE BM_RESERVE+5
    35. #define BM_RING_PROCESS BM_RESERVE+6
    36. #define BM_RING_PROCESSEND BM_RESERVE+7
    37. #define BM_RING_PROCESSFAIL BM_RESERVE+8
    38. #define BM_TIMER    BM_RESERVE+9
    39. #define BM_COMMAND  BM_RESERVE+10
    40. #define BM_NOTIFY   BM_RESERVE+11
    41. #define BM_USER 9000
    42. class controlled_timer;
    43. class controlled_module_ex: public controlled_module
    44. {
    45. public:
    46.     controlled_module_ex()
    47.     {
    48.         m_safe = false;
    49.     }
    50.     ~controlled_module_ex()
    51.     {
    52.         safestop();
    53.     }
    54. public:
    55.     template<typename T>
    56.     bool postmessage(unsigned int nCmd, const boost::shared_ptr<T>& p)
    57.     {
    58.         if(this==0||!m_safe)return false;
    59.         boost::mutex::scoped_lock lock(m_mutex_command);
    60.         _command::CCmdPtr cmd(new _command);
    61.         cmd->nCmd = nCmd;
    62.         cmd->anyParam = p;
    63.         m_list_command.push_back(cmd);
    64.         return true;
    65.     }
    66.     boost::any execute(unsigned int command,boost::any par,int timeout=-1)
    67.     {
    68.         boost::shared_ptr<_wait_command> shared(new _wait_command);
    69.         _wait_command & cmd = *shared;
    70.         cmd.command = command;
    71.         cmd.event = (void *)CreateEvent(0,FALSE,FALSE,0);
    72.         cmd.par = par;
    73.         cmd.resp = boost::shared_ptr<boost::any>(new boost::any);
    74.         if(this->postmessage(BM_COMMAND,shared))
    75.         {
    76.             DWORD dw = WaitForSingleObject(cmd.event,timeout);
    77.             CloseHandle(cmd.event);
    78.             if(dw!=WAIT_OBJECT_0)
    79.                 return boost::any();
    80.             else
    81.                 return *cmd.resp;
    82.         }
    83.         else
    84.         {
    85.             CloseHandle(cmd.event);
    86.             return boost::any();
    87.         }
    88.     }
    89.     void notify(_notify p)
    90.     {
    91.         this->postmessage(BM_NOTIFY,p);
    92.     }
    93.     bool postmessage(unsigned int nCmd,boost::any p)
    94.     {
    95.         if(this==0||!m_safe)
    96.             return false;
    97.         boost::mutex::scoped_lock lock(m_mutex_command);
    98.         _command::CCmdPtr cmd(new _command);
    99.         cmd->nCmd = nCmd;
    100.         cmd->anyParam = p;
    101.         m_list_command.push_back(cmd);
    102.         return true;
    103.     }
    104.     bool postmessage(unsigned int nCmd)
    105.     {
    106.         if(this==0||!m_safe)
    107.             return false;
    108.         boost::mutex::scoped_lock lock(m_mutex_command);
    109.         _command::CCmdPtr cmd(new _command);
    110.         cmd->nCmd = nCmd;
    111.         cmd->anyParam = 0;
    112.         m_list_command.push_back(cmd);
    113.         return true;
    114.     }
    115.     virtual bool work()
    116.     {
    117.         if(!getmessage())
    118.             return false;
    119.         else
    120.         {
    121.             Sleep(this->m_sleeptime);
    122.             return true;
    123.         }
    124.     }
    125.     virtual void message(const _command & cmd)
    126.     {
    127.         if(cmd.nCmd==BM_RING_START)
    128.         {
    129.             this->on_safestart();
    130.         }
    131.         else if(cmd.nCmd==BM_RING_STOP)
    132.         {
    133.             this->on_safestop();
    134.         }
    135.         else if(cmd.nCmd==BM_TIMER)
    136.         {
    137.             this->on_timer(boost::any_cast<controlled_timer*>(cmd.anyParam));
    138.         }
    139.         else if(cmd.nCmd==BM_COMMAND)
    140.         {
    141.             boost::shared_ptr<_wait_command> shared = boost::any_cast< boost::shared_ptr<_wait_command> >(cmd.anyParam);
    142.             _wait_command & cmd = *shared;
    143.             *cmd.resp = this->on_command(cmd.command,cmd.par);
    144.             SetEvent((HANDLE)cmd.event);
    145.         }
    146.         else if(cmd.nCmd==BM_NOTIFY)
    147.         {
    148.             try
    149.             {
    150.                 _notify par = boost::any_cast<_notify>(cmd.anyParam);
    151.                 this->on_notify(par);
    152.             }
    153.             catch(boost::bad_any_cast)
    154.             {
    155.             }
    156.         }
    157.     }
    158.     virtual void release()
    159.     {
    160.         boost::mutex::scoped_lock lock(m_mutex_command);
    161.         m_list_command.clear();
    162.     }
    163.     void safestart()
    164.     {
    165.         if(!islive())
    166.             start();
    167.         m_safe = true;
    168.         m_safestart_event = (void*)CreateEvent(NULL,FALSE,FALSE,0);
    169.         postmessage(BM_RING_START);
    170.         ::WaitForSingleObject((HANDLE)m_safestart_event,INFINITE);
    171.         CloseHandle(m_safestart_event);
    172.     }
    173.     void safestop()
    174.     {
    175.     if(this->islive())
    176.     {
    177.         m_safe = false;
    178.         m_safestop_event = (void*)CreateEvent(NULL,FALSE,FALSE,0);
    179.         {
    180.             boost::mutex::scoped_lock lock(m_mutex_command);
    181.             _command::CCmdPtr cmd(new _command);
    182.             cmd->nCmd = BM_RING_STOP;
    183.             cmd->anyParam = 0;
    184.             m_list_command.push_back(cmd);
    185.         }
    186.         DWORD dw = ::WaitForSingleObject((HANDLE)m_safestop_event,3*1000);
    187.         if(WAIT_OBJECT_0!=dw)
    188.         {
    189.         }
    190.         CloseHandle(m_safestop_event);
    191.         stop();
    192.     }
    193.     }
    194.     virtual void on_timer(const controlled_timer * p){}
    195.     virtual void on_safestart()
    196.     {
    197.         SetEvent(m_safestart_event);
    198.     }
    199.     virtual void on_safestop()
    200.     {
    201.         SetEvent(m_safestop_event);
    202.     }
    203.     virtual void on_notify(const _notify & p)
    204.     {
    205.     }
    206. protected:
    207.     virtual boost::any on_command(const unsigned int command,const boost::any par)
    208.     {
    209.         return boost::any();
    210.     }
    211.     bool getmessage()
    212.     {
    213.         std::list<_command::CCmdPtr> cache;
    214.         {
    215.             boost::mutex::scoped_lock lock(m_mutex_command);
    216.             while(!m_list_command.empty())
    217.             {
    218.                 _command::CCmdPtr p = m_list_command.front();
    219.                 m_list_command.pop_front();
    220.                 cache.push_back(p);
    221.             }
    222.         }
    223.         _command::CCmdPtr stop_command;
    224.         std::list<_command::CCmdPtr>::iterator item;
    225.         for(item = cache.begin();item!=cache.end();item++)
    226.         {
    227.             if((*(*item)).nCmd==BM_RING_STOP)
    228.             {
    229.                 stop_command = *item;               
    230.                 break;
    231.             }
    232.         }
    233.         if(stop_command.get()==0)
    234.         {
    235.             while(!cache.empty())
    236.             {
    237.                 _command::CCmdPtr p = cache.front();
    238.                 cache.pop_front();
    239.                 try
    240.                 {
    241.                     if((*p).nCmd!=BM_RING_START)
    242.                     {
    243.                         if(!this->m_safe)
    244.                             continue;
    245.                     }
    246.                     this->message(*p);
    247.                 }
    248.                 catch(boost::bad_any_cast &)
    249.                 {
    250.                 }
    251.             }
    252.             return true;
    253.         }
    254.         else
    255.         {
    256.             cache.clear();
    257.             this->message(*stop_command);
    258.             return false;
    259.         }
    260.     }
    261. private:
    262.     void*   m_safestart_event;
    263.     void* m_safestop_event;
    264.     bool m_safe;//在多线程,尤其牵涉到线程之间有类似socket级别关联时,当父线程safestop以后有可能会收到其他线程的postmessage,这时会引起线程死锁,这个m_safe就是解决这个问题的,当safestop以后不再接收新消息处理
    265.     boost::mutex m_mutex_command;
    266.     std::list<_command::CCmdPtr> m_list_command;
    267. };
    268. class controlled_timer: public controlled_module_ex
    269. {
    270. public:
    271.   controlled_timer()
    272.   {
    273.     this->m_time = 0;
    274.     this->m_parent = 0;
    275.     this->m_step = 0;
    276.   }
    277.   ~controlled_timer(){
    278.   }
    279. protected:
    280.   controlled_module_ex* m_parent;
    281.   int m_time;
    282.   int m_step;
    283. public:
    284.   void starttimer(int time,controlled_module_ex* parent)
    285.   {
    286.     this->safestart();
    287.     this->postmessage(BM_RING_SETPARENT,parent);
    288.     this->postmessage(BM_RING_SETTIME,time);
    289.   }
    290.   void stoptimer()
    291.   {
    292.     this->safestop();
    293.   }
    294. public:
    295.   virtual void on_safestop()
    296.   {
    297.     m_time = 0;
    298.     controlled_module_ex::on_safestop();
    299.   }
    300.   virtual void message(const _command & cmd)
    301.   {
    302.     controlled_module_ex::message(cmd);
    303.     if(cmd.nCmd==BM_RING_SETTIME)
    304.     {
    305.         int time = boost::any_cast<int>(cmd.anyParam);
    306.         this->m_time = time/this->m_sleeptime;
    307.         this->postmessage(BM_RING_CYCLE);
    308.     }
    309.     else if(cmd.nCmd==BM_RING_SETPARENT)
    310.     {
    311.         this->m_parent  = boost::any_cast<controlled_module_ex*>(cmd.anyParam);
    312.     }   
    313.     else if(cmd.nCmd==BM_RING_CYCLE)
    314.     {
    315.         if(m_time>0)
    316.         {
    317.             if(m_step>m_time)
    318.             {
    319.                 m_parent->postmessage(BM_TIMER,this);
    320.                 m_step=0;
    321.             }
    322.             m_step++;
    323.         }
    324.         this->postmessage(BM_RING_CYCLE);
    325.     } 
    326.   }
    327. };
    1.向线程PostMessage
      函数controlled_module_ex::postmessage完成消息推送。
      虚拟函数controlled_module_ex::message(const _command & cmd)实现消息接收。
      
    1. #include "controlled_module_ex.hpp"
    2. class thdex: public controlled_module_ex
    3. {
    4. protected:
    5.     virtual void message(const _command & cmd)
    6.     {
    7.         controlled_module_ex::message(cmd);
    8.         if(cmd.nCmd==BM_USER+1)
    9.         {
    10.             cout << "get message" << endl;
    11.         }
    12.     }
    13. };
    14. int _tmain(int argc, _TCHAR* argv[])
    15. {
    16.     thdex t;
    17.     t.safestart();
    18.     t.postmessage(BM_USER+1);
    19.     char buf[10];
    20.     gets_s(buf,sizeof buf);
    21.     t.safestop();
    22.     return 0;
    23. }
      2.向线程PostMessage,并携带简单对象参数
     我们都知道常规的PostMessage要传参,如果是整型参数,还可以用强制转换,但如果是其他类型,例如字符串,我们就必须创建一个字符串缓冲,把缓冲指针作为参数传过去,线程还不能忘记删除,否则导致内存泄漏,自定义结构也是一样的操作,如果想尝试传递一个CString对象,是不可能的。
      幸运的是boost提供了boost::any来抽象任何对象类型,controlled_module_ex的消息传递都是基于boost::any来完成,程序员可以由此写出干净而且内存安全的代码。
    1. #include "controlled_module_ex.hpp"
    2. struct mystruct
    3. {
    4.     string a;
    5.     int b;
    6. };
    7. class thdex: public controlled_module_ex
    8. {
    9. protected:
    10.     virtual void message(const _command & cmd)
    11.     {
    12.         controlled_module_ex::message(cmd);
    13.         if(cmd.nCmd==BM_USER+1)
    14.         {
    15.             cout << "get integer:" << boost::any_cast<int>(cmd.anyParam) << endl;
    16.         }
    17.         if(cmd.nCmd==BM_USER+2)
    18.         {
    19.             cout << "get string:" << boost::any_cast<string>(cmd.anyParam) << endl;
    20.         }
    21.         if(cmd.nCmd==BM_USER+3)
    22.         {
    23.             mystruct par = boost::any_cast<mystruct>(cmd.anyParam);
    24.             cout << "get mystruct:" << par.a << "," << par.b << endl;
    25.         }
    26.     }
    27. };
    28. int _tmain(int argc, _TCHAR* argv[])
    29. {
    30.     thdex t;
    31.     t.safestart();
    32.     t.postmessage(BM_USER+1,123);
    33.     t.postmessage(BM_USER+2,string("hello world"));
    34.     mystruct par;
    35.     par.a = "hello world";
    36.     par.b = 123;
    37.     t.postmessage(BM_USER+3,par);
    38.     char buf[10];
    39.     gets_s(buf,sizeof buf);
    40.     t.safestop();
    41.     return 0;
    42. }
    3.向线程PostMessage,并传递内存块参数
      假如我们书写了一个录音子线程,如何将录制的语音数据传递给其他线程呢,常规做法是创建一个缓冲,将语音数据填充进去,然后将缓冲地址作为参数传递,这种做法要求接收线程不能忘记删除,否则会导致内存泄漏。
      幸运的是boost提供了智能指针,可以类似java,c#的智能内存回收一样来管理内存分配,我们如果使用这个对象来作为参数传递,就可以完美的防范内存泄漏行为,就算子线程没有处理,别担心,内存它会自动回收的。
    1. #include "controlled_module_ex.hpp"
    2. struct mystruct
    3. {
    4.     boost::shared_ptr<char> data;
    5.     int datalen;
    6. };
    7. class thdex: public controlled_module_ex
    8. {
    9. protected:
    10.     virtual void message(const _command & cmd)
    11.     {
    12.         controlled_module_ex::message(cmd);
    13.         if(cmd.nCmd==BM_USER+1)
    14.         {
    15.             cout << "get sharedptr" << endl; //仅仅得到数据,得不到数据长度
    16.         }
    17.         if(cmd.nCmd==BM_USER+2)
    18.         {
    19.             mystruct par = boost::any_cast<mystruct>(cmd.anyParam);
    20.             cout << "get sharedptr len:" << par.datalen << endl;
    21.         }
    22.     }
    23. };
    24. int _tmain(int argc, _TCHAR* argv[])
    25. {
    26.     thdex t;
    27.     t.safestart();
    28.     t.postmessage(BM_USER+1,boost::shared_ptr<char>(new char[1000]));
    29.     mystruct par;
    30.     par.datalen = 1000;
    31.     par.data = boost::shared_ptr<char>(new char[par.datalen]);
    32.     t.postmessage(BM_USER+2,par);
    33.     char buf[10];
    34.     gets_s(buf,sizeof buf);
    35.     t.safestop();
    36.     return 0;
    37. }
    3.向线程SendMessage
      函数controlled_module_ex::execute完成这个工作
      虚拟函数controlled_module_ex::on_command(const unsigned int command,const boost::any par)响应消息处理
     
    1. #include "controlled_module_ex.hpp"
    2. class thdex: public controlled_module_ex
    3. {
    4. protected:
    5.     boost::any on_command(const unsigned int command,const boost::any par)
    6.     {
    7.         if(command==1)
    8.         {
    9.             cout << "on command" << endl;
    10.             return 0;
    11.         }
    12.         if(command==2)
    13.         {
    14.             cout << "on command,par:" << boost::any_cast<string>(par) << endl;
    15.             return 0;
    16.         }
    17.         if(command==3)
    18.         {
    19.             return true;
    20.         }
    21.         else
    22.             return controlled_module_ex::on_command(command,par);
    23.     }
    24. };
    25. int _tmain(int argc, _TCHAR* argv[])
    26. {
    27.     thdex t;
    28.     t.safestart();
    29.     t.execute(1,0);//等待子线程处理完成
    30.     t.execute(2,string("hello world"));//带参数 等待子线程完成
    31.     bool rs = boost::any_cast<bool>(t.execute(3,0));//等待子线程处理完成,并取得返回值
    32.     cout << "get thread result:" << rs << endl;
    33.     boost::any timeout = t.execute(4,0,1000);//等待子线程处理,超时1秒
    34.     if(timeout.empty())
    35.         cout << "timeout " << endl;
    36.     char buf[10];
    37.     gets_s(buf,sizeof buf);
    38.     t.safestop();
    39.     return 0;
    40. }
    4.定时器
      类似于CWnd::OnTimer,controlled_module_ex也提供一个虚拟函数virtual void on_timer(const controlled_timer * p);来处理定时
    1. #include "controlled_module_ex.hpp"
    2. class thdex: public controlled_module_ex
    3. {
    4. protected:
    5.     virtual void on_timer(const controlled_timer *p)
    6.     {
    7.         cout << "ontimer" << endl;
    8.     }
    9. };
    10. int _tmain(int argc, _TCHAR* argv[])
    11. {
    12.     thdex t;
    13.     controlled_timer timer;
    14.     t.safestart();
    15.     timer.starttimer(1000,&t);
    16.     char buf[10];
    17.     gets_s(buf,sizeof buf);
    18.     t.safestop();
    19.     return 0;
    20. }
  • 相关阅读:
    mongo批量删除js示例
    Mongo压测介绍
    线性代数学习笔记(1)--二维向量点积本质(仅供自己理解)
    墨菲定律的正确解读
    spring cloud 2020.0.1踩坑记录-bootstrap不生效等
    记一次网络质量原因导致接口调用超时的调查过程
    .NET Debugging Demos Lab 2: Crash
    .NET Debugging Demos Lab 3: Memory
    Hang caused by GC
    .NET Debugging Demos Lab 4: High CPU hang
  • 原文地址:https://www.cnblogs.com/lvdongjie/p/4447781.html
Copyright © 2011-2022 走看看