zoukankan      html  css  js  c++  java
  • 多线程中的锁系统(四)-谈谈自旋锁

    阅读目录:

    1. 基础
    2. 自旋锁示例
    3. SpinLock
    4. 继续SpinLock
    5. 总结

    基础

    内核锁:基于内核对象构造的锁机制,就是通常说的内核构造模式。用户模式构造和内核模式构造

               优点:cpu利用最大化。它发现资源被锁住,请求就排队等候。线程切换到别处干活,直到接受到可用信号,线程再切回来继续处理请求。

               缺点:托管代码->用户模式代码->内核代码损耗、线程上下文切换损耗。

                       在锁的时间比较短时,系统频繁忙于休眠、切换,是个很大的性能损耗。

    自旋锁:原子操作+自循环。通常说的用户构造模式。  线程不休眠,一直循环尝试对资源访问,直到可用。

               优点:完美解决内核锁的缺点。

               缺点:长时间一直循环会导致cpu的白白浪费,高并发竞争下、CPU的消耗特别严重。

    混合锁:内核锁+自旋锁。 混合锁是先自旋锁一段时间或自旋多少次,再转成内核锁。

               优点:内核锁和自旋锁的折中方案,利用前二者优点,避免出现极端情况(自旋时间过长,内核锁时间过短)。

               缺点: 自旋多少时间、自旋多少次,这些策略很难把控。 

               在操作系统及net框架层,这块算法策略做的已经非常优了,有些API函数也提供了时间及次数可配置项,让使用者根据需求自行判断。

    自旋锁示例

    来看下我们自己简单实现的自旋锁:

            int signal = 0;
                var li = new List<int>();
                Parallel.For(0, 1000 * 10000, r =>
                {
                    while (Interlocked.Exchange(ref signal, 1) != 0)//加自旋锁
                    {
                        //黑魔法
                    }
                    li.Add(r);
                    Interlocked.Exchange(ref signal, 0);  //释放锁
                });
                Console.WriteLine(li.Count);
                //输出:10000000

    上面就是自旋锁:Interlocked.Exchange+while

    1:定义signal  0可用,1不可用。

    2:Parallel模拟并发竞争,原子更改signal状态。 后续线程自旋访问signal,是否可用。

    3:A线程使用完后,更改signal为0。 剩余线程竞争访问资源,B线程胜利后,更改signal为1,失败线程继续自旋,直到可用。

    SpinLock

    SpinLock是net4.0后Net提供的自旋锁类库,内部做了优化。

    简单看下实例:

      var li = new List<int>();
                var sl = new SpinLock();
                Parallel.For(0, 1000 * 10000, r =>
                {
                    bool gotLock = false;     //释放成功
                    sl.Enter(ref gotLock);    //进入锁
                    li.Add(r);
                    if (gotLock) sl.Exit();  //释放
                });
                Console.WriteLine(li.Count);
                //输出:10000000

     继续SpinLock

    new SpinLock(false)   这个构造函数主要用来检查死锁用,true是开启。

    在开启状态下,一旦发生死锁会直接抛异常的。

    SpinLock实现的部分源码:

      public void Enter(ref bool lockTaken) 
            {
                if (lockTaken) 
                { 
                    lockTaken = false;
                    throw new System.ArgumentException(Environment.GetResourceString("SpinLock_TryReliableEnter_ArgumentException")); 
                }
    
                // Fast path to acquire the lock if the lock is released
                // If the thread tracking enabled set the new owner to the current thread id 
                // Id not, set the anonymous bit lock
                int observedOwner = m_owner; 
                int newOwner = 0; 
                bool threadTrackingEnabled = (m_owner & LOCK_ID_DISABLE_MASK) == 0;
                if (threadTrackingEnabled) 
                {
                    if (observedOwner == LOCK_UNOWNED)
                        newOwner = Thread.CurrentThread.ManagedThreadId;
                } 
                else if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED)
                { 
                    newOwner = observedOwner | LOCK_ANONYMOUS_OWNED; // set the lock bit 
                }
                if (newOwner != 0) 
                {
    #if !FEATURE_CORECLR
                    Thread.BeginCriticalRegion();
    #endif 
    
    #if PFX_LEGACY_3_5 
                    if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner) == observedOwner) 
                    {
                        lockTaken = true; 
                        return;
                    }
    #else
                    if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner, ref lockTaken) == observedOwner) 
                    {
                        // Fast path succeeded 
                        return; 
                    }
    #endif 
    #if !FEATURE_CORECLR
                    Thread.EndCriticalRegion();
    #endif
                } 
                //Fast path failed, try slow path
                ContinueTryEnter(Timeout.Infinite, ref lockTaken); 
            } 
    private void ContinueTryEnter(int millisecondsTimeout, ref bool lockTaken)
            { 
                long startTicks = 0; 
                if (millisecondsTimeout != Timeout.Infinite && millisecondsTimeout != 0)
                { 
                    startTicks = DateTime.UtcNow.Ticks;
                }
    
    #if !FEATURE_PAL && !FEATURE_CORECLR   // PAL doesn't support  eventing, and we don't compile CDS providers for Coreclr 
                if (CdsSyncEtwBCLProvider.Log.IsEnabled())
                { 
                    CdsSyncEtwBCLProvider.Log.SpinLock_FastPathFailed(m_owner); 
                }
    #endif 
    
                if (IsThreadOwnerTrackingEnabled)
                {
                    // Slow path for enabled thread tracking mode 
                    ContinueTryEnterWithThreadTracking(millisecondsTimeout, startTicks, ref lockTaken);
                    return; 
                } 
    
                // then thread tracking is disabled 
                // In this case there are three ways to acquire the lock
                // 1- the first way the thread either tries to get the lock if it's free or updates the waiters, if the turn >= the processors count then go to 3 else go to 2
                // 2- In this step the waiter threads spins and tries to acquire the lock, the number of spin iterations and spin count is dependent on the thread turn
                // the late the thread arrives the more it spins and less frequent it check the lock avilability 
                // Also the spins count is increaes each iteration
                // If the spins iterations finished and failed to acquire the lock, go to step 3 
                // 3- This is the yielding step, there are two ways of yielding Thread.Yield and Sleep(1) 
                // If the timeout is expired in after step 1, we need to decrement the waiters count before returning
     
                int observedOwner;
    
                //***Step 1, take the lock or update the waiters
     
                // try to acquire the lock directly if possoble or update the waiters count
                SpinWait spinner = new SpinWait(); 
                while (true) 
                {
                    observedOwner = m_owner; 
                    if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED)
                    {
    #if !FEATURE_CORECLR
                        Thread.BeginCriticalRegion(); 
    #endif
     
    #if PFX_LEGACY_3_5 
                        if (Interlocked.CompareExchange(ref m_owner, observedOwner | 1, observedOwner) == observedOwner)
                        { 
                            lockTaken = true;
                            return;
                        }
    #else 
                        if (Interlocked.CompareExchange(ref m_owner, observedOwner | 1, observedOwner, ref lockTaken) == observedOwner)
                        { 
                            return; 
                        }
    #endif 
    
    #if !FEATURE_CORECLR
                        Thread.EndCriticalRegion();
    #endif 
                    }
                    else //failed to acquire the lock,then try to update the waiters. If the waiters count reached the maximum, jsut break the loop to avoid overflow 
                        if ((observedOwner & WAITERS_MASK) ==  MAXIMUM_WAITERS || Interlocked.CompareExchange(ref m_owner, observedOwner + 2, observedOwner) == observedOwner) 
                            break;
     
                    spinner.SpinOnce();
                }
    
                // Check the timeout. 
                if (millisecondsTimeout == 0 ||
                    (millisecondsTimeout != Timeout.Infinite && 
                    TimeoutExpired(startTicks, millisecondsTimeout))) 
                {
                    DecrementWaiters(); 
                    return;
                }
    
                //***Step 2. Spinning 
                //lock acquired failed and waiters updated
                int turn = ((observedOwner + 2) & WAITERS_MASK) / 2; 
                int processorCount = PlatformHelper.ProcessorCount; 
                if (turn < processorCount)
                { 
                    int processFactor = 1;
                    for (int i = 1; i <= turn * SPINNING_FACTOR; i++)
                    {
                        Thread.SpinWait((turn + i) * SPINNING_FACTOR * processFactor); 
                        if (processFactor < processorCount)
                            processFactor++; 
                        observedOwner = m_owner; 
                        if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED)
                        { 
    #if !FEATURE_CORECLR
                            Thread.BeginCriticalRegion();
    #endif
     
                            int newOwner = (observedOwner & WAITERS_MASK) == 0 ? // Gets the number of waiters, if zero
                                observedOwner | 1 // don't decrement it. just set the lock bit, it is zzero because a previous call of Exit(false) ehich corrupted the waiters 
                                : (observedOwner - 2) | 1; // otherwise decrement the waiters and set the lock bit 
                            Contract.Assert((newOwner & WAITERS_MASK) >= 0);
    #if PFX_LEGACY_3_5 
                            if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner) == observedOwner)
                            {
                                lockTaken = true;
                                return; 
                            }
    #else 
                            if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner, ref lockTaken) == observedOwner) 
                            {
                                return; 
                            }
    #endif
    
    #if !FEATURE_CORECLR 
                            Thread.EndCriticalRegion();
    #endif 
                        } 
                    }
                } 
    
                // Check the timeout.
                if (millisecondsTimeout != Timeout.Infinite && TimeoutExpired(startTicks, millisecondsTimeout))
                { 
                    DecrementWaiters();
                    return; 
                } 
    
                //*** Step 3, Yielding 
                //Sleep(1) every 50 yields
                int yieldsoFar = 0;
                while (true)
                { 
                    observedOwner = m_owner;
                    if ((observedOwner & LOCK_ANONYMOUS_OWNED) == LOCK_UNOWNED) 
                    { 
    #if !FEATURE_CORECLR
                        Thread.BeginCriticalRegion(); 
    #endif
                        int newOwner = (observedOwner & WAITERS_MASK) == 0 ? // Gets the number of waiters, if zero
                               observedOwner | 1 // don't decrement it. just set the lock bit, it is zzero because a previous call of Exit(false) ehich corrupted the waiters
                               : (observedOwner - 2) | 1; // otherwise decrement the waiters and set the lock bit 
                        Contract.Assert((newOwner & WAITERS_MASK) >= 0);
    #if PFX_LEGACY_3_5 
                        if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner) == observedOwner) 
                        {
                            lockTaken = true; 
                            return;
                        }
    #else
                        if (Interlocked.CompareExchange(ref m_owner, newOwner, observedOwner, ref lockTaken) == observedOwner) 
                        {
                            return; 
                        } 
    #endif
     
    #if !FEATURE_CORECLR
                        Thread.EndCriticalRegion();
    #endif
                    } 
    
                    if (yieldsoFar % SLEEP_ONE_FREQUENCY == 0) 
                    { 
                        Thread.Sleep(1);
                    } 
                    else if (yieldsoFar % SLEEP_ZERO_FREQUENCY == 0)
                    {
                        Thread.Sleep(0);
                    } 
                    else
                    { 
    #if PFX_LEGACY_3_5 
                        Platform.Yield();
    #else 
                        Thread.Yield();
    #endif
                    }
     
                    if (yieldsoFar % TIMEOUT_CHECK_FREQUENCY == 0)
                    { 
                        //Check the timeout. 
                        if (millisecondsTimeout != Timeout.Infinite && TimeoutExpired(startTicks, millisecondsTimeout))
                        { 
                            DecrementWaiters();
                            return;
                        }
                    } 
    
                    yieldsoFar++; 
                } 
            }
     
            /// <summary>
            /// decrements the waiters, in case of the timeout is expired
            /// </summary>
            private void DecrementWaiters() 
            {
                SpinWait spinner = new SpinWait(); 
                while (true) 
                {
                    int observedOwner = m_owner; 
                    if ((observedOwner & WAITERS_MASK) == 0) return; // don't decrement the waiters if it's corrupted by previous call of Exit(false)
                    if (Interlocked.CompareExchange(ref m_owner, observedOwner - 2, observedOwner) == observedOwner)
                    {
                        Contract.Assert(!IsThreadOwnerTrackingEnabled); // Make sure the waiters never be negative which will cause the thread tracking bit to be flipped 
                        break;
                    } 
                    spinner.SpinOnce(); 
                }
     
            }
    View Code

    从代码中发现SpinLock并不是简单的实现那样一直自旋,其内部做了很多优化。  

    1:内部使用了Interlocked.CompareExchange保持原子操作, m_owner 0可用,1不可用。

    2:第一次获得锁失败后,继续调用ContinueTryEnter,ContinueTryEnter有三种获得锁的情况。 

    3:ContinueTryEnter函数第一种获得锁的方式,使用了while+SpinWait。

    4:第一种方式达到最大等待者数量后,命中走第二种。 继续自旋 turn * 100次。100这个值是处理器核数(4, 8 ,16)下最好的。

    5:第二种如果还不能获得锁,走第三种。这种就带有混合构造的意思了,如下:

        if (yieldsoFar % 40 == 0) 
                        Thread.Sleep(1);
                    else if (yieldsoFar % 10 == 0)
                        Thread.Sleep(0);
                    else
                        Thread.Yield();

     Thread.Sleep(1) : 终止当前线程,放弃剩下时间片 休眠1毫秒, 退出跟其他线程抢占cpu。当然这个一般会更多,系统无法保证这么细的时间粒度。

     Thread.Sleep(0):  终止当前线程,放弃剩下时间片。  但立马还会跟其他线程抢cpu,能不能抢到跟线程优先级有关。

     Thread.Yeild():       结束当前线程,让出CPU给其他准备好的线程。其他线程ok后或没有还没有准备好,继续执行当前,Thread.Yeild()会返回个bool值,表示CPU是否让出成功。

    从源码中可以学到不少编程技巧,比如可以借鉴自旋+Thread.Yeild() 或 while+Thread.Yeild()等组合使用方式。

     总结

    本章介绍了自旋锁的基础及楼主的经验。 关于SpinLock类源码这块,只简单理解了下并没有深究。

    测试了下SpinLock和自己实现的自旋锁性能对比(并行添加1000w List<int>()),SpinLock是单纯的自旋锁性能2倍以上。

    另外测试了lock的性能,是系统SpinLock性能的3倍以上,可见lock内部自旋的效率更高,CLR暂没开源,看不到CLR具体实现的代码。

    参考http://www.projky.com/dotnet/4.0/System/Threading/SpinLock.cs.html

  • 相关阅读:
    LeetCode 226. Invert Binary Tree
    LeetCode 221. Maximal Square
    LeetCode 217. Contains Duplicate
    LeetCode 206. Reverse Linked List
    LeetCode 213. House Robber II
    LeetCode 198. House Robber
    LeetCode 188. Best Time to Buy and Sell Stock IV (stock problem)
    LeetCode 171. Excel Sheet Column Number
    LeetCode 169. Majority Element
    运维工程师常见面试题
  • 原文地址:https://www.cnblogs.com/mushroom/p/4245529.html
Copyright © 2011-2022 走看看