zoukankan      html  css  js  c++  java
  • C# ConcurrentBag实现

    ConcurrentBag可以理解为是一个线程安全无序集合,API比我们的list要弱一点,那我们来看看它的实现:

      public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
        {
            // ThreadLocalList object that contains the data per thread
            ThreadLocal<ThreadLocalList> m_locals;
    
            // This head and tail pointers points to the first and last local lists, to allow enumeration on the thread locals objects
            volatile ThreadLocalList m_headList, m_tailList;
            
            bool m_needSync;
            
            public ConcurrentBag() { Initialize(null);}
            public ConcurrentBag(IEnumerable<T> collection)
            {
                if (collection == null)
                {
                    throw new ArgumentNullException("collection", SR.GetString(SR.ConcurrentBag_Ctor_ArgumentNullException));
                }
                Initialize(collection);
            }
            
            private void Initialize(IEnumerable<T> collection)
            {
                m_locals = new ThreadLocal<ThreadLocalList>();
    
                // Copy the collection to the bag
                if (collection != null)
                {
                    ThreadLocalList list = GetThreadList(true);
                    foreach (T item in collection)
                    {
                        list.Add(item, false);
                    }
                }
            }
            
            public void Add(T item)
            {
                // Get the local list for that thread, create a new list if this thread doesn't exist 
                //(first time to call add)
                ThreadLocalList list = GetThreadList(true);
                AddInternal(list, item);
            }
            
           private void AddInternal(ThreadLocalList list, T item)
            {
                bool lockTaken = false;
                try
                {
                    Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add);
                    //Synchronization cases:
                    // if the list count is less than two to avoid conflict with any stealing thread
                    // if m_needSync is set, this means there is a thread that needs to freeze the bag
                    if (list.Count < 2 || m_needSync)
                    {
                        // reset it back to zero to avoid deadlock with stealing thread
                        list.m_currentOp = (int)ListOperation.None;
                        Monitor.Enter(list, ref lockTaken);
                    }
                    list.Add(item, lockTaken);
                }
                finally
                {
                    list.m_currentOp = (int)ListOperation.None;
                    if (lockTaken)
                    {
                        Monitor.Exit(list);
                    }
                }
            }
            
            private ThreadLocalList GetThreadList(bool forceCreate)
            {
                ThreadLocalList list = m_locals.Value;
                if (list != null)
                {
                    return list;
                }
                else if (forceCreate)
                {
                    // Acquire the lock to update the m_tailList pointer
                    lock (GlobalListsLock)
                    {
                        if (m_headList == null)
                        {
                            list = new ThreadLocalList(Thread.CurrentThread);
                            m_headList = list;
                            m_tailList = list;
                        }
                        else
                        {
                            list = GetUnownedList();
                            if (list == null)
                            {
                                list = new ThreadLocalList(Thread.CurrentThread);
                                m_tailList.m_nextList = list;
                                m_tailList = list;
                            }
                        }
                        m_locals.Value = list;
                    }
                }
                else
                {
                    return null;
                }
                Debug.Assert(list != null);
                return list;
            }
            
            public bool TryTake(out T result)
            {
                return TryTakeOrPeek(out result, true);
            }
            
            public bool TryPeek(out T result)
            {
                return TryTakeOrPeek(out result, false);
            }
            
            private bool TryTakeOrPeek(out T result, bool take)
            {
                // Get the local list for that thread, return null if the thread doesn't exit 
                //(this thread never add before) 
                ThreadLocalList list = GetThreadList(false);
                if (list == null || list.Count == 0)
                {
                    return Steal(out result, take);
                }
                bool lockTaken = false;
                try
                {
                    if (take) // Take operation
                    {
                        Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Take);
                        //Synchronization cases:
                        // if the list count is less than or equal two to avoid conflict with any stealing thread
                        // if m_needSync is set, this means there is a thread that needs to freeze the bag
                        if (list.Count <= 2 || m_needSync)
                        {
                            // reset it back to zero to avoid deadlock with stealing thread
                            list.m_currentOp = (int)ListOperation.None;
                            Monitor.Enter(list, ref lockTaken);
    
                            // Double check the count and steal if it became empty
                            if (list.Count == 0)
                            {
                                // Release the lock before stealing
                                if (lockTaken)
                                {
                                    try { }
                                    finally
                                    {
                                        lockTaken = false; // reset lockTaken to avoid calling Monitor.Exit again in the finally block
                                        Monitor.Exit(list);
                                    }
                                }
                                return Steal(out result, true);
                            }
                        }
                        list.Remove(out result);
                    }
                    else
                    {
                        if (!list.Peek(out result))
                        {
                            return Steal(out result, false);
                        }
                    }
                }
                finally
                {
                    list.m_currentOp = (int)ListOperation.None;
                    if (lockTaken)
                    {
                        Monitor.Exit(list);
                    }
                }
                return true;
            }
    
       private bool Steal(out T result, bool take)
            {
                bool loop;
                List<int> versionsList = new List<int>(); // save the lists version
                do
                {
                    versionsList.Clear(); //clear the list from the previous iteration
                    loop = false;
                  
                    ThreadLocalList currentList = m_headList;
                    while (currentList != null)
                    {
                        versionsList.Add(currentList.m_version);
                        if (currentList.m_head != null && TrySteal(currentList, out result, take))
                        {
                            return true;
                        }
                        currentList = currentList.m_nextList;
                    }
    
                    // verify versioning, if other items are added to this list since we last visit it, we should retry
                    currentList = m_headList;
                    foreach (int version in versionsList)
                    {
                        if (version != currentList.m_version) //oops state changed
                        {
                            loop = true;
                            if (currentList.m_head != null && TrySteal(currentList, out result, take))
                                return true;
                        }
                        currentList = currentList.m_nextList;
                    }
                } while (loop);
    
                result = default(T);
                return false;
            }
            
            private bool TrySteal(ThreadLocalList list, out T result, bool take)
            {
                lock (list)
                {
                    if (CanSteal(list))
                    {
                        list.Steal(out result, take);
                        return true;
                    }
                    result = default(T);
                    return false;
                }
            }
            
            private bool CanSteal(ThreadLocalList list)
            {
                if (list.Count <= 2 && list.m_currentOp != (int)ListOperation.None)
                {
                    SpinWait spinner = new SpinWait();
                    while (list.m_currentOp != (int)ListOperation.None)
                    {
                        spinner.SpinOnce();
                    }
                }
                if (list.Count > 0)
                {
                    return true;
                }
                return false;
            }
            /// <summary>
            /// Try to reuse an unowned list if exist
            /// unowned lists are the lists that their owner threads are aborted or terminated
            /// this is workaround to avoid memory leaks.
            /// </summary>
            /// <returns>The list object, null if all lists are owned</returns>
            private ThreadLocalList GetUnownedList()
            {
                //the global lock must be held at this point
                Contract.Assert(Monitor.IsEntered(GlobalListsLock));
    
                ThreadLocalList currentList = m_headList;
                while (currentList != null)
                {
                    if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
                    {
                        currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe
                        return currentList;
                    }
                    currentList = currentList.m_nextList;
                }
                return null;
            }
              internal class ThreadLocalList
            {
              
                internal volatile Node m_head;
                private volatile Node m_tail;
                internal volatile int m_currentOp;
                private int m_count;
                internal int m_stealCount;
                internal volatile ThreadLocalList m_nextList;
                internal bool m_lockTaken;
                internal Thread m_ownerThread;
                internal volatile int m_version;
                internal ThreadLocalList(Thread ownerThread)
                {
                    m_ownerThread = ownerThread;
                }
                internal void Add(T item, bool updateCount)
                {
                    checked
                    {
                        m_count++;
                    }
                    Node node = new Node(item);
                    if (m_head == null)
                    {
                        Debug.Assert(m_tail == null);
                        m_head = node;
                        m_tail = node;
                        m_version++; // changing from empty state to non empty state
                    }
                    else
                    {
                        node.m_next = m_head;
                        m_head.m_prev = node;
                        m_head = node;
                    }
                    if (updateCount) // update the count to avoid overflow if this add is synchronized
                    {
                        m_count = m_count - m_stealCount;
                        m_stealCount = 0;
                    }
                }
    
                /// <summary>
                /// Remove an item from the head of the list
                /// </summary>
                /// <param name="result">The removed item</param>
                internal void Remove(out T result)
                {
                    Debug.Assert(m_head != null);
                    Node head = m_head;
                    m_head = m_head.m_next;
                    if (m_head != null)
                    {
                        m_head.m_prev = null;
                    }
                    else
                    {
                        m_tail = null;
                    }
                    m_count--;
                    result = head.m_value;
    
                }
    
                /// <summary>
                /// Peek an item from the head of the list
                /// </summary>
                /// <param name="result">the peeked item</param>
                /// <returns>True if succeeded, false otherwise</returns>
                internal bool Peek(out T result)
                {
                    Node head = m_head;
                    if (head != null)
                    {
                        result = head.m_value;
                        return true;
                    }
                    result = default(T);
                    return false;
                }
    
                internal void Steal(out T result, bool remove)
                {
                    Node tail = m_tail;
                    Debug.Assert(tail != null);
                    if (remove) // Take operation
                    {
                        m_tail = m_tail.m_prev;
                        if (m_tail != null)
                        {
                            m_tail.m_next = null;
                        }
                        else
                        {
                            m_head = null;
                        }
                        // Increment the steal count
                        m_stealCount++;
                    }
                    result = tail.m_value;
                }
    
            }
           internal class Node
            {
                public Node(T value)
                {
                    m_value = value;
                }
                public readonly T m_value;
                public Node m_next;
                public Node m_prev;
            }
        }

    首先我们需要知道里面有2个内部类Node和ThreadLocalList都是链表结构,其中Node是双向链表,因为它有m_next和m_prev属性,但是ThreadLocalList确是单项链表只有m_nextList属性,ThreadLocalList是Node的集合,有m_head和m_tail属性指向Node实例。现在我们来看ConcurrentBag的几个变量,ThreadLocal<ThreadLocalList> m_locals表示当前线程的list,所以从这里我们可以猜测线程安全是采用ThreadLocal来实现的。 volatile ThreadLocalList m_headList, m_tailList;这2个变量应该是可以遍历所有线程的list

    无论是初始化Initialize方法还是添加元素的Add方法,我们首先要调用GetThreadList放来获取当前线程的list,GetThreadList方法 首先检查当前线程的m_locals.Value是否存在,有则直接返回;否者检查当前线程是否是程序第一个线程【m_headList == null】,如果是则创建新的ThreadLocalList,否者调用GetUnownedList放法检查是否有孤立ThreadLocalList使用【ThreadLocalList的逻辑线程已经停止,但是该ThreadLocalList实例确存在】,如果有则返回改ThreadLocalList,否则只有新建ThreadLocalList实例

    现在看看AddInternal方法的实现,首先修改ThreadLocalList的m_currentOp标记为添加元素【 Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add)】,然后在添加元素 list.Add(item, lockTaken);,如果该list需要lock的话,那么在添加元素前我们还需要加锁Monitor.Enter(list, ref lockTaken),添加后需要解锁 Monitor.Exit(list)。ThreadLocalList的Add方法非常简单,把新节点放到链表头部【 node.m_next = m_head;m_head.m_prev = node; m_head = node;】

    添加元素时添加到各个线程的ThreadLocalList,那么读取就比较麻烦了,我们需要读取各各线程ThreadLocalList的数据,也就是说需要用到m_headList, m_tailList两个变量。如果当前线程存在ThreadLocalList实例,那么直接从ThreadLocalList里面拿去数据,如果需要加锁,那么我们就加锁【 Monitor.Enter(list, ref lockTaken)】和解锁【Monitor.Exit(list)】,都是当前线程的list,如果当前线程ThreadLocalList不存在,或者没有数据,我们需要从其他线程的ThreadLocalList获取数据,Steal方法 首先或从m_headList开始,依次遍历每一个ThreadLocalList,然后从它们里面获取数据,如果获取不到数据,那么就再次遍历一下所有的ThreadLocalList,检查哪些ThreadLocalList的版本m_version在这两次遍历过程中发生了变化。

      do
                {
                    versionsList.Clear(); //clear the list from the previous iteration
                    loop = false;
                  
    
                    ThreadLocalList currentList = m_headList;
                    while (currentList != null)
                    {
                        versionsList.Add(currentList.m_version);
                        if (currentList.m_head != null && TrySteal(currentList, out result, take))
                        {
                            return true;
                        }
                        currentList = currentList.m_nextList;
                    }
    
                    // verify versioning, if other items are added to this list since we last visit it, we should retry
                    currentList = m_headList;
                    foreach (int version in versionsList)
                    {
                        if (version != currentList.m_version) //oops state changed
                        {
                            loop = true;
                            if (currentList.m_head != null && TrySteal(currentList, out result, take))
                                return true;
                        }
                        currentList = currentList.m_nextList;
                    }
                } while (loop);

    TrySteal方法的实现就非常简单了,检查list是否可以查询数据【CanSteal(list)】,CanSteal里面也用了自旋来实现【if (list.Count <= 2 && list.m_currentOp != (int)ListOperation.None){ SpinWait spinner = new SpinWait(); while (list.m_currentOp != (int)ListOperation.None) {spinner.SpinOnce(); } }】,真正Steal实现是由ThreadLocalList来做的,比较简单。

  • 相关阅读:
    [BZOJ 1552] 排序机械臂
    [BZOJ 1124][POI 2008] 枪战 Maf
    [BZOJ 1647][USACO 2007 Open] Fliptile 翻格子游戏
    [BZOJ 1592] Making The Grade路面修整
    [BZOJ 3829][POI2014] FarmCraft
    [技术] 如何正确食用cnblogs的CSS定制
    [BZOJ 1458] 士兵占领
    今天写了一个Imageloader,,AndroidStudio报了Error:Execution failed for task ':app:mergeDebugResources'. > Error: Java.util.concurrent.ExecutionException: com.Android.ide.common.process.ProcessException: 这个错误
    Http响应码代表的含义
    获取WIFI列表,在旧手机上运行就没有问题,在新手机上就怎么也获取不到WIFI列表,长度一直为0,还不报异常,很疑惑。
  • 原文地址:https://www.cnblogs.com/majiang/p/7884556.html
Copyright © 2011-2022 走看看