zoukankan      html  css  js  c++  java
  • C++实现管程与同步队列

    一、管程

    1.1 简介

      管程可以视为一个线程安全的数据结构,其内部提供了互斥与同步操作,向外提供访问共享数据的专用接口(接口被称为管程的过程),通过管程提供的接口即可达成共享数据的保护与线程间同步。

      使用管程,可以简化线程间互斥、同步的编码复杂度(否则需自己控制互斥、同步机制,并保证正确),可以集中分散的互斥、同步操作代码,更容易验证、查错,也更可读(反之,信号量的PV操作可能分散到各个地方,验证、阅读相对麻烦)。

    1.2 管程的特点

    • 共享数据仅能被管程的过程访问
    • 线程通过调用管程的过程进入管程
    • 任何时候仅能有一个线程在管程中执行,其他阻塞直到可用
    • 管程内共享数据不可用时
      • 需要共享数据的线程将阻塞并释放管程
      • 其他线程可进入管程构造数据可用条件,并通知阻塞线程
    • 管程内共享数据可用时, 被阻塞线程将在合适时间重新进入管程

    1.3 管程分类

    管程中仅能有一个线程在其中执行,根据发起通知时,被唤醒线程(T1)执行,还是唤醒线程(T2)继续执行,可将管程分为三种:

    • Mesa管程(Lampson/Redell):T2继续执行直到退出,T1进入入口队列,后面与其他线程公平竞争(由于不是立刻执行,所以当T1执行时,条件可能已经不满足,因此需循环检测条件是否仍旧达成)。
    • Hoare管程:T2立即阻塞并释放管程,T1马上执行,退出后T2恢复执行(多两次线程切换)。
    • Brinch Hanson管程:T2的通知操作仅允许在退出时发送通知,通知发送后T2结束,T1开始执行。

    1.4 管程的实现

    根据前述管程特点,管程应该是一个对象,内部封装了资源,该对象实现了互斥及与阻塞、唤醒机制
    使用锁,搭配条件变量来实现互斥及与阻塞、唤醒机制
    下面简单介绍条件变量,后面的管程代码有一个小优化,涉及对条件变量实现的理解

    1.4.1 条件变量

      条件变量是同步原语,其内部有一个队列,用于存放被wait阻塞的线程,当另一个线程发起通知时,如果队列不为空,队头线程将被唤醒,否则,什么也不做。条件变量也可以一次性唤醒队列中的全部阻塞线程

    1.4.1.1 条件变量的一种实现

       条件变量里有一个存储阻塞线程的队列。由于阻塞线程和唤醒通知线程都需要访问这同一队列,所以还有一个用于保护队列的,锁粒度不大,并且不需要线程切换,应为自旋锁。

       wait函数会将当前线程入队,并原子的进行锁释放与当前线程阻塞,阻塞直到另一线程通知才解除,解除后重新获取锁。锁释放与当前线程阻塞必须是原子的,否则,别的线程发出的唤醒通知,可能发生在当前线程阻塞之前,这会造成唤醒丢失。

    signal函数出队一个阻塞线程并唤醒他,当队列为空时,不做任何事情。

     broadcast函数将逐个唤醒队列中的所有阻塞线程。

    1.4.2 管程实现代码

     下面实现了一个管程,同时符合mesa与hanson管程的定义,并且做了一些优化,取消掉了不必要的通知操作。

     1 #ifndef __MONITOR_H_
     2 #define __MONITOR_H_
     3 
     4 #include <list>
     5 #include <mutex>
     6 #include <utility>
     7 #include <condition_variable>
     8 
     9 template<typename T>
    10 class Monitor{
    11 public:
    12     Monitor(): m_iMaxCount(100), m_bStop(false){
    13     }
    14     Monitor(int iMaxCount) : m_iMaxCount(iMaxCount), m_bStop(false){
    15     }
    16     ~Monitor() = default;
    17 
    18     void Enqueue(const T& data){
    19         Append(data);
    20     }
    21 
    22     void Enqueue(T&& data){
    23         Append(std::forward<T>(data));    //转发data的原属性,此处转发data的右值引用
    24     }
    25 
    26     void Dequeue(T& data){
    27         std::unique_lock<std::mutex> lk(m_mMutex);
    28         m_cvNotEmpty.wait(lk, [this](){return m_bStop || !IsEmpty(); });
    29         if(m_bStop){
    30             return;
    31         }
    32         bool bNeedNotify = IsFull();
    33         data = m_listData.front();
    34         m_listData.pop_front();
    35         lk.unlock();
    36         if(bNeedNotify){
    37             m_cvNotFull.notify_one();
    38         }
    39     }
    40 
    41     void Stop(){
    42         {
    43             std::lock_guard<std::mutex> lk(m_mMutex);
    44             m_bStop = true;
    45         }
    46         m_cvNotEmpty.notify_all();
    47         m_cvNotFull.notify_all();
    48     }
    49 
    50 private:
    51     template<typename U>
    52     void Append(U&& data){                                //实现通用引用
    53         std::unique_lock<std::mutex> lk(m_mMutex);
    54         m_cvNotFull.wait(lk, [this](){return m_bStop || !IsFull(); });
    55         if(m_bStop){
    56             return;
    57         }
    58         bool bNeedNotify = IsEmpty();
    59         m_listData.emplace_back(std::forward<U>(data));    //再次转发
    60         lk.unlock();
    61         if(bNeedNotify){
    62             m_cvNotEmpty.notify_one();
    63         }
    64     }
    65 
    66     bool IsFull(){
    67         return static_cast<int>(m_listData.size()) == m_iMaxCount;
    68     }
    69 
    70     bool IsEmpty(){
    71         return 0 == static_cast<int>(m_listData.size());
    72     }
    73 
    74     Monitor(const Monitor& rhs) = delete;
    75     Monitor(Monitor&& rhs) = delete;
    76     Monitor& operator=(const Monitor& rhs) = delete;
    77     Monitor& operator=(Monitor&& rhs) = delete;
    78 
    79 private:
    80     int m_iMaxCount;
    81     bool m_bStop;
    82     std::mutex m_mMutex;
    83     std::list<T> m_listData;
    84     std::condition_variable m_cvNotEmpty;
    85     std::condition_variable m_cvNotFull;
    86 };
    87 
    88 #endif //!__MONITOR_H_
    View Code
    • 代码中条件变量使用了带可调用对象作为参数的wait,它的实现是这样的
      1 template<typename _Predicate>
      2   void
      3   wait(unique_lock<mutex>& __lock, _Predicate __p)
      4   {
      5     while (!__p())
      6         wait(__lock);
      7   }

    可调用对象返回true则什么都不做,否则,重复wait到满足条件,这可以避免虚假唤醒,同时也是Mesa管程的行为

    • 进行队列不满、不空通知前,检查队列原状态 ,通过状态判断是否进行notify
      • 在由满到不满,空到不空这两种状态变化下才进行通知
      • 根据上面的条件变量的signal函数实现可知,通知时会使用到,虽然锁粒度很小,并且是自旋锁,但是也存在较大的性能消耗,通过一个简单的判断,避免了通知,也就避免了锁
    • 模板方法Append实现了通用引用,使得Append同时支持引用左值与右值,配合std::forward可将参数原属性(左值引用或右值引用)转发给另一个函数当参数
      • 实现通用引用必须含有&&,并且需要发生类型推导,所以Append必须是不同的模板参数U,而不能和类模板Monitor一样为T(T的类型在模板实例化时固定,将不能发生类型推导)
      • 一切变量都是左值(变量本身分配有内存空间,可以对其取地址),所以在调用实现了通用引用函数时,应该std::forward一下,否则通用引用绑定的是变量本身(左值),即得到左值引用
      • std::forward靠引用折叠实现对原属性的转发:含左值引用折叠后为左值引用,否则为右值引用
     1 template<typename _Tp>
     2     constexpr _Tp&&
     3     forward(typename std::remove_reference<_Tp>::type& __t) noexcept
     4     { return static_cast<_Tp&&>(__t); }
     5 
     6 template<typename _Tp>
     7     constexpr _Tp&&
     8     forward(typename std::remove_reference<_Tp>::type&& __t) noexcept
     9     {
    10     static_assert(!std::is_lvalue_reference<_Tp>::value, "template argument"
    11     " substituting _Tp is an lvalue reference type");
    12     return static_cast<_Tp&&>(__t);
    13     }
    • 参数_Tp若为type &(左值引用)

     则static_cast<_Tp&&>为static_cast<type& &&>,折叠后为static_cast<type&>, 所以转发返回值仍为为左值引用。

    • 参数_Tp若为type &&(右值引用)

    static_cast<_Tp&&>为static_cast<type&& &&>,折叠后为static_cast<type&&>, 所以转发返回仍为右值引用 。

    • Enqueue最好不要通用引用方式实现,除非调用者明确知道使用std::forward(不用的代价为丢失右值语义)

     二、同步队列

    2.1 简介

    管程其实也是一种同步队列,现在

    • 在管程的基础上添加Try类操作,Try类操作操作时,若条件无法满足,则立刻返回false,不阻塞
    • 添加获取元素个数的方法

    得到同步队列如下

    2.2 代码

      1 #ifndef __SyncQueue_H_
      2 #define __SyncQueue_H_
      3 
      4 #include <list>
      5 #include <mutex>
      6 #include <utility>
      7 #include <condition_variable>
      8 
      9 template<typename T>
     10 class SyncQueue{
     11 public:
     12     SyncQueue() : m_iMaxCount(100), m_bStop(false){
     13     }
     14     SyncQueue(int iMaxCount) : m_iMaxCount(iMaxCount), m_bStop(false){
     15     }
     16     ~SyncQueue() = default;
     17 
     18     void Enqueue(const T& data){
     19         Append(data);
     20     }
     21 
     22     void Enqueue(T&& data){
     23         Append(std::forward<T>(data));    //转发data的原属性,此处转发data的右值引用
     24     }
     25 
     26     bool TryEnqueue(const T& data){
     27         return TryAppend(data);
     28     }
     29 
     30     bool TryEnqueue(T&& data){
     31         return TryAppend(std::forward<T>(data));
     32     }
     33 
     34     void Dequeue(T& data){
     35         std::unique_lock<std::mutex> lk(m_mMutex);
     36         m_cvNotEmpty.wait(lk, [this](){return m_bStop || !IsEmpty(); });
     37         if(m_bStop){
     38             return;
     39         }
     40         bool bNeedNotify = IsFull();
     41         data = m_listData.front();
     42         m_listData.pop_front();
     43         lk.unlock();
     44         if(bNeedNotify){
     45             m_cvNotFull.notify_one();
     46         }
     47     }
     48 
     49     bool TryDequeue(T& data){
     50         std::unique_lock<std::mutex> lk(m_mMutex);
     51         if(m_bStop || IsEmpty()){
     52             return false;
     53         }
     54         bool bNeedNotify = IsFull();
     55         data = m_listData.front();
     56         m_listData.pop_front();
     57         lk.unlock();
     58         if(bNeedNotify){
     59             m_cvNotFull.notify_one();
     60         }
     61         return true;
     62     }
     63 
     64     void Stop(){
     65         {
     66             std::lock_guard<std::mutex> lk(m_mMutex);
     67             m_bStop = true;
     68         }
     69         m_cvNotEmpty.notify_all();
     70         m_cvNotFull.notify_all();
     71     }
     72 
     73     int Size(){
     74         std::lock_guard<std::mutex> lk(m_mMutex);
     75         return static_cast<int>(m_listData.size());
     76     }
     77 
     78 private:
     79     template<typename U>
     80     bool TryAppend(U&& data){                                //实现通用引用
     81         std::unique_lock<std::mutex> lk(m_mMutex);
     82         if(m_bStop || IsFull()){
     83             return false;
     84         }
     85         bool bNeedNotify = IsEmpty();
     86         m_listData.emplace_back(std::forward<U>(data));    //再次转发
     87         lk.unlock();
     88         if(bNeedNotify){
     89             m_cvNotEmpty.notify_one();
     90         }
     91         return true;
     92     }
     93 
     94     template<typename U>
     95     void Append(U&& data){                                //实现通用引用
     96         std::unique_lock<std::mutex> lk(m_mMutex);
     97         m_cvNotFull.wait(lk, [this](){return m_bStop || !IsFull(); });
     98         if(m_bStop){
     99             return;
    100         }
    101         bool bNeedNotify = IsEmpty();
    102         m_listData.emplace_back(std::forward<U>(data));    //再次转发
    103         lk.unlock();
    104         if(bNeedNotify){
    105             m_cvNotEmpty.notify_one();
    106         }
    107     }
    108 
    109     bool IsFull(){
    110         return static_cast<int>(m_listData.size()) == m_iMaxCount;
    111     }
    112 
    113     bool IsEmpty(){
    114         return 0 == static_cast<int>(m_listData.size());
    115     }
    116 
    117     SyncQueue(const SyncQueue& rhs) = delete;
    118     SyncQueue(SyncQueue&& rhs) = delete;
    119     SyncQueue& operator=(const SyncQueue& rhs) = delete;
    120     SyncQueue& operator=(SyncQueue&& rhs) = delete;
    121 
    122 private:
    123     int m_iMaxCount;
    124     bool m_bStop;
    125     std::mutex m_mMutex;
    126     std::list<T> m_listData;
    127     std::condition_variable m_cvNotEmpty;
    128     std::condition_variable m_cvNotFull;
    129 };
    130 
    131 #endif //!__SyncQueue_H_
    View Code

    三、转载于

    https://www.cnblogs.com/Keeping-Fit/p/15064039.html#wiz-toc-6-203609834

    本文来自博客园,作者:Mr-xxx,转载请注明原文链接:https://www.cnblogs.com/MrLiuZF/p/15142816.html

  • 相关阅读:
    WPF Attached event
    WPF Progressbar
    IDisposable
    CommandTarget属性
    观察者模式
    DesignerSerializationVisibility, Browsable,Category Attribute
    CVS使用手册
    Javascript原型的简单理解
    由插件独特的处理器产生页面
    教训
  • 原文地址:https://www.cnblogs.com/MrLiuZF/p/15142816.html
Copyright © 2011-2022 走看看