zoukankan      html  css  js  c++  java
  • 分析Boost对 互斥量和条件变量的封装及实现生产者消费者问题

    Boost的互斥量,条件变量做了很好的封装,因此比“原生的”POSIX mutex,condition variables好用。然后我们会通过分析boost相关源码看一下boost linux是如何对pthread_mutex_t和pthread_cond_t进行的封装。

    首先看一下condition_variable_any的具体实现,代码路径:/boost/thread/pthread/condition_variable.hpp

     

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
    1. class condition_variable_any  
    2. {  
    3.     pthread_mutex_t internal_mutex;  
    4.     pthread_cond_t cond;  
    5.   
    6.     condition_variable_any(condition_variable_any&);  
    7.     condition_variable_any& operator=(condition_variable_any&);  
    8.   
    9. public:  
    10.     condition_variable_any()  
    11.     {  
    12.         int const res=pthread_mutex_init(&internal_mutex,NULL);  
    13.         if(res)  
    14.         {  
    15.             boost::throw_exception(thread_resource_error());  
    16.         }  
    17.         int const res2=pthread_cond_init(&cond,NULL);  
    18.         if(res2)  
    19.         {  
    20.             BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));  
    21.             boost::throw_exception(thread_resource_error());  
    22.         }  
    23.     }  
    24.     ~condition_variable_any()  
    25.     {  
    26.         BOOST_VERIFY(!pthread_mutex_destroy(&internal_mutex));  
    27.         BOOST_VERIFY(!pthread_cond_destroy(&cond));  
    28.     }  
    29.    

    condition_variable_any的构造函数是对于内部使用的mutex和cond的初始化,对应的,析构函数则是这些资源的回收。

     

    BOOST_VERIFY的实现:

     

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
    1. #undef BOOST_VERIFY    
    2. #if defined(BOOST_DISABLE_ASSERTS) || ( !defined(BOOST_ENABLE_ASSERT_HANDLER) && defined(NDEBUG) )    
    3. // 在任何情况下,expr一定会被求值。    
    4. #define BOOST_VERIFY(expr) ((void)(expr))    
    5. #else    
    6. #define BOOST_VERIFY(expr) BOOST_ASSERT(expr)    
    7. #endif    

    因此不同于assert在Release版的被优化掉不同,我们可以放心的使用BOOST_VERITY,因此它的表达式肯定会被求值,而不用担心assert的side effect。
    接下来看一下condition_variable_any的核心实现:wait

     

     

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
    1. template<typename lock_type>  
    2.  void wait(lock_type& m)  
    3.  {  
    4.      int res=0;  
    5.      {  
    6.          thread_cv_detail::lock_on_exit<lock_type> guard;  
    7.          detail::interruption_checker check_for_interruption(&internal_mutex,&cond);  
    8.          guard.activate(m);  
    9.          res=pthread_cond_wait(&cond,&internal_mutex);  
    10.          this_thread::interruption_point();  
    11.      }  
    12.      if(res)  
    13.      {  
    14.          boost::throw_exception(condition_error());  
    15.      }  
    16.  }  

    首先看一下lock_on_exit:

     

     

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
    1. namespace thread_cv_detail  
    2. {  
    3.     template<typename MutexType>  
    4.     struct lock_on_exit  
    5.     {  
    6.         MutexType* m;  
    7.   
    8.         lock_on_exit():  
    9.             m(0)  
    10.         {}  
    11.   
    12.         void activate(MutexType& m_)  
    13.         {  
    14.             m_.unlock();  
    15.             m=&m_;  
    16.         }  
    17.         ~lock_on_exit()  
    18.         {  
    19.             if(m)  
    20.             {  
    21.                 m->lock();  
    22.             }  
    23.        }  
    24.     };  
    25. }  

    代码很简单,实现了在调用activate时将传入的lock解锁,在该变量生命期结束时将guard的lock加锁。

     

    接下来的detail::interruption_checker check_for_interruption(&internal_mutex,&cond);是什么意思呢?From /boost/thread/pthread/thread_data.hpp

     

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
    1. class interruption_checker  
    2. {  
    3.     thread_data_base* const thread_info;  
    4.     pthread_mutex_t* m;  
    5.     bool set;  
    6.   
    7.     void check_for_interruption()  
    8.     {  
    9.         if(thread_info->interrupt_requested)  
    10.         {  
    11.             thread_info->interrupt_requested=false;  
    12.             throw thread_interrupted();  
    13.         }  
    14.     }  
    15.   
    16.     void operator=(interruption_checker&);  
    17. public:  
    18.     explicit interruption_checker(pthread_mutex_t* cond_mutex,pthread_cond_t* cond):  
    19.         thread_info(detail::get_current_thread_data()),m(cond_mutex),  
    20.         set(thread_info && thread_info->interrupt_enabled)  
    21.     {  
    22.         if(set)  
    23.         {  
    24.             lock_guard<mutex> guard(thread_info->data_mutex);  
    25.             check_for_interruption();  
    26.             thread_info->cond_mutex=cond_mutex;  
    27.             thread_info->current_cond=cond;  
    28.             BOOST_VERIFY(!pthread_mutex_lock(m));  
    29.         }  
    30.         else  
    31.         {  
    32.             BOOST_VERIFY(!pthread_mutex_lock(m));  
    33.         }  
    34.     }  
    35.     ~interruption_checker()  
    36.     {  
    37.         if(set)  
    38.         {  
    39.             BOOST_VERIFY(!pthread_mutex_unlock(m));  
    40.             lock_guard<mutex> guard(thread_info->data_mutex);  
    41.             thread_info->cond_mutex=NULL;  
    42.             thread_info->current_cond=NULL;  
    43.         }  
    44.         else  
    45.         {  
    46.             BOOST_VERIFY(!pthread_mutex_unlock(m));  
    47.         }  
    48.     }  

    代码面前,毫无隐藏。那句话就是此时如果有interrupt,那么就interrupt吧。否则,lock传入的mutex,也是为了res=pthread_cond_wait(&cond,&internal_mutex);做准备。

     

    关于线程的中断点,可以移步《【Boost】boost库中thread多线程详解5——谈谈线程中断》。

    对于boost::mutex,大家可以使用同样的方法去解读boost的实现,相对于condition variable,mutex的实现更加直观。代码路径:/boost/thread/pthread/mutex.hpp。

    气味

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
    1. namespace boost  
    2. {  
    3.     class mutex  
    4.     {  
    5.     private:  
    6.         mutex(mutex const&);  
    7.         mutex& operator=(mutex const&);  
    8.         pthread_mutex_t m;  
    9.     public:  
    10.         mutex()  
    11.         {  
    12.             int const res=pthread_mutex_init(&m,NULL);  
    13.             if(res)  
    14.             {  
    15.                 boost::throw_exception(thread_resource_error());  
    16.             }  
    17.         }  
    18.         ~mutex()  
    19.         {  
    20.             BOOST_VERIFY(!pthread_mutex_destroy(&m));  
    21.         }  
    22.   
    23.         void lock()  
    24.         {  
    25.             int const res=pthread_mutex_lock(&m);  
    26.             if(res)  
    27.             {  
    28.                 boost::throw_exception(lock_error(res));  
    29.             }  
    30.         }  
    31.   
    32.         void unlock()  
    33.         {  
    34.             BOOST_VERIFY(!pthread_mutex_unlock(&m));  
    35.         }  
    36.   
    37.         bool try_lock()  
    38.         {  
    39.             int const res=pthread_mutex_trylock(&m);  
    40.             if(res && (res!=EBUSY))  
    41.             {  
    42.                 boost::throw_exception(lock_error(res));  
    43.             }  
    44.   
    45.             return !res;  
    46.         }  
    47.   
    48.         typedef pthread_mutex_t* native_handle_type;  
    49.         native_handle_type native_handle()  
    50.         {  
    51.             return &m;  
    52.         }  
    53.   
    54.         typedef unique_lock<mutex> scoped_lock;  
    55.         typedef detail::try_lock_wrapper<mutex> scoped_try_lock;  
    56.     };   
    57. }  

     

    boost对于pthread_mutex_t和pthread_cond_t的封装,方便了开发者的使用的资源的安全有效管理。当然,在不同的公司,可能也都有类似的封装,学习boost的源码,无疑可以加深我们的理解。在某些特定的场合,我们也可以学习boost的封装方法,简化我们的日常开发。

    最后,奉上简单的生产者、消费者的boost的实现,和前文《并发编程实战: POSIX 使用互斥量和条件变量实现生产者/消费者问题》相比,我们可以看到boost简化了mutex和condition variable的使用。以下代码引自《Boost程序库完全开发指南》:

     

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
    1. #include <boost/thread.hpp>  
    2. #include <stack>  
    3. using std::stack;  
    4. using std::cout;  
    5. class buffer  
    6. {  
    7. private:  
    8.     boost::mutex mu; // 条件变量需要配合互斥量  
    9.     boost::condition_variable_any cond_put; // 生产者写入  
    10.     boost::condition_variable_any cond_get; // 消费者读走  
    11.   
    12.     stack<int> stk;  
    13.     int un_read;  
    14.     int capacity;  
    15.   
    16.     bool is_full()  
    17.     {  
    18.         return un_read == capacity;  
    19.     }  
    20.     bool is_empty()  
    21.     {  
    22.         return 0 == un_read;  
    23.     }  
    24.   
    25. public:  
    26.     buffer(size_t capacity) : un_read(0), capacity(capacity)  
    27.     {}  
    28.     void put(int x)  
    29.     {  
    30.   
    31.         boost::mutex::scoped_lock lock(mu); // 这里是读锁的门闩类  
    32.   
    33.         while (is_full())  
    34.         {  
    35.             cout << "full waiting..." << endl;  
    36.             cond_put.wait(mu); // line:51  
    37.         }  
    38.         stk.push(x);  
    39.         ++un_read;  
    40.   
    41.         cond_get.notify_one();  
    42.     }  
    43.     void get(int *x)  
    44.     {  
    45.         boost::mutex::scoped_lock lock(mu); // 这里是读锁的门闩类  
    46.   
    47.         while (is_empty())  
    48.         {  
    49.             cout << "empty waiting..." << endl;  
    50.             cond_get.wait(mu);  
    51.         }  
    52.         *x = stk.top();  
    53.         stk.pop();  
    54.         --un_read;  
    55.   
    56.         cond_put.notify_one(); // 通知 51line可以写入了  
    57.     }  
    58. };  
    59.   
    60. buffer buf(5);   
    61.   
    62. void producer(int n)  
    63. {  
    64.     for (int i = 0; i < n; ++i)  
    65.     {  
    66.         cout << "put : " << i << endl;  
    67.         buf.put(i);  
    68.     }  
    69. }  
    70.   
    71. void consumer(int n)  
    72. {  
    73.     int x;  
    74.     for (int i = 0; i < n; ++i)  
    75.     {  
    76.         buf.get(&x);  
    77.         cout << "get : " << x << endl;  
    78.     }  
    79. }  
    80.   
    81. int main()  
    82. {  
    83.     boost::thread t1(producer, 20);  
    84.     boost::thread t2(consumer, 10);  
    85.     boost::thread t3(consumer, 10);  
    86.   
    87.     t1.join();  
    88.     t2.join();  
    89.     t3.join();  
    90.   
    91.     return 0;  
    92. }  

    最后说一句,condition_variable_any == condition, from /boost/thread/condition.hpp

     

     

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. namespace boost  
    2. {  
    3.     typedef condition_variable_any condition;  
    4. }  

  • 相关阅读:
    微软新一代Surface,该怎么看?
    Windows 8创新之路——样章分享
    微软新一代Surface发布,参数曝光
    从MS Word到Windows Live Writer
    《计算机科学基础》学习笔记_Part 1 Computer and Data
    我看Windows 8.1
    Hyper-V初涉_早期Windows安装虚拟硬件驱动
    2020.09.05【省选组】模拟 总结
    2020.08.15【NOIP提高组】模拟 总结
    2020.08.14【省选B组】模拟 总结
  • 原文地址:https://www.cnblogs.com/ghostll/p/3546077.html
Copyright © 2011-2022 走看看