zoukankan      html  css  js  c++  java
  • 练习:自己动手实现一个轻量级的信号量(一)

    信号量历史悠久,折磨死了一代又一代的计算机专业学生,但是不得不承认其在Multi-thread环境下的巨大作用。最经典的案例莫过于管理一个环状缓冲区。.NET 中的Semaphore对象等同于Win32中的Semaphore。属于内核级对象,因此使用它的代价就比较大了。并且Semaphore对象每次仅仅能够等待一个Count,这有的时候让事情变得有些烦,例如你可能不得不将环状缓冲区分割为一个个的Chunk(实际上这是一个好方法,因为我们应该对于Cache进行优化)。Qt中的信号量可以一次获得多个Count,感觉很方便。综上,我们希望自己动手实现一个轻量级的,支持一次获得多个资源的信号量。


    首先看看信号量重要维护哪些信息。第一,当前还有多少剩余的资源(_currentResources);第二,系统初始化的时候有多少资源(_initResources);第三,当前有多少个线程被阻塞(_waitingThreadCount):

    1 public class SemaphoreExtended
    2 {
    3   private int _currentResources;
    4   private int _initResources;
    5   private volatile int _waitingThreadCount;
    6 }

    我们知道,如果信号量持有的资源数目没完没了的减少,那么降到0就会阻塞,但是没完没了的增加呢?厄~你说会溢出,所以,我们还需要进行一个上限的检查。于是我们添加了一个上限值,这个上限值应该由用户指定,并且一旦指定就不能更改了。于是我们的代码中添加了一个新的Field。

    1 public class SemaphoreExtended
    2 {
    3   private int _currentResources;
    4   private int _initResources;
    5   private volatile int _waitingThreadCount;
    6   // 资源数目的上限
    7   private readonly int _maxResourceCount;
    8 }

    好的,你发现了,我们的需要管理的Fields不止一个,那么~~对了,为了保证某些操作的一致性,我们肯定需要一把锁。

     

    1 public class SemaphoreExtended
    2 {
    3   private int _currentResources;
    4   private int _initResources;
    5   private volatile int _waitingThreadCount;
    6   private readonly int _maxResourceCount;
    7   // 信号量对象的锁
    8   private object _globalLock;
    9 }

    好了,下面我们可以开始我们的工作了。下一步我们干什么呢?厄~咱们解决掉关键问题之后剩下的工作肯定轻松愉快,因此,我们先考虑Wait和Release吧。

    首先解决Release的问题,想想Release要做哪些事情。

    (1)看看当前的资源数目加上释放的数目会不会超过上限

    (2)如果不超过上限就增加我们的资源数目

    (3)如果发现有的线程在等待资源则激活那些线程,让他们看看现有的资源是不是够他们使用了

    (4)将之前的资源数目返回给用户。还有关键的一点,这些操作必须保证是Atomic的,于是这些操作都需要lock!

    写代码了~~~

     1 public int Release(int releaseResCount)
     2 {
     3   // 参数需要检查一下
     4   if (releaseResCount < 1)
     5   throw new ArgumentOutOfRangeException("releaseCount");
     6   // 这个函数需要保证其结果的一致性
     7   lock (this._globalLock)
     8   {
     9     // 已经超过了上限了
    10     if (this._maxResourceCount - this._currentResources < releaseResCount)
    11     {
    12       throw new System.Threading.SemaphoreFullException();
    13     }
    14     // 很好,现在我们在做第(2)步
    15     this._currentResources += releaseResCount;
    16     // 我们现在可以做第(3)步了
    17     // 为了性能考虑,我们应当分两种情况进行讨论。
    18     // 当然了,如果没有线程等待就不管了~
    19     // 什么?万一你在判断的时候等待的线程增加了怎么办?有锁呢,不可能~
    20     if (this._waitingThreadCount == 1)
    21     {
    22       System.Threading.Monitor.Pulse(this._globalLock);
    23     }
    24     else if (this._waitingThreadCount > 1)
    25     {
    26       System.Threading.Monitor.PulseAll(this._globalLock);
    27     }
    28     // 这就是第(4)步
    29     return (this._currentResources - releaseResCount);
    30   }
    31 }

    以上的代码正确么?答案是错误的!错误在了第(3)步,最终导致第4步结果可能是错误的。

    问题出在了Pulse上。如果你发现看MSDN上的解释有些头晕,那么我们就用伪代码解释一下。

    Monitor.Pulse(...)

    等价于

    if(_globalLock.Owner == Thread.CurrentThread)
    {
      Monitor.Exit(_globalLock);
    }
    else
    {
      throw new ...;
    }
    InternalPulse(...);
    Monitor.Enter(_globalLock);

    问题已经出来了,实际上我们唤醒线程时释放了锁,唤醒之后只有重新获得锁之后才能继续,而这个时候信号量拥有的资源数目很可能已经改变了!我们说,Release返回给用户的数据是调用Release之前信号量所保有的资源数目,那么,我们就应当提前保存这个值。修改一下:

     1 public int Release(int releaseResCount)
     2 {
     3     // 参数需要检查一下
     4     if (releaseResCount < 1)
     5         throw new ArgumentOutOfRangeException("releaseCount");
     6     // 这个函数需要保证其结果的一致性
     7     lock (this._globalLock)
     8     {
     9         // 已经超过了上限了
    10         if (this._maxResourceCount - this._currentResources < releaseResCount)
    11         {
    12             throw new System.Threading.SemaphoreFullException();
    13         }
    14         // 很好,现在我们在做第(2)步
    15         int old = this._currentResources;
    16         this._currentResources += releaseResCount;
    17         // 我们现在可以做第(3)步了
    18         // 为了性能考虑,我们应当分两种情况进行讨论。
    19         // 当然了,如果没有线程等待就不管了~
    20         // 什么?万一你在判断的时候等待的线程增加了怎么办?有锁呢,不可能~
    21         if (this._waitingThreadCount == 1)
    22         {
    23             System.Threading.Monitor.Pulse(this._globalLock);
    24         }
    25         else if (this._waitingThreadCount > 1)
    26         {
    27             System.Threading.Monitor.PulseAll(this._globalLock);
    28         }
    29         // 这就是第(4)步
    30         return old;
    31     }
    32 }


    好,目前我们轻松的解决了Release问题。但是Wait就不是那么好办的了。想这个花掉很多脑细胞。首先一个问题,Wait可能阻塞,所以必须支持超时操作。其次就是必须考虑线程唤醒之后的工作。

    我们还是先理一下思路。Wait试图减少资源数目。如果失败就需要不停的等待,直到他被别人唤醒。首先用文字书写一下:

    (1)试图获得锁!这是必须的因为没有锁的话我们根本没有办法保证操作的一致性。这个时候我们不能够使用lock{...}因为我们必须处理超时的问题。
    (2)好样的,锁在我们手里了,现在我们要试图减少资源数目:
     while(当前的资源数还不够我们减少的)
     {
      // 也就是说我们必须等待了
      if (已经超时了)
      {
       返回;
      }
      释放掉锁并让线程处于等待状态。如果超时就返回;
      // 好了,执行到这一句说明我们已经持有锁了,有人把我们唤醒了
      // 但是唤醒了不代表我们就有我们需要的资源了所以循环回去检查
     }
     // 能执行到这一句说明:(a)我们拥有锁,(b)我们有足够的资源
     减少资源数目;
    (3)释放锁。

    我们来写代码!

     1 public bool Wait(int millisecondsTimeout, int waitResNumber)
     2 {
     3     // 参数检查工作是必须的
     4     if (millisecondsTimeout < -1)
     5         throw new ArgumentOutOfRangeException("millisecondsTimeout");
     6     if (waitResNumber < 1)
     7         throw new ArgumentOutOfRangeException("waitResNumber");
     8     // 我们在后面要两次处理超时,因此我们需要一个计时器
     9     System.Diagnostics.StopWatch watch = null;
    10     if (millisecondsTimeout != -1)
    11     {
    12         watch = System.Diagnostics.Stopwatch.StartNew();
    13     }
    14     // 现在我们来做第(1)步
    15     if (!System.Threading.Monitor.TryEnter(this._globalLock))
    16     {
    17         if (millisecondsTimeout == 0)
    18             return false;
    19         if (!System.Threading.Monitor.TryEnter(this._globalLock, millisecondsTimeout))
    20         {
    21             return false;
    22         }
    23     }
    24     // 现在我们已经获得锁了,我们要增加阻塞的线程数目,以便将来有人能够唤醒我们
    25     ++this._waitingThreadCount;
    26     try
    27     {
    28         // 如果当前的资源数目不够用的,我们就得等等了
    29         while (this._currentResources - waitResNumber < 0)
    30         {
    31             if (millisecondsTimeout != -1)
    32             {
    33                 // 看看是不是超时了
    34                 timeoutNum = UpdateTimeout(watch, millisecondsTimeout);
    35                 if (timeoutNum <= 0)
    36                 {
    37                     return false;
    38                 }
    39             }
    40             // 如果没有超时我们就再等一下
    41             if (!System.Threading.Monitor.Wait(this._globalLock, timeoutNum))
    42             {
    43                 return false;
    44             }
    45             // 好的我们被唤醒了,赶快回去检查检查有没有足够的资源了
    46         }
    47         // 很好,我们现在有足够的资源了,
    48         // 并且没有什么人能够再更改资源数目了,因为锁在我们手里!
    49         this._currentResources -= waitResNumber;
    50     }
    51     finally
    52     {
    53         // 很好,我们安全的退出了,减少阻塞的线程数目并且释放锁
    54         --this._waitingThreadCount;
    55         System.Threading.Monitor.Exit(this._globalLock);
    56     }
    57     return true;
    58 }

    好的!我们基本上实现了一个信号量的精华部分了。但是这是不够的。尤其是在Wait的过程中,我们直接就使用了Monitor,实际上,对于多核心处理器来说,一开始进行短暂的spin是更好的选择!我们的代码还有优化的空间!下一回,我们就在Wait中添加Spin优化,并最终实现一个完整的信号量。

  • 相关阅读:
    8_python连接数据库
    7_数据类型
    Memcached delete 命令
    Memcached gets 命令
    Memcached get 命令
    Memcached CAS 命令
    Memcached prepend 命令
    Memcached append 命令
    Memcached replace 命令
    Memcached add 命令
  • 原文地址:https://www.cnblogs.com/lxconan/p/1255980.html
Copyright © 2011-2022 走看看