zoukankan      html  css  js  c++  java
  • C#异步编程(五)异步的同步构造

    异步的同步构造

      任何使用了内核模式的线程同步构造,我都不是特别喜欢。因为所有这些基元都会阻塞一个线程的运行。创建线程的代价很大。创建了不用,这于情于理说不通。

      创建了reader-writer锁的情况,如果写锁被长时间占有,那么其他的读请求线程都会被阻塞,随着越来越多客户端请求到达,服务器创建了更多的线程,而他们被创建出来的目的就是让他们在锁上停止运行。更糟糕的是,一旦writer锁释放,所有读线程都同时解除阻塞并开始执行。现在,又变成大量的线程试图在相对数量很少的cpu上运行。所以,windows开始在线程之间不同的进行上下文切换,而真正的工作时间却很少。

           锁很流行,但长时间拥有会带来巨大的伸缩性问题。如果代码能通过异步的同步构造指出他想要一个锁,那么会非常有用。在这种情况下,如果线程得不到锁,可直接返回并执行其他工作,而不必在那里傻傻地阻塞。

           SemaphoreSlim通过waitAsync实现了这个思路

    public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken);

    使用await asynclock.WaitAsync()就可以实现刚才说的情境。

           但如果是reader-writer呢?.net framework提供了concurrentExclusiveSchedulerPair类。实例代码如下:

    private static void ConcurrentExclusiveSchedulerDemo()
    {
        var cesp = new ConcurrentExclusiveSchedulerPair();
        var tfExclusive = new TaskFactory(cesp.ExclusiveScheduler);
        var tfConcurrent = new TaskFactory(cesp.ConcurrentScheduler);
    
        for (int i = 0; i < 5; i++)
        {
            var exclusive = i < 2;
            (exclusive ? tfExclusive : tfConcurrent).StartNew(() =>
            {
                Console.WriteLine("{0} access",exclusive?"exclusive":"concurrent");
                //这里进行独占写入或者并发读取操作
            });
        }
    }

           遗憾的是,framework没有提供鞠咏reader-writer语义的异步锁。所以我们可以自己构建一个,如下:

    public sealed class AsyncOneManyLock
    {
        #region 锁的代码
        //自旋锁不要用readonly
        private SpinLock m_lock = new SpinLock(true);
    
        private void Lock()
        {
            bool taken = false;m_lock.Enter(ref taken);
        }
        private void Unlock()
        {
            m_lock.Exit();
        }
    
        #endregion
    
        #region 锁的状态和辅助方法
    
        private Int32 m_state = 0;
        private bool IsFree { get { return m_state == 0; } }
        private bool IsOwnedByWriter { get { return m_state == -1; } }
        private bool IsOwnedByReader { get { return m_state > 0; } }
        private Int32 AddReaders(Int32 count) { return m_state += count; }
        private Int32 SubtractReader() { return --m_state; }
        private void MakeWriter() { m_state = -1; }
        private void MakeFree() { m_state = 0; }
    
        #endregion
    
        //目的实在非竞态条件时增强性能和减少内存消耗
        private readonly Task m_noContentionAccessGranter;
        //每个等待的writer都通过他们在这里排队的TaskCompletionSource来唤醒
        private readonly Queue<TaskCompletionSource<Object>> m_qWaitingWriters = new Queue<TaskCompletionSource<object>>();
        //一个TaskCompletionSource收到信号,所有等待的reader都唤醒
        private TaskCompletionSource<Object> m_waitingReaderSignal = new TaskCompletionSource<object>();
        private Int32 m_numWaitingReaders = 0;
        public AsyncOneManyLock()
        {
            //创建一个返回null的任务
            m_noContentionAccessGranter = Task.FromResult<Object>(null);
        }
        public Task WaitAsync(OneManyMode mode)
        {
            Task accressGranter = m_noContentionAccessGranter;//假定无竞争
            Lock () ;
            switch (mode)
            {
                case OneManyMode.Exclusive:
                    if (IsFree)
                    {
                        MakeWriter();//无竞争
                    }
                    else
                    {
                        //有竞争
                        var tcs = new TaskCompletionSource<Object>();
                        m_qWaitingWriters.Enqueue(tcs);
                        accressGranter = tcs.Task;
                    }
                    break;
                case OneManyMode.Shared:
                    if (IsFree||(IsOwnedByReader&&m_qWaitingWriters.Count==0))
                    {
                        AddReaders(1);//无竞争
                    }
                    else
                    {
                        //有竞争,递增等待的reader数量,并返回reader任务使reader等待。
                        m_numWaitingReaders++;
                        accressGranter = m_waitingReaderSignal.Task.ContinueWith(t => t.Result);
                    }
                    break;
            }
            Unlock();
            return accressGranter;
        }
    
        public void Release()
        {
            //嘉定没有代码被释放
            TaskCompletionSource<Object> accessGranter = null;
            Lock () ;
            if (IsOwnedByWriter)
            {
                MakeFree();
            }
            else
            {
                SubtractReader();
            }
            if (IsFree)
            {
                //如果自由,唤醒一个等待的writer或所有等待的readers
                if (m_qWaitingWriters.Count>0)
                {
                    MakeWriter();
                    accessGranter = m_qWaitingWriters.Dequeue();
                }
                else if (m_numWaitingReaders>0)
                {
                    AddReaders(m_numWaitingReaders);
                    m_numWaitingReaders = 0;
                    accessGranter = m_waitingReaderSignal;
                    //为将来需要等待的readers创建一个新的tcs
                    m_waitingReaderSignal = new TaskCompletionSource<object>();
                }
            }
            Unlock();
            //唤醒锁外面的writer/reader,减少竞争几率以提高性能
            if (accessGranter!=null)
            {
                accessGranter.SetResult(null);
            }
        }
    }
    AsyncOneManyLock

           上述代码永远不会阻塞线程。原因是内部没有没有很实用任何内核构造。这里确实使用了一个SpinLock,它在内部使用了用户模式构造。但是他的执行时间很短,WaitAsync方法里,只是一些整数计算和比较,这也符合只有执行时间很短的代码段才可以用自旋锁来保护。所以使用一个spinLock来保护对queue的访问,还是比较合适的。

    并发集合类

           FCL自带4个线程安全的集合类,全部在System.Collections.Concurrent命名空间中定义。它们是ConcurrentStack、concurrentQueue、concurrentDictionary、concurrentBag。

    所有这些集合都是“非阻塞”的,换而言之,如果一个线程试图提取一个不存在的元素(数据项),线程会立即返回;线程不会阻塞在那里,等着一个元素的出现。正是由于这个原因,所以如果获取了一个数据项,像tryDequeue,tryPop,tryTake和tryGetValue这样的方法全部返回true;否则返回false。

           一个集合“非阻塞”,并不意味着他就不需要锁了。concurrentDictionary类在内部使用了Monitor。但是,对集合中的项进行操作时,锁只被占有极短的时间。concurrentQueue和ConcurrentStack确实不需要锁;他们两个在内部都使用interlocked的方法来操纵集合。一个concurrentBag对象由大量迷你集合对象构成,每个线程一个。线程将一个项添加到bag中时,就用interlocked的方法将这个项添加到调用线程的迷你集合中。一个线程视图从bag中提取一个元素时,bag就检查调用线程的迷你集合,试图从中取出数据项。如果数据项在哪里,就用一个interlocked方法提取这个项。如果不在,就在内部获取一个monitor,以便从 线程的迷你集合提取一个项。这称为一个线程从另一个线程“窃取”一个数据项。

           注意,所有并发集合类都提供了getEnumerator方法,他一般用于C#的foreach语句,但也可用于Linq。对于concurrentQueue、ConcurrentStack和concurrentBag类,getEnumerator方法获取集合内容的一个“快照”,并从这个快照中返回元素;实际集合内容可能在使用快照枚举时发生改变。concurrentDictionary的getEnumerator的该方法不获取他内容的快照。因此,在枚举字典期间,字典的内容可能改变。还要注意,count属性返回的是查询时集合中的元素数量,如果其他线程同时正在集合中增删,这个计数可能马上就变得不正确。

           ConcurrentStack、concurrentQueue、concurrentBag都实现了IProducerConsumerCollection接口,实现了这个接口的任何类都能转变成一个阻塞集合,不过,尽量不使用这种阻塞集合。

           这里我们重点介绍下concurrentDictionary。

    ConcurrentDictionary

           这里我对.net core中ConcurrentDictionary源码进行了分析,里面采用了Volatile.Read和write,然后也使用了lock这种混合锁,而且还定义了更细颗粒度的锁。所以多线程使用ConcurrentDictionary集合还是比较好的选择。

    TryRemove

    这个方法会调用内部私用的TryRemoveInternal

    private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)
    {
        int hashcode = _comparer.GetHashCode(key);
        while (true)
        {
            Tables tables = _tables;
            int bucketNo, lockNo;
            //这里获取桶的索引和锁的索引,注意,锁的索引和桶未必是同一个值,具体算法看源码。
            GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);
            //这里锁住的只是对应这个index指向的锁,而不是所有锁。
            lock (tables._locks[lockNo])
            {
                //这里table可能被重新分配,所以这里再次获取,看得到的是不是同一个table
                // If the table just got resized, we may not be holding the right lock, and must retry.
                // This should be a rare occurrence.
                if (tables != _tables)
                {
                    continue;
                }
    
                Node prev = null;
                //这里同一个桶,可能因为连地址,有很多值,所以要对比key
                for (Node curr = tables._buckets[bucketNo]; curr != null; curr = curr._next)
                {
                    Debug.Assert((prev == null && curr == tables._buckets[bucketNo]) || prev._next == curr);
                    //对比是不是要删除的的那个元素
                    if (hashcode == curr._hashcode && _comparer.Equals(curr._key, key))
                    {
                        if (matchValue)
                        {
                            bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr._value);
                            if (!valuesMatch)
                            {
                                value = default(TValue);
                                return false;
                            }
                        }
                        //执行删除,判断有没有上一个节点。然后修改节点指针或地址。
                        if (prev == null)
                        {
                            Volatile.Write<Node>(ref tables._buckets[bucketNo], curr._next);
                        }
                        else
                        {
                            prev._next = curr._next;
                        }
    
                        value = curr._value;
                        tables._countPerLock[lockNo]--;
                        return true;
                    }
                    prev = curr;
                }
            }
            value = default(TValue);
            return false;
        }
    }
    TryRemoveInternal

    TryAdd

    这个方法会调用内部私用的TryAddInternal

    TryAddInternal(key, _comparer.GetHashCode(key), value, false, true, out dummy);

    /// <summary>
    /// Shared internal implementation for inserts and updates.
    /// If key exists, we always return false; and if updateIfExists == true we force update with value;
    /// If key doesn't exist, we always add value and return true;
    /// </summary>
    private bool TryAddInternal(TKey key, int hashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
    {
        Debug.Assert(_comparer.GetHashCode(key) == hashcode);
        while (true)
        {
            int bucketNo, lockNo;
            Tables tables = _tables;
    //老方法了,不多说,获取hash索引和锁索引
            GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);
            bool resizeDesired = false;
            bool lockTaken = false;
            try
            {
                //这里都是true的,所以会获取锁
                if (acquireLock)
                    Monitor.Enter(tables._locks[lockNo], ref lockTaken);
    
                // If the table just got resized, we may not be holding the right lock, and must retry.
                // This should be a rare occurrence.
                if (tables != _tables)
                {
                    continue;
                }
    
                // Try to find this key in the bucket
                Node prev = null;
                //查看对应的桶里,
                for (Node node = tables._buckets[bucketNo]; node != null; node = node._next)
                {
                    Debug.Assert((prev == null && node == tables._buckets[bucketNo]) || prev._next == node);
                    //查看有没有相同的key值,有就返回false
                    if (hashcode == node._hashcode && _comparer.Equals(node._key, key))
                    {
                        // The key was found in the dictionary. If updates are allowed, update the value for that key.
                        // We need to create a new node for the update, in order to support TValue types that cannot
                        // be written atomically, since lock-free reads may be happening concurrently.
                        //这个应该是addorupdate使用的,存在就更新。
                        if (updateIfExists)
                        {
                            if (s_isValueWriteAtomic)
                            {
                                node._value = value;
                            }
                            else
                            {
                                Node newNode = new Node(node._key, value, hashcode, node._next);
                                if (prev == null)
                                {
                                    Volatile.Write(ref tables._buckets[bucketNo], newNode);
                                }
                                else
                                {
                                    prev._next = newNode;
                                }
                            }
                            resultingValue = value;
                        }
                        else
                        {
                            resultingValue = node._value;
                        }
                        return false;
                    }
                    prev = node;
                }
                
                // The key was not found in the bucket. Insert the key-value pair.
                Volatile.Write<Node>(ref tables._buckets[bucketNo], new Node(key, value, hashcode, tables._buckets[bucketNo]));
                //这里checked检查是否存在溢出。
                checked
                {
                    tables._countPerLock[lockNo]++;
                }
                // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table.
                // It is also possible that GrowTable will increase the budget but won't resize the bucket table.
                // That happens if the bucket table is found to be poorly utilized due to a bad hash function.
                // _budget是 The maximum number of elements per lock before a resize operation is triggered
                if (tables._countPerLock[lockNo] > _budget)
                {
                    resizeDesired = true;
                }
            }
            finally
            {
                if (lockTaken)
                    Monitor.Exit(tables._locks[lockNo]);
            }
            // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table.
            //
            // Concurrency notes:
            // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks.
            //As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0
    
            //   and then verify that the table we passed to it as the argument is still the current table.
            if (resizeDesired)
            {
                GrowTable(tables);
            }
    //赋值
            resultingValue = value;
            return true;
        }
    }
    TryAddInternal

    TryGetValue

    TryGetValueInternal(key, _comparer.GetHashCode(key), out value);

    private bool TryGetValueInternal(TKey key, int hashcode, out TValue value)
    {
        Debug.Assert(_comparer.GetHashCode(key) == hashcode);
        //用本地变量保存这个table的快照。
        // We must capture the _buckets field in a local variable. It is set to a new table on each table resize.
        Tables tables = _tables;
        int bucketNo = GetBucket(hashcode, tables._buckets.Length);
        // We can get away w/out a lock here.
        // The Volatile.Read ensures that we have a copy of the reference to tables._buckets[bucketNo].
        // This protects us from reading fields ('_hashcode', '_key', '_value' and '_next') of different instances.
    Node n = Volatile.Read<Node>(ref tables._buckets[bucketNo]);
    //如果key相符 ,赋值,不然继续寻找下一个。
        while (n != null)
        {
            if (hashcode == n._hashcode && _comparer.Equals(n._key, key))
            {
                value = n._value;
                return true;
            }
            n = n._next;
        }
        value = default(TValue);//没找到就赋默认值
        return false;
    }
  • 相关阅读:
    Redis常见七种使用场景(PHP实战)
    session垃圾回收机制
    header 头各种类型文件下载
    SQL优化(面试题)
    spring中bean的生命周期
    JAVA的分布式锁
    Java微服务下的分布式事务介绍及其解决方案
    Java反射
    类加载的三种方式
    Sql语句的基本查询用法,两表联查,3表联查
  • 原文地址:https://www.cnblogs.com/qixinbo/p/9591333.html
Copyright © 2011-2022 走看看