zoukankan      html  css  js  c++  java
  • [.net]ConcurrentBag源码分析

    ConcurrentBag根据操作线程,对不同线程分配不同的队列进行数据操作。这样,每个队列只有一个线程在操作,不会发生并发问题。其内部实现运用了net4.0新加入的ThreadLocal线程本地存储功能。各个队列间通过链表维护。

    其内部结构如下:

     

    1、获取线程本地队列:

     1 /// <summary>
     2 /// 获取当前线程的队列
     3 /// </summary>
     4 /// <param name="forceCreate">如果线程没有持有队列,是否新建</param>
     5 /// <returns></returns>
     6 private ThreadLocalList<T> GetThreadList(bool forceCreate)
     7 {
     8     //尝试获取线程本地队列列表(参考ThreadLocal),此处的m_locals不同线程持有不同实例
     9     //如果获取为空,则说明线程是第一次执行此函数,需要分配一个队列
    10     ThreadLocalList<T> unownedList = this.m_locals.Value;
    11     if (unownedList != null)
    12     {
    13         return unownedList;
    14     }
    15     if (forceCreate)
    16     {
    17         //获取当前本地队列锁,防止在冻结队列时产生冲突(参考FreezeBag函数)
    18         object globalListsLock = this.GlobalListsLock;
    19         lock (globalListsLock)
    20         {
    21             //获取本地队列
    22             //如果没有创建过队列,则创建一个新的队列;否则尽量分配已有的线程终止的队列
    23             if (this.m_headList == null)
    24             {
    25                 unownedList = new ThreadLocalList<T>(Thread.CurrentThread);
    26                 this.m_headList = unownedList;
    27                 this.m_tailList = unownedList;
    28             }
    29             else
    30             {
    31                 //获取无主队列,不分配新队列
    32                 unownedList = this.GetUnownedList();
    33                 if (unownedList == null)
    34                 {
    35                     unownedList = new ThreadLocalList<T>(Thread.CurrentThread);
    36                     this.m_tailList.m_nextList = unownedList;
    37                     this.m_tailList = unownedList;
    38                 }
    39             }
    40             this.m_locals.Value = unownedList;
    41             return unownedList;
    42         }
    43     }
    44     return null;
    45 }
    获取当前线程持有的队列

    2、获取无主队列

     1 /// <summary>
     2 /// 获取无主队列
     3 /// 如果当前队列的持有线程已经终止,则为无主队列
     4 /// </summary>
     5 /// <returns></returns>
     6 private ThreadLocalList<T> GetUnownedList()
     7 {
     8     for (ThreadLocalList<T> list = this.m_headList; list != null; list = list.m_nextList)
     9     {
    10         if (list.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
    11         {
    12             list.m_ownerThread = Thread.CurrentThread;
    13             return list;
    14         }
    15     }
    16     return null;
    17 }
    获取无主队列

    3、插入操作代码分析

     1 /// <summary>
     2 /// 向Bag添加元素
     3 /// </summary>
     4 /// <param name="item"></param>
     5        
     6 [__DynamicallyInvokable]
     7 public void Add(T item)
     8 {
     9     //获取当前线程持有的队列
    10     ThreadLocalList<T> threadList = this.GetThreadList(true);
    11     //向当前持有队列添加数据
    12     this.AddInternal(threadList, item);
    13 }
    14 
    15 /// <summary>
    16 /// 向队列添加数据
    17 /// </summary>
    18 /// <param name="list"></param>
    19 /// <param name="item"></param>
    20 private void AddInternal(ThreadLocalList<T> list, T item)
    21 {
    22     bool lockTaken = false;
    23     try
    24     {
    25         //CAS原子操作,设置标志位,与Steal和Freeze实现互斥
    26         Interlocked.Exchange(ref list.m_currentOp, 1);
    27         //如果m_needSync,则说明已经发起冻结操作,需要加锁保证线程安全
    28         if ((list.Count < 2) || this.m_needSync)
    29         {
    30             list.m_currentOp = 0;
    31             Monitor.Enter(list, ref lockTaken);
    32         }
    33         list.Add(item, lockTaken);
    34     }
    35     finally
    36     {
    37         list.m_currentOp = 0;
    38         if (lockTaken)
    39         {
    40             Monitor.Exit(list);
    41         }
    42     }
    43 }
    插入操作

    4、冻结Bag函数

     1 /// <summary>
     2 /// 冻结Bag,不能进行增,删,获取操作
     3 /// </summary>
     4 /// <param name="lockTaken"></param>
     5 private void FreezeBag(ref bool lockTaken)
     6 {
     7     //获取当前线程list锁
     8     Monitor.Enter(this.GlobalListsLock, ref lockTaken);
     9     //设置同步标志位,增,删,获取操作识别此标志位,只有获取锁才能执行
    10     this.m_needSync = true;
    11     //获取所有list的锁
    12     this.AcquireAllLocks();
    13     //等待所有操作执行完成
    14     this.WaitAllOperations();
    15 }
    冻结bag

    5、转化成数组

     1 /// <summary>
     2 /// 转化为数组
     3 /// </summary>
     4 /// <returns></returns>
     5 [__DynamicallyInvokable]
     6 public T[] ToArray()
     7 {
     8     T[] localArray;
     9     //没有数据返回空数组
    10     if (this.m_headList == null)
    11     {
    12         return new T[0];
    13     }
    14     bool lockTaken = false;
    15     try
    16     {
    17         //冻结bag
    18         this.FreezeBag(ref lockTaken);
    19         //转化成List后直接转成Array
    20         localArray = this.ToList().ToArray();
    21     }
    22     finally
    23     {
    24         this.UnfreezeBag(lockTaken);
    25     }
    26     return localArray;
    27 }
    28 
    29 /// <summary>
    30 /// 转化成list
    31 /// </summary>
    32 /// <returns></returns>
    33 private List<T> ToList()
    34 {
    35     List<T> list = new List<T>();
    36     //获取所有list,遍历生成副本
    37     for (ThreadLocalList<T> list2 = this.m_headList; list2 != null; list2 = list2.m_nextList)
    38     {
    39         for (Node<T> node = list2.m_head; node != null; node = node.m_next)
    40         {
    41             list.Add(node.m_value);
    42         }
    43     }
    44     return list;
    45 }
    转化成数组
  • 相关阅读:
    FileUtils功能概述
    java collections
    java的OutOfMemoryError: PermGen space实战剖析
    中华人民共和国知识产权行业标准——表格格式和代码标准 第1部分:表格代码规则
    JVM EXCEPTION_ACCESS_VIOLATION
    【转】Linux tail 命令详解
    【转】XSHELL下直接下载文件到本地(Windows)
    Java与JavaScript之间关于JSON的是非恩怨
    MySQL的Blob类型的手工编辑(manually edit)
    Linux上传下载
  • 原文地址:https://www.cnblogs.com/deepminer/p/9965046.html
Copyright © 2011-2022 走看看