ConcurrentBag根据操作线程,对不同线程分配不同的队列进行数据操作。这样,每个队列只有一个线程在操作,不会发生并发问题。其内部实现运用了net4.0新加入的ThreadLocal线程本地存储功能。各个队列间通过链表维护。
其内部结构如下:
1、获取线程本地队列:
1 /// <summary> 2 /// 获取当前线程的队列 3 /// </summary> 4 /// <param name="forceCreate">如果线程没有持有队列,是否新建</param> 5 /// <returns></returns> 6 private ThreadLocalList<T> GetThreadList(bool forceCreate) 7 { 8 //尝试获取线程本地队列列表(参考ThreadLocal),此处的m_locals不同线程持有不同实例 9 //如果获取为空,则说明线程是第一次执行此函数,需要分配一个队列 10 ThreadLocalList<T> unownedList = this.m_locals.Value; 11 if (unownedList != null) 12 { 13 return unownedList; 14 } 15 if (forceCreate) 16 { 17 //获取当前本地队列锁,防止在冻结队列时产生冲突(参考FreezeBag函数) 18 object globalListsLock = this.GlobalListsLock; 19 lock (globalListsLock) 20 { 21 //获取本地队列 22 //如果没有创建过队列,则创建一个新的队列;否则尽量分配已有的线程终止的队列 23 if (this.m_headList == null) 24 { 25 unownedList = new ThreadLocalList<T>(Thread.CurrentThread); 26 this.m_headList = unownedList; 27 this.m_tailList = unownedList; 28 } 29 else 30 { 31 //获取无主队列,不分配新队列 32 unownedList = this.GetUnownedList(); 33 if (unownedList == null) 34 { 35 unownedList = new ThreadLocalList<T>(Thread.CurrentThread); 36 this.m_tailList.m_nextList = unownedList; 37 this.m_tailList = unownedList; 38 } 39 } 40 this.m_locals.Value = unownedList; 41 return unownedList; 42 } 43 } 44 return null; 45 }
2、获取无主队列
1 /// <summary> 2 /// 获取无主队列 3 /// 如果当前队列的持有线程已经终止,则为无主队列 4 /// </summary> 5 /// <returns></returns> 6 private ThreadLocalList<T> GetUnownedList() 7 { 8 for (ThreadLocalList<T> list = this.m_headList; list != null; list = list.m_nextList) 9 { 10 if (list.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped) 11 { 12 list.m_ownerThread = Thread.CurrentThread; 13 return list; 14 } 15 } 16 return null; 17 }
3、插入操作代码分析
1 /// <summary> 2 /// 向Bag添加元素 3 /// </summary> 4 /// <param name="item"></param> 5 6 [__DynamicallyInvokable] 7 public void Add(T item) 8 { 9 //获取当前线程持有的队列 10 ThreadLocalList<T> threadList = this.GetThreadList(true); 11 //向当前持有队列添加数据 12 this.AddInternal(threadList, item); 13 } 14 15 /// <summary> 16 /// 向队列添加数据 17 /// </summary> 18 /// <param name="list"></param> 19 /// <param name="item"></param> 20 private void AddInternal(ThreadLocalList<T> list, T item) 21 { 22 bool lockTaken = false; 23 try 24 { 25 //CAS原子操作,设置标志位,与Steal和Freeze实现互斥 26 Interlocked.Exchange(ref list.m_currentOp, 1); 27 //如果m_needSync,则说明已经发起冻结操作,需要加锁保证线程安全 28 if ((list.Count < 2) || this.m_needSync) 29 { 30 list.m_currentOp = 0; 31 Monitor.Enter(list, ref lockTaken); 32 } 33 list.Add(item, lockTaken); 34 } 35 finally 36 { 37 list.m_currentOp = 0; 38 if (lockTaken) 39 { 40 Monitor.Exit(list); 41 } 42 } 43 }
4、冻结Bag函数
1 /// <summary> 2 /// 冻结Bag,不能进行增,删,获取操作 3 /// </summary> 4 /// <param name="lockTaken"></param> 5 private void FreezeBag(ref bool lockTaken) 6 { 7 //获取当前线程list锁 8 Monitor.Enter(this.GlobalListsLock, ref lockTaken); 9 //设置同步标志位,增,删,获取操作识别此标志位,只有获取锁才能执行 10 this.m_needSync = true; 11 //获取所有list的锁 12 this.AcquireAllLocks(); 13 //等待所有操作执行完成 14 this.WaitAllOperations(); 15 }
5、转化成数组
1 /// <summary> 2 /// 转化为数组 3 /// </summary> 4 /// <returns></returns> 5 [__DynamicallyInvokable] 6 public T[] ToArray() 7 { 8 T[] localArray; 9 //没有数据返回空数组 10 if (this.m_headList == null) 11 { 12 return new T[0]; 13 } 14 bool lockTaken = false; 15 try 16 { 17 //冻结bag 18 this.FreezeBag(ref lockTaken); 19 //转化成List后直接转成Array 20 localArray = this.ToList().ToArray(); 21 } 22 finally 23 { 24 this.UnfreezeBag(lockTaken); 25 } 26 return localArray; 27 } 28 29 /// <summary> 30 /// 转化成list 31 /// </summary> 32 /// <returns></returns> 33 private List<T> ToList() 34 { 35 List<T> list = new List<T>(); 36 //获取所有list,遍历生成副本 37 for (ThreadLocalList<T> list2 = this.m_headList; list2 != null; list2 = list2.m_nextList) 38 { 39 for (Node<T> node = list2.m_head; node != null; node = node.m_next) 40 { 41 list.Add(node.m_value); 42 } 43 } 44 return list; 45 }