zoukankan      html  css  js  c++  java
  • .net线程池内幕

    本文通过对.NET4.5的ThreadPool源码的分析讲解揭示.NET线程池的内幕,并总结ThreadPool设计的好与不足。

    线程池的作用
    线程池,顾名思义,线程对象池。Task和TPL都有用到线程池,所以了解线程池的内幕有助于你写出更好的程序。由于篇幅有限,在这里我只讲解以下核心概念:

    • 线程池的大小
    • 如何调用线程池添加任务
    • 线程池如何执行任务

    Threadpool也支持操控IOCP的线程,但在这里我们不研究它,涉及到task和TPL的会在其各自的博客中做详解。

    线程池的大小
    不管什么池,总有尺寸,ThreadPool也不例外。ThreadPool提供了4个方法来调整线程池的大小:

    • SetMaxThreads
    • GetMaxThreads
    • SetMinThreads
    • GetMinThreads

    SetMaxThreads指定线程池最多可以有多少个线程,而GetMaxThreads自然就是获取这个值。SetMinThreads指定线程池中最少存活的线程的数量,而GetMinThreads就是获取这个值。
    为何要设置一个最大数量和有一个最小数量呢?原来线程池的大小取决于若干因素,如虚拟地址空间的大小等。比如你的计算机是4g内存,而一个线程的初始堆栈大小为1m,那么你最多能创建4g/1m的线程(忽略操作系统本身以及其他进程内存分配);正因为线程有内存开销,所以如果线程池的线程过多而又没有被完全使用,那么这就是对内存的一种浪费,所以限制线程池的最大数是很make sense的。
    那么最小数又是为啥?线程池就是线程的对象池,对象池的最大的用处是重用对象。为啥要重用线程,因为线程的创建与销毁都要占用大量的cpu时间。所以在高并发状态下,线程池由于无需创建销毁线程节约了大量时间,提高了系统的响应能力和吞吐量。最小数可以让你调整最小的存活线程数量来应对不同的高并发场景。

    如何调用线程池添加任务
    线程池主要提供了2个方法来调用:QueueUserWorkItem和UnsafeQueueUserWorkItem。
    两个方法的代码基本一致,除了attribute不同,QueueUserWorkItem可以被partial trust的代码调用,而UnsafeQueueUserWorkItem只能被full trust的代码调用。

    1 public static bool QueueUserWorkItem(WaitCallback callBack)
    2 {
    3 StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
    4 return ThreadPool.QueueUserWorkItemHelper(callBack, (object) null, ref stackMark, true);
    5 }

    QueueUserWorkItemHelper首先调用ThreadPool.EnsureVMInitialized()来确保CLR虚拟机初始化(VM是一个统称,不是单指java虚拟机,也可以指CLR的execution engine),紧接着实例化ThreadPoolWorkQueue,最后调用ThreadPoolWorkQueue的Enqueue方法并传入callback和true。

     1 [SecurityCritical]
     2 public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
     3 {
     4 ThreadPoolWorkQueueThreadLocals queueThreadLocals = (ThreadPoolWorkQueueThreadLocals) null;
     5 if (!forceGlobal)
     6 queueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals;
     7 if (this.loggingEnabled)
     8 FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject((object) callback);
     9 if (queueThreadLocals != null)
    10 {
    11 queueThreadLocals.workStealingQueue.LocalPush(callback);
    12 }
    13 else
    14 {
    15 ThreadPoolWorkQueue.QueueSegment comparand = this.queueHead;
    16 while (!comparand.TryEnqueue(callback))
    17 {
    18 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref comparand.Next, new ThreadPoolWorkQueue.QueueSegment(), (ThreadPoolWorkQueue.QueueSegment) null);
    19 for (; comparand.Next != null; comparand = this.queueHead)
    20 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, comparand.Next, comparand);
    21 }
    22 }
    23 this.EnsureThreadRequested();
    24 }

    ThreadPoolWorkQueue主要包含2个“queue”(实际是数组),一个为QueueSegment(global work queue),另一个是WorkStealingQueue(local work queue)。两者具体的区别会在Task/TPL里讲解,这里暂不解释。
    由于forceGlobal是true,所以执行到了comparand.TryEnqueue(callback),也就是QueueSegment.TryEnqueue。comparand先从队列的头(queueHead)开始enqueue,如果不行就继续往下enqueue,成功后再赋值给queueHead。
    让我们来看看QueueSegment的源代码:

     1 public QueueSegment()
     2 {
     3 this.nodes = new IThreadPoolWorkItem[256];
     4 }
     5 
     6 public bool TryEnqueue(IThreadPoolWorkItem node)
     7 {
     8 int upper;
     9 int lower;
    10 this.GetIndexes(out upper, out lower);
    11 while (upper != this.nodes.Length)
    12 {
    13 if (this.CompareExchangeIndexes(ref upper, upper + 1, ref lower, lower))
    14 {
    15 Volatile.Write<IThreadPoolWorkItem>(ref this.nodes[upper], node);
    16 return true;
    17 }
    18 }
    19 return false;
    20 }

    这个所谓的global work queue实际上是一个IThreadPoolWorkItem的数组,而且限死256,这是为啥?难道是因为和IIS线程池(也只有256个线程)对齐?使用interlock和内存写屏障volatile.write来保证nodes的正确性,比起同步锁性能有很大的提高。最后调用EnsureThreadRequested,EnsureThreadRequested会调用QCall把请求发送至CLR,由CLR调度ThreadPool。

    线程池如何执行任务
    线程被调度后通过ThreadPoolWorkQueue的Dispatch方法来执行callback。

     1 internal static bool Dispatch()
     2 {
     3 ThreadPoolWorkQueue threadPoolWorkQueue = ThreadPoolGlobals.workQueue;
     4 int tickCount = Environment.TickCount;
     5 threadPoolWorkQueue.MarkThreadRequestSatisfied();
     6 threadPoolWorkQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, (EventKeywords) 18);
     7 bool flag1 = true;
     8 IThreadPoolWorkItem callback = (IThreadPoolWorkItem) null;
     9 try
    10 {
    11 ThreadPoolWorkQueueThreadLocals tl = threadPoolWorkQueue.EnsureCurrentThreadHasQueue();
    12 while ((long) (Environment.TickCount - tickCount) < (long) ThreadPoolGlobals.tpQuantum)
    13 {
    14 try
    15 {
    16 }
    17 finally
    18 {
    19 bool missedSteal = false;
    20 threadPoolWorkQueue.Dequeue(tl, out callback, out missedSteal);
    21 if (callback == null)
    22 flag1 = missedSteal;
    23 else
    24 threadPoolWorkQueue.EnsureThreadRequested();
    25 }
    26 if (callback == null)
    27 return true;
    28 if (threadPoolWorkQueue.loggingEnabled)
    29 FrameworkEventSource.Log.ThreadPoolDequeueWorkObject((object) callback);
    30 if (ThreadPoolGlobals.enableWorkerTracking)
    31 {
    32 bool flag2 = false;
    33 try
    34 {
    35 try
    36 {
    37 }
    38 finally
    39 {
    40 ThreadPool.ReportThreadStatus(true);
    41 flag2 = true;
    42 }
    43 callback.ExecuteWorkItem();
    44 callback = (IThreadPoolWorkItem) null;
    45 }
    46 finally
    47 {
    48 if (flag2)
    49 ThreadPool.ReportThreadStatus(false);
    50 }
    51 }
    52 else
    53 {
    54 callback.ExecuteWorkItem();
    55 callback = (IThreadPoolWorkItem) null;
    56 }
    57 if (!ThreadPool.NotifyWorkItemComplete())
    58 return false;
    59 }
    60 return true;
    61 }
    62 catch (ThreadAbortException ex)
    63 {
    64 if (callback != null)
    65 callback.MarkAborted(ex);
    66 flag1 = false;
    67 }
    68 finally
    69 {
    70 if (flag1)
    71 threadPoolWorkQueue.EnsureThreadRequested();
    72 }
    73 return true;
    74 }

    while语句判断如果执行时间少于30ms会不断继续执行下一个callback。这是因为大多数机器线程切换大概在30ms,如果该线程只执行了不到30ms就在等待中断线程切换那就太浪费CPU了,浪费可耻啊!
    Dequeue负责找到需要执行的callback:

     1 public void Dequeue(ThreadPoolWorkQueueThreadLocals tl, out IThreadPoolWorkItem callback, out bool missedSteal)
     2 {
     3 callback = (IThreadPoolWorkItem) null;
     4 missedSteal = false;
     5 ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue1 = tl.workStealingQueue;
     6 workStealingQueue1.LocalPop(out callback);
     7 if (callback == null)
     8 {
     9 for (ThreadPoolWorkQueue.QueueSegment comparand = this.queueTail; !comparand.TryDequeue(out callback) && comparand.Next != null && comparand.IsUsedUp(); comparand = this.queueTail)
    10 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueTail, comparand.Next, comparand);
    11 }
    12 if (callback != null)
    13 return;
    14 ThreadPoolWorkQueue.WorkStealingQueue[] current = ThreadPoolWorkQueue.allThreadQueues.Current;
    15 int num = tl.random.Next(current.Length);
    16 for (int length = current.Length; length > 0; --length)
    17 {
    18 ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue2 = Volatile.Read<ThreadPoolWorkQueue.WorkStealingQueue>(ref current[num % current.Length]);
    19 if (workStealingQueue2 != null && workStealingQueue2 != workStealingQueue1 && workStealingQueue2.TrySteal(out callback, ref missedSteal))
    20 break;
    21 ++num;
    22 }
    23 }

    因为我们把callback添加到了global work queue,所以local work queue(workStealingQueue.LocalPop(out callback))找不到callback,local work queue查找callback会在task里讲解。接着又去global work queue查找,先从global work queue的起始位置查找直至尾部,因此global work quque里的callback是FIFO的执行顺序。

     1 public bool TryDequeue(out IThreadPoolWorkItem node)
     2 {
     3 int upper;
     4 int lower;
     5 this.GetIndexes(out upper, out lower);
     6 while (lower != upper)
     7 {
     8 // ISSUE: explicit reference operation
     9 // ISSUE: variable of a reference type
    10 int& prevUpper = @upper;
    11 // ISSUE: explicit reference operation
    12 int newUpper = ^prevUpper;
    13 // ISSUE: explicit reference operation
    14 // ISSUE: variable of a reference type
    15 int& prevLower = @lower;
    16 // ISSUE: explicit reference operation
    17 int newLower = ^prevLower + 1;
    18 if (this.CompareExchangeIndexes(prevUpper, newUpper, prevLower, newLower))
    19 {
    20 SpinWait spinWait = new SpinWait();
    21 while ((node = Volatile.Read<IThreadPoolWorkItem>(ref this.nodes[lower])) == null)
    22 spinWait.SpinOnce();
    23 this.nodes[lower] = (IThreadPoolWorkItem) null;
    24 return true;
    25 }
    26 }
    27 node = (IThreadPoolWorkItem) null;
    28 return false;
    29 }

    使用自旋锁和内存读屏障来避免内核态和用户态的切换,提高了获取callback的性能。如果还是没有callback,那么就从所有的local work queue里随机选取一个,然后在该local work queue里“偷取”一个任务(callback)。
    拿到callback后执行callback.ExecuteWorkItem(),通知完成。

    总结
    ThreadPool提供了方法调整线程池最少活跃的线程来应对不同的并发场景。ThreadPool带有2个work queue,一个golbal一个local。执行时先从local找任务,接着去global,最后才会去随机选取一个local偷一个任务,其中global是FIFO的执行顺序。Work queue实际上是数组,使用了大量的自旋锁和内存屏障来提高性能。但是在偷取任务上,是否可以考虑得更多,随机选择一个local太随意。首先要考虑偷取的队列上必须有可执行任务;其次可以选取一个不在调度中的线程的local work queue,这样降低了自旋锁的可能性,加快了偷取的速度;最后,偷取的时候可以考虑像golang一样偷取别人queue里一半的任务,因为执行完偷到的这一个任务之后,下次该线程再次被调度到还是可能没任务可执行,还得去偷取别人的任务,这样既浪费CPU时间,又让任务在线程上分布不均匀,降低了系统吞吐量!

    另外,如果禁用log和ETW trace,可以使ThreadPool的性能更进一步。

  • 相关阅读:
    jquery异步加载json格式的数据
    三角形及选中取消按钮的css代码
    css实现自适应宽度布局
    table表格中实现tbody部分可滚动,且thead部分固定
    table数据表格添加checkbox进行数据进行两个表格左右移动。
    对checkbox 的checked的一些总结
    Java多线程同步器
    Springboot动态获取bean对象工具类
    并发阻塞队列和非阻塞队列详解
    多线程-volatile关键字和ThreadLocal详解
  • 原文地址:https://www.cnblogs.com/newbier/p/6192882.html
Copyright © 2011-2022 走看看