zoukankan      html  css  js  c++  java
  • C# SemaphoreSlim 实现

    当多个任务或线程并行运行时,难以避免的对某些有限的资源进行并发的访问。可以考虑使用信号量来进行这方面的控制(System.Threading.Semaphore)是表示一个Windows内核的信号量对象。如果预计等待的时间较短,可以考虑使用SemaphoreSlim,它则带来的开销更小。.NetFrameWork中的信号量通过跟踪进入和离开的任务或线程来协调对资源的访问。信号量需要知道资源的最大数量,当一个任务进入时,资源计数器会被减1,当计数器为0时,如果有任务访问资源,它会被阻塞,直到有任务离开为止。
    如果需要有跨进程或AppDomain的同步时,可以考虑使用Semaphore。Semaphore是取得的Windows 内核的信号量,所以在整个系统中是有效的。它主要的接口是Release和WaitOne,使用的方式和SemaphoreSlim是一致的
    信号量Semaphore是另外一个CLR中的内核同步对象。在.net中,类Semaphore封装了这个对象。与标准的排他锁对象(Monitor,Mutex,SpinLock)不同的是,它不是一个排他的锁对象,它与SemaphoreSlim,ReaderWriteLock等一样允许多个有限的线程同时访问共享内存资源。

    Semaphore就好像一个栅栏,有一定的容量,当里面的线程数量到达设置的最大值时候,就没有线程可以进去。然后,如果一个线程工作完成以后出来了,那下一个线程就可以进去了。Semaphore的WaitOne或Release等操作分别将自动地递减或者递增信号量的当前计数值。当线程试图对计数值已经为0的信号量执行WaitOne操作时,线程将阻塞直到计数值大于0。在构造Semaphore时,最少需要2个参数。信号量的初始容量和最大的容量。

    Semaphore的WaitOne或者Release方法的调用大约会耗费1微秒的系统时间,而优化后的SemaphoreSlim则需要大致四分之一微秒。在计算中大量频繁使用它的时候SemaphoreSlim还是优势明显,加上SemaphoreSlim还丰富了不少接口,更加方便我们进行控制,所以在4.0以后的多线程开发中,推荐使用SemaphoreSlim。SemaphoreSlim的实现如下:

    public class SemaphoreSlim : IDisposable
        {  
            private volatile int m_currentCount; //可用数的资源数,<=0开始阻塞
            private readonly int m_maxCount;
            private volatile int m_waitCount; //阻塞的线程数
            private object m_lockObj;
            private volatile ManualResetEvent m_waitHandle;
            private const int NO_MAXIMUM = Int32.MaxValue;
            //Head of list representing asynchronous waits on the semaphore.
            private TaskNode m_asyncHead;
            // Tail of list representing asynchronous waits on the semaphore.
            private TaskNode m_asyncTail;
             // A pre-completed task with Result==true
            private readonly static Task<bool> s_trueTask =
                new Task<bool>(false, true, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, default(CancellationToken));
    
            public SemaphoreSlim(int initialCount) : this(initialCount, NO_MAXIMUM){ }        
            public SemaphoreSlim(int initialCount, int maxCount)
            {
                if (initialCount < 0 || initialCount > maxCount)
                {
                    throw new ArgumentOutOfRangeException("initialCount", initialCount, GetResourceString("SemaphoreSlim_ctor_InitialCountWrong"));
                }
                if (maxCount <= 0)
                {
                    throw new ArgumentOutOfRangeException("maxCount", maxCount, GetResourceString("SemaphoreSlim_ctor_MaxCountWrong"));
                }
                m_maxCount = maxCount;
                m_lockObj = new object();
                m_currentCount = initialCount;
            }
            public void Wait(){Wait(Timeout.Infinite, new CancellationToken());}
            public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken)
            {
                CheckDispose();
                if (millisecondsTimeout < -1)
                {
                    throw new ArgumentOutOfRangeException("totalMilliSeconds", millisecondsTimeout, GetResourceString("SemaphoreSlim_Wait_TimeoutWrong"));
                }
                cancellationToken.ThrowIfCancellationRequested();
                uint startTime = 0;
                if (millisecondsTimeout != Timeout.Infinite && millisecondsTimeout > 0)
                {
                    startTime = TimeoutHelper.GetTime();
                }
    
                bool waitSuccessful = false;
                Task<bool> asyncWaitTask = null;
                bool lockTaken = false;
    
                CancellationTokenRegistration cancellationTokenRegistration = cancellationToken.InternalRegisterWithoutEC(s_cancellationTokenCanceledEventHandler, this);
                try
                {
                    SpinWait spin = new SpinWait();
                    while (m_currentCount == 0 && !spin.NextSpinWillYield)
                    {
                        spin.SpinOnce();
                    }
                    try { }
                    finally
                    {
                        Monitor.Enter(m_lockObj, ref lockTaken);
                        if (lockTaken)
                        {
                            m_waitCount++;
                        }
                    }
    
                    // If there are any async waiters, for fairness we'll get in line behind
                    if (m_asyncHead != null)
                    {
                        Contract.Assert(m_asyncTail != null, "tail should not be null if head isn't");
                        asyncWaitTask = WaitAsync(millisecondsTimeout, cancellationToken);
                    }
                    // There are no async waiters, so we can proceed with normal synchronous waiting.
                    else
                    {
                        // If the count > 0 we are good to move on.
                        // If not, then wait if we were given allowed some wait duration
                        OperationCanceledException oce = null;
                        if (m_currentCount == 0)
                        {
                            if (millisecondsTimeout == 0)
                            {
                                return false;
                            }
                            // Prepare for the main wait...
                            // wait until the count become greater than zero or the timeout is expired
                            try
                            {
                                waitSuccessful = WaitUntilCountOrTimeout(millisecondsTimeout, startTime, cancellationToken);
                            }
                            catch (OperationCanceledException e) { oce = e; }
                        }
                     
                        Contract.Assert(!waitSuccessful || m_currentCount > 0, "If the wait was successful, there should be count available.");
                        if (m_currentCount > 0)
                        {
                            waitSuccessful = true;
                            m_currentCount--;
                        }
                        else if (oce != null)
                        {
                            throw oce;
                        }
                        if (m_waitHandle != null && m_currentCount == 0)
                        {
                            m_waitHandle.Reset();
                        }
                    }
                }
                finally
                {
                    // Release the lock
                    if (lockTaken)
                    {
                        m_waitCount--;
                        Monitor.Exit(m_lockObj);
                    }
    
                    // Unregister the cancellation callback.
                    cancellationTokenRegistration.Dispose();
                }
                return (asyncWaitTask != null) ? asyncWaitTask.GetAwaiter().GetResult() : waitSuccessful;
            }
            
            private bool WaitUntilCountOrTimeout(int millisecondsTimeout, uint startTime, CancellationToken cancellationToken)
            {
                int remainingWaitMilliseconds = Timeout.Infinite;
                //Wait on the monitor as long as the count is zero
                while (m_currentCount == 0)
                {
                    // If cancelled, we throw. Trying to wait could lead to deadlock.
                    cancellationToken.ThrowIfCancellationRequested();
                    if (millisecondsTimeout != Timeout.Infinite)
                    {
                        remainingWaitMilliseconds = TimeoutHelper.UpdateTimeOut(startTime, millisecondsTimeout);
                        if (remainingWaitMilliseconds <= 0)
                        {
                            // The thread has expires its timeout
                            return false;
                        }
                    }
                    // ** the actual wait **
                    if (!Monitor.Wait(m_lockObj, remainingWaitMilliseconds))
                    {
                        return false;
                    }
                }
                return true;
            }
            public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken)
            {
                CheckDispose();
                // Validate input
                if (millisecondsTimeout < -1)
                {
                    throw new ArgumentOutOfRangeException("totalMilliSeconds", millisecondsTimeout, GetResourceString("SemaphoreSlim_Wait_TimeoutWrong"));
                }
                // Bail early for cancellation
                if (cancellationToken.IsCancellationRequested)
                    return Task.FromCancellation<bool>(cancellationToken);
    
                lock (m_lockObj)
                {
                    // If there are counts available, allow this waiter to succeed.
                    if (m_currentCount > 0)
                    {
                        --m_currentCount;
                        if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset();
                        return s_trueTask;
                    }
                        // If there aren't, create and return a task to the caller.
                        // The task will be completed either when they've successfully acquired
                        // the semaphore or when the timeout expired or cancellation was requested.
                    else
                    {
                        Contract.Assert(m_currentCount == 0, "m_currentCount should never be negative");
                        var asyncWaiter = CreateAndAddAsyncWaiter();
                        return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ?
                            asyncWaiter :
                            WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken);
                    }
                }
            }
    
            /// <summary>Creates a new task and stores it into the async waiters list.</summary>
            /// <returns>The created task.</returns>
            private TaskNode CreateAndAddAsyncWaiter()
            {
                Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held");
                // Create the task
                var task = new TaskNode();
                // Add it to the linked list
                if (m_asyncHead == null)
                {
                    Contract.Assert(m_asyncTail == null, "If head is null, so too should be tail");
                    m_asyncHead = task;
                    m_asyncTail = task;
                }
                else
                {
                    Contract.Assert(m_asyncTail != null, "If head is not null, neither should be tail");
                    m_asyncTail.Next = task;
                    task.Prev = m_asyncTail;
                    m_asyncTail = task;
                }
                // Hand it back
                return task;
            }
            
            private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken)
            {
                Contract.Assert(asyncWaiter != null, "Waiter should have been constructed");
                Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held");
                using (var cts = cancellationToken.CanBeCanceled ?
                    CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) :
                    new CancellationTokenSource())
                {
                    var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token));
                    if (asyncWaiter == await waitCompleted.ConfigureAwait(false))
                    {
                        cts.Cancel(); // ensure that the Task.Delay task is cleaned up
                        return true; // successfully acquired
                    }
                }
    
                // If we get here, the wait has timed out or been canceled.
    
                // If the await completed synchronously, we still hold the lock.  If it didn't,
                // we no longer hold the lock.  As such, acquire it.
                lock (m_lockObj)
                {
                    // Remove the task from the list.  If we're successful in doing so,
                    // we know that no one else has tried to complete this waiter yet,
                    // so we can safely cancel or timeout.
                    if (RemoveAsyncWaiter(asyncWaiter))
                    {
                        cancellationToken.ThrowIfCancellationRequested(); // cancellation occurred
                        return false; // timeout occurred
                    }
                }
    
                // The waiter had already been removed, which means it's already completed or is about to
                // complete, so let it, and don't return until it does.
                return await asyncWaiter.ConfigureAwait(false) await asyncWaiter.ConfigureAwait(false);
            }
            public int Release(){ return Release(1);}
    
            public int Release(int releaseCount)
            {
                CheckDispose();
    
                // Validate input
                if (releaseCount < 1)
                {
                    throw new ArgumentOutOfRangeException( "releaseCount", releaseCount, GetResourceString("SemaphoreSlim_Release_CountWrong"));
                }
                int returnCount;
    
                lock (m_lockObj)
                {
                    // Read the m_currentCount into a local variable to avoid unnecessary volatile accesses inside the lock.
                    int currentCount = m_currentCount;
                    returnCount = currentCount;
    
                    // If the release count would result exceeding the maximum count, throw SemaphoreFullException.
                    if (m_maxCount - currentCount < releaseCount)
                    {
                        throw new SemaphoreFullException();
                    }
    
                    // Increment the count by the actual release count
                    currentCount += releaseCount;
    
                    // Signal to any synchronous waiters
                    int waitCount = m_waitCount;
                    if (currentCount == 1 || waitCount == 1)
                    {
                        Monitor.Pulse(m_lockObj);
                    }
                    else if (waitCount > 1)
                    {
                        Monitor.PulseAll(m_lockObj);
                    }
    
                    // Now signal to any asynchronous waiters, if there are any.  While we've already
                    // signaled the synchronous waiters, we still hold the lock, and thus
                    // they won't have had an opportunity to acquire this yet.  So, when releasing
                    // asynchronous waiters, we assume that all synchronous waiters will eventually
                    // acquire the semaphore.  That could be a faulty assumption if those synchronous
                    // waits are canceled, but the wait code path will handle that.
                    if (m_asyncHead != null)
                    {
                        Contract.Assert(m_asyncTail != null, "tail should not be null if head isn't null");
                        int maxAsyncToRelease = currentCount - waitCount;
                        while (maxAsyncToRelease > 0 && m_asyncHead != null)
                        {
                            --currentCount;
                            --maxAsyncToRelease;
    
                            // Get the next async waiter to release and queue it to be completed
                            var waiterTask = m_asyncHead;
                            RemoveAsyncWaiter(waiterTask); // ensures waiterTask.Next/Prev are null
                            QueueWaiterTask(waiterTask);
                        }
                    }
                    m_currentCount = currentCount;
    
                    // Exposing wait handle if it is not null
                    if (m_waitHandle != null && returnCount == 0 && currentCount > 0)
                    {
                        m_waitHandle.Set();
                    }
                }
    
                // And return the count
                return returnCount;
            }
            
            ///Removes the waiter task from the linked list.</summary>
            private bool RemoveAsyncWaiter(TaskNode task)
            {
                Contract.Requires(task != null, "Expected non-null task");
                Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held");
    
                // Is the task in the list?  To be in the list, either it's the head or it has a predecessor that's in the list.
                bool wasInList = m_asyncHead == task || task.Prev != null;
    
                // Remove it from the linked list
                if (task.Next != null) task.Next.Prev = task.Prev;
                if (task.Prev != null) task.Prev.Next = task.Next;
                if (m_asyncHead == task) m_asyncHead = task.Next;
                if (m_asyncTail == task) m_asyncTail = task.Prev;
                Contract.Assert((m_asyncHead == null) == (m_asyncTail == null), "Head is null iff tail is null");
    
                // Make sure not to leak
                task.Next = task.Prev = null;
    
                // Return whether the task was in the list
                return wasInList;
            }
            private static void QueueWaiterTask(TaskNode waiterTask)
            {
                ThreadPool.UnsafeQueueCustomWorkItem(waiterTask, forceGlobal: false);
            }
            public int CurrentCount
            {
                get { return m_currentCount; }
            }
            public WaitHandle AvailableWaitHandle
            {
                get
                {
                    CheckDispose();
                    if (m_waitHandle != null)
                        return m_waitHandle;
                    lock (m_lockObj)
                    {
                        if (m_waitHandle == null)
                        {
                            m_waitHandle = new ManualResetEvent(m_currentCount != 0);
                        }
                    }
                    return m_waitHandle;
                }
            }
            private sealed class TaskNode : Task<bool>, IThreadPoolWorkItem
            {
                internal TaskNode Prev, Next;
                internal TaskNode() : base() {}
    
                [SecurityCritical]
                void IThreadPoolWorkItem.ExecuteWorkItem()
                {
                    bool setSuccessfully = TrySetResult(true);
                    Contract.Assert(setSuccessfully, "Should have been able to complete task");
                }
    
                [SecurityCritical]
                void IThreadPoolWorkItem.MarkAborted(ThreadAbortException tae) { /* nop */ }
            }
        }

    SemaphoreSlim类有几个私有字段很重要,m_currentCount表示可用资源,如果m_currentCount>0每次调用Wait都会减1,当m_currentCount<=0时再次调用Wait方法就会阻塞。每次调用Release方法m_currentCount都会加1.m_maxCount表示最大可用资源数,是在构造函数中指定的。m_waitCount表示当前阻塞的线程数。TaskNode m_asyncHead,m_asyncTail这2个变量主要用于异步方法

    我们首先来看看Wait方法,这里还有它的异步版本WaitAsync。在Wait方法中首先检查m_currentCount是否为0,如果是我们用SpinWait自旋10次;任意一次Wait都需要锁住m_lockObj对象,m_asyncHead != null表示当前已经存在异步的对象,所以我们调用WaitAsync方法,如果没有那么我们调用WaitUntilCountOrTimeout方法,该方法在m_currentCount==0会阻塞到到m_currentCount不为0或者超时;看到WaitUntilCountOrTimeout方法中【if (!Monitor.Wait(m_lockObj, remainingWaitMilliseconds))】,就很明了Wait方法中【CancellationTokenRegistration cancellationTokenRegistration = cancellationToken.InternalRegisterWithoutEC(s_cancellationTokenCanceledEventHandler, this)】存在的原因了,确实很巧妙【这里和ManualResetEventSlim相似】。现在我们回到WaitAsync方法,该方法也是首先检查m_currentCount是否大于0,大于直接返回。否者调用CreateAndAddAsyncWaiter创建一个Task<bool>【Task<bool>是一个链表结构】,如果没有取消且超时大于-1,那么就调用WaitUntilCountOrTimeoutAsync方法,该方法首先包装一个Task【var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token))】然后等待线程【await waitCompleted.ConfigureAwait(false)】返回的是asyncWaiter或者另一个Delay的Task。如果返回的不是asyncWaiter说明已经超时需要调用RemoveAsyncWaiter,然后返回 await asyncWaiter.ConfigureAwait(false),如果返回的是asyncWaiter,那么就调用Cancel方法。那么这里的asyncWaiter.ConfigureAwait(false)什么时候退出了【或者说不阻塞】,这就要看Release中的QueueWaiterTask方法了。

    QueueWaiterTask方法或调用TaskNode的ExecuteWorkItem方法。
    那现在我们来看看Release方法,该方法会把currentCount加1,然后把等待线程转为就绪线程【Monitor.Pulse(m_lockObj)或 Monitor.PulseAll(m_lockObj)】,如果存在异步的话,看看还可以释放几个异步task【 int maxAsyncToRelease = currentCount - waitCount】,这里Release的注释很重要,只是没怎么明白,现释同步的waiters,然后在释放异步的waiters,但是释放同步后锁的资源没有释放,在释放异步的waiters时候是把currentCount减1,这样感觉异步waiters优先获取资源。也不知道我的理解是否正确?
    1)当ConfigureAwait(true),代码由同步执行进入异步执行时,当前同步执行的线程上下文信息(比如HttpConext.Current,Thread.CurrentThread.CurrentCulture)就会被捕获并保存至SynchronizationContext中,供异步执行中使用,并且供异步执行完成之后(await之后的代码)的同步执行中使用(虽然await之后是同步执行的,但是发生了线程切换,会在另外一个线程中执行「ASP.NET场景」)。这个捕获当然是有代价的,当时我们误以为性能问题是这个地方的开销引起,但实际上这个开销很小,在我们的应用场景不至于会带来性能问题。

    2)当Configurewait(flase),则不进行线程上下文信息的捕获,async方法中与await之后的代码执行时就无法获取await之前的线程的上下文信息,在ASP.NET中最直接的影响就是HttpConext.Current的值为null。

  • 相关阅读:
    Mybatis 是否支持延迟加载?如果支持,它的实现原理是什么?
    MyBatis 实现一对多有几种方式,怎么操作的?
    利用 ps 怎么显示所有的进程? 怎么利用 ps 查看指定进 程的信息?
    哪个命令专门用来查看后台任务?
    什么是 MyBatis 的接口绑定?有哪些实现方式?
    什么是端到端微服务测试?
    我们如何在测试中消除非决定论?
    什么是持续监测?
    怎么使一个命令在后台运行?
    博客园样式美化(兼容为知笔记)
  • 原文地址:https://www.cnblogs.com/majiang/p/7894539.html
Copyright © 2011-2022 走看看