zoukankan      html  css  js  c++  java
  • C# ConcurrentQueue实现

     我们从C# Queue 和Stack的实现知道Queue是用数组来实现的,数组的元素不断的通过Array.Copy从一个数组移动到另一个数组,ConcurrentQueue我们需要关心2点:1线程安全是怎么实现的,2队列又是怎么实现的?我们来看看其实现code:

    public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection
    {
        void CopyTo(T[] array, int index);
        bool TryAdd(T item);
        bool TryTake(out T item);
        T[] ToArray();
    }
    
     public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
    {
        [NonSerialized]
        private volatile Segment m_head;
    
        [NonSerialized]
        private volatile Segment m_tail;
    
        private T[] m_serializationArray; // Used for custom serialization.
    
        private const int SEGMENT_SIZE = 32;
    
        //number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.
        [NonSerialized]
        internal volatile int m_numSnapshotTakers = 0;
        
        public ConcurrentQueue()
        {
            m_head = m_tail = new Segment(0, this);
        }
        
        public ConcurrentQueue(IEnumerable<T> collection)
        {
            if (collection == null)
            {
                throw new ArgumentNullException("collection");
            }
    
            InitializeFromCollection(collection);
        }
        
        private void InitializeFromCollection(IEnumerable<T> collection)
        {
            Segment localTail = new Segment(0, this);//use this local variable to avoid the extra volatile read/write. this is safe because it is only called from ctor
            m_head = localTail; 
    
            int index = 0;
            foreach (T element in collection)
            {
                Contract.Assert(index >= 0 && index < SEGMENT_SIZE);
                localTail.UnsafeAdd(element);
                index++;
    
                if (index >= SEGMENT_SIZE)
                {
                    localTail = localTail.UnsafeGrow();
                    index = 0;
                }
            }
    
            m_tail = localTail;
        }
        
        public void Enqueue(T item)
        {
            SpinWait spin = new SpinWait();
            while (true)
            {
                Segment tail = m_tail;
                if (tail.TryAppend(item))
                    return;
                spin.SpinOnce();
            }
        }
        
        public bool TryDequeue(out T result)
        {
            while (!IsEmpty)
            {
                Segment head = m_head;
                if (head.TryRemove(out result))
                    return true;
                //since method IsEmpty spins, we don't need to spin in the while loop
            }
            result = default(T);
            return false;
        }
        
        private class Segment
       {
            internal volatile T[] m_array;
            internal volatile VolatileBool[] m_state;
            private volatile Segment m_next;
            internal readonly long m_index;
            private volatile int m_low;
            private volatile int m_high;
            private volatile ConcurrentQueue<T> m_source;
          
            internal Segment(long index, ConcurrentQueue<T> source)
            {
                m_array = new T[SEGMENT_SIZE];
                m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false
                m_high = -1;
                Contract.Assert(index >= 0);
                m_index = index;
                m_source = source;
            }
            
            internal void UnsafeAdd(T value)
            {
                Contract.Assert(m_high < SEGMENT_SIZE - 1);
                m_high++;
                m_array[m_high] = value;
                m_state[m_high].m_value = true;
            }
            
           internal Segment UnsafeGrow()
            {
                Contract.Assert(m_high >= SEGMENT_SIZE - 1);
                Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow
                m_next = newSegment;
                return newSegment;
            }
            
          internal void Grow()
            {
                //no CAS is needed, since there is no contention (other threads are blocked, busy waiting)
                Segment newSegment = new Segment(m_index + 1, m_source);  //m_index is Int64, we don't need to worry about overflow
                m_next = newSegment;
                Contract.Assert(m_source.m_tail == this);
                m_source.m_tail = m_next;
            }    
            
           internal bool TryAppend(T value)
           {             
                if (m_high >= SEGMENT_SIZE - 1)
                {
                    return false;
                }
                try
                { }
                finally
                {
                    newhigh = Interlocked.Increment(ref m_high);
                    if (newhigh <= SEGMENT_SIZE - 1)
                    {
                        m_array[newhigh] = value;
                        m_state[newhigh].m_value = true;
                    }
    
                    if (newhigh == SEGMENT_SIZE - 1)
                    {
                        Grow();
                    }
                }
            
                return newhigh <= SEGMENT_SIZE - 1;
            }
            
          internal bool TryRemove(out T result)
            {
                SpinWait spin = new SpinWait();
                int lowLocal = Low, highLocal = High;
                while (lowLocal <= highLocal)
                {
                    if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal)
                    {
                      
                        SpinWait spinLocal = new SpinWait();
                        while (!m_state[lowLocal].m_value)
                        {
                            spinLocal.SpinOnce();
                        }
                        result = m_array[lowLocal];
    
                        if (m_source.m_numSnapshotTakers <= 0)
                        {
                            m_array[lowLocal] = default(T); //release the reference to the object. 
                        }
    
                     
                        if (lowLocal + 1 >= SEGMENT_SIZE)
                        {
                           
                            spinLocal = new SpinWait();
                            while (m_next == null)
                            {
                                spinLocal.SpinOnce();
                            }
                            Contract.Assert(m_source.m_head == this);
                            m_source.m_head = m_next;
                        }
                        return true;
                    }
                    else
                    {
                       
                        spin.SpinOnce();
                        lowLocal = Low; highLocal = High;
                    }
                }//end of while
                result = default(T);
                return false;
            }        
     }    
    }

    首先ConcurrentQueue构造函数没有 int capacity参数了,里面的线程安全是用SpinWait自旋来实现的,当我想往队列ConcurrentQueue添加一个元素的时候,如果添加失败,那程序自旋等待一下,再次添加元素,直到添加成功。里面用到了一个Segment自定义的类型,Segment的m_array是一个含有32个元素的数组,m_state和m_array一一对应,主要是用来标记m_array里面的元素是否有效。m_next是用来连接到下一个Segment的,m_high与添加元素密切相关,m_low与移除元素有关。先看TryAppend方法,优先将当前的newhigh原子加1【newhigh = Interlocked.Increment(ref m_high);】,这样假如有多个线程同时添加元素,每一个线程拿到的newhigh 值不用,那么它们操作m_array的下标就不同了,所以彼此之间不影响,到现在添加的线程安全就明白了。那么队列又是如何实现的了?我们来看看Grow()方法,当Segment的32个元素都被使用了,那么这个时候添加元素需要扩容,扩容的方式是重新实例一个Segment,旧的Segment的m_next属性指向新的得Segment,这样就组成了一个Segment链表(Segment核心是数组),它们的index从0开始,第一个Segment的index为0,第2个Segment的index为1....。TryRemove的实现类似,m_low其实是Segment的m_array下标,程序自旋一次,查找是否有元素,如果有元素必须检查元素是否有效(!m_state[lowLocal].m_value,因为在天加元素的时候是先增加newhigh变量,然后在设置m_state[newhigh].m_value = true有效),所以移除元素的时候必选验证元素是否有效。然后释放元素m_array[lowLocal] = default(T);,如果第一个Segment的元素全部移除了【if (lowLocal + 1 >= SEGMENT_SIZE)】,那么我们就应该开始移除第2个Segment元素了,需要检查是否有第2个Segment,如果没有 就自旋等待吧,然后m_head指向第下一个Segment【m_source.m_head = m_next;】线程安全依靠SpinWait 的自旋和原子操作Interlocked.Increment和Interlocked.CompareExchange来实现的。

  • 相关阅读:
    HDU 3416
    The connection to adb is down, and a severe error has occured
    HDU 2255 奔小康赚大钱 KM裸题
    springMVC --@RequestParam注解(后台控制器获取參数)
    面试宝典之预处理、const与sizeof
    oracle中字符串类似度函数实測
    Android学习之路
    007_尚学堂_高明鑫_android 之项目的打包apk与apk的反编译
    提高eclipse使用效率(二)—— 提高Android开发效率的小技巧
    提高eclipse使用效率(一)--使用快捷键
  • 原文地址:https://www.cnblogs.com/majiang/p/7879864.html
Copyright © 2011-2022 走看看