zoukankan      html  css  js  c++  java
  • (译)构建Async同步基元,Part 5 AsyncSemaphore

    传送门:异步编程系列目录……

    最近在学习.NET4.5关于“并行任务”的使用。“并行任务”有自己的同步机制,没有显示给出类似如旧版本的:事件等待句柄、信号量、lockReaderWriterLock……等同步基元对象,但我们可以沿溪这一编程习惯,那么这系列翻译就是给“并行任务”封装同步基元对象。翻译资源来源《(译)关于AsyncAwaitFAQ

    1.         构建Async同步基元,Part 1 AsyncManualResetEvent

    2.         构建Async同步基元,Part 2 AsyncAutoResetEvent

    3.         构建Async同步基元,Part 3 AsyncCountdownEvent

    4.         构建Async同步基元,Part 4 AsyncBarrier

    5.         构建Async同步基元,Part 5 AsyncSemaphore

    6.         构建Async同步基元,Part 6 AsyncLock

    7.         构建Async同步基元,Part 7 AsyncReaderWriterLock

     

    源码:构建Async同步基元.rar

    开始:构建Async同步基元,Part 5 AsyncSemaphore

             在之前的文章中,我已经构建了AsyncManualResetEventAsyncAutoResetEventAsyncCountdownEventAsyncBarrier。在这篇文章中,我将构建一个AsyncSemaphore类。

             Semaphore(信号量)被广泛使用。一个重要应用就是限流,为了保护访问限制性资源。在.NET中我们有两个信号量类型即:Semaphore(封装Win32函数)和SemaphoreSlim(封装Monitor实现的轻量版)。现在我将构建一个简单的Async版本,待构建的目标类型如下:

    public class AsyncSemaphore
    { 
        public AsyncSemaphore(int initialCount); 
        public Task WaitAsync(); 
        public void Release(); 
    }

    我们需要一些成员变量,这些成员变量几乎和AsyncAutoResetEvent成员一样,变量存在的作用也类似。我们需要能够唤醒单个等待者,所以我们维护一个TaskCompletionSource<bool>的实例队列。我们需要跟踪信号量的当前计数,以便我们知道在开始阻塞之前还有多少等待者可以完成。并且我们会发现,有一个高效的措施,即我们维护一个已经完成的Task以供重用。

    private readonly static Task s_completed = Task.FromResult(true); 
    private readonly Queue<TaskCompletionSource<bool>>m_waiters
                              = new Queue<TaskCompletionSource<bool>>(); 
    private int m_currentCount;

    类的构造函数仅仅是初始化请求计数:

    public AsyncSemaphore(int initialCount) 
    { 
        if (initialCount< 0) 
            throw new ArgumentOutOfRangeException("initialCount"); 
        m_currentCount = initialCount; 
    }

             现在让我们来构建WaitAsync()方法。此方法中我们要使用lock确保所有操作以原子方式执行,并且要确保与Release()方法保持同步。然后,这里有两种情况。如果当前m_currentCount大于0,即还没有到达semaphore限流界限,所以这个等待操作能立即完成。在这种情况下,我们递减当前计数并且返回缓存的已经完成的任务s_completed(即在无争用的情况下我们不需要创建新的TaskCompletionSource<bool>实例)。如果当前计数m_currentCount0,我们创建一个新的TaskCompletionSource<bool>实例添加到队列中,并且返回实例对应的Task给调用者。

            public Task WaitAsync()
            {
                lock (m_waiters)
                {
                    if (m_currentCount > 0)
                    {
                        --m_currentCount;
                        return s_completed;
                    }
                    else
                    {
                        var waiter = new TaskCompletionSource<bool>();
                        m_waiters.Enqueue(waiter);
                        return waiter.Task;
                    }
                }
            }

             Release()方法中,如果队列中还有等待者,则从队列中删除一个等待者并且完成对应的TaskCompletionSource<bool>实例。如果没有等待者,则简单的递增当前计数m_currentCount。这里的操作需要保持原子性,并且要与WaitAsync()方法保持同步,所以Release()的主体代码需要再一次对m_waiters队列加锁。这里要注意一个重要的事情,在之前的文章中,我讨论了TaskCompletionSource<TResult> [Try]Set*() 系列方法,会使TaskCompletionSource<TResult>对应的Task作为同步调用的一部分运行。如果我们在lock内部调用SetResilt(),则Task的同步延续的运行将长时间持有lock。因此,我们释放lock后再调用Task[Try]Set*() 系列方法来完成任务。

            public void Release()
            {
                TaskCompletionSource<bool> toRelease = null;
                lock (m_waiters)
                {
                    if (m_waiters.Count > 0)
                        toRelease = m_waiters.Dequeue();
                    else
                        ++m_currentCount;
                }
                if (toRelease != null)
                    toRelease.SetResult(true);
            }

     

    这就是本节要讲的AsyncSemaphore

    完整源码如下:

        public class AsyncSemaphore
        {
            // 维护一个已经完成的Task以供重用提高效率
            private readonly static Task s_completed = Task.FromResult(true);
            private readonly Queue<TaskCompletionSource<bool>> m_waiters
                                        = new Queue<TaskCompletionSource<bool>>();
            // 跟踪信号量的当前计数,以便我们知道在开始阻塞之前还有多少等待者可以完成
            private int m_currentCount;
    
            public AsyncSemaphore(int initialCount)
            {
                if (initialCount < 0) 
                    throw new ArgumentOutOfRangeException("initialCount");
                m_currentCount = initialCount;
            }
    
            public Task WaitAsync()
            {
                lock (m_waiters)
                {
                    if (m_currentCount > 0)
                    {
                        --m_currentCount;
                        return s_completed;
                    }
                    else
                    {
                        var waiter = new TaskCompletionSource<bool>();
                        m_waiters.Enqueue(waiter);
                        return waiter.Task;
                    }
                }
            }
    
            public void Release()
            {
                TaskCompletionSource<bool> toRelease = null;
                lock (m_waiters)
                {
                    if (m_waiters.Count > 0)
                        toRelease = m_waiters.Dequeue();
                    else
                        ++m_currentCount;
                }
                if (toRelease != null)
                    toRelease.SetResult(true);
            }
        }

    下一节,我们将看看如何使用一个AsyncSemaphore来实现一个作用域互斥锁定机制。

                                                                                                                    

    推荐阅读:

                       异步编程:同步基元对象(上)

                       异步编程:同步基元对象(下)

     

    感谢你的观看……

    原文:Building Async Coordination Primitives, Part 5: AsyncSemaphore

    作者:Stephen Toub – MSFT

     

  • 相关阅读:
    洛谷 4035 [JSOI2008]球形空间产生器
    洛谷 2216 [HAOI2007]理想的正方形
    洛谷2704 [NOI2001]炮兵阵地
    洛谷2783 有机化学之神偶尔会做作弊
    洛谷 2233 [HNOI2002]公交车路线
    洛谷2300 合并神犇
    洛谷 1641 [SCOI2010]生成字符串
    Vue history模式支持ie9
    VUE实现登录然后跳转到原来的页面
    vue history模式 apache配置
  • 原文地址:https://www.cnblogs.com/heyuquan/p/2862191.html
Copyright © 2011-2022 走看看