什么是计算限制的异步操作,当线程在要使用CPU进行计算的时候,那么就叫计算限制。
而对应的IO限制就是线程交给IO设备(键鼠,网络,文件等)。
第25章线程基础讲了用专用的线程进行计算限制的操作,但是创建专用线程开销大,而且太多的线程也浪费内存资源,那么本章就讨论一种更好的方法,即线程池技术。
CLR线程池
CLR包含了代码来管理它自己的线程池。线程池是应用程序能使用的线程集合,每个CLR一个线程池,这个线程池由CLR上所有的AppDomain共享。
CLR初始化时线程池中没有线程。
在线程池内部维护着一个操作请求队列,应用程序执行异步操作时,就调用某方法将一个记录项追加到线程池队列中。
线程池的代码从这个队列中提取记录项,将这个记录项派发给一个线程池线程,如果线程池没有线程,就创建一个新线程。
当线程池线程完成任务后,线程不会被销毁。相反,线程会返回线程池,在那里进入空闲状态,等待响应另一个请求。、
由于线程池不销毁自身,并且再次做异步操作不用创建新的线程,所以不再产生额外的性能损失。
如果应用程序向线程池发送了很多记录项到线程池队列,线程池最开始会尝试只用一个线程服务所有记录项,然而如果添加记录项的速度超过了线程池线程处理记录项的速度,就会创建额外的线程。
如果不再往线程池中发送请求,池中存在大量什么都不做的线程。那么这些闲置的线程池线程会在一段时间后自己醒来终止自己并释放资源。
写到这里就应该很清楚了,如果垃圾回收器是帮我们自动回收垃圾,那么线程池技术就是帮我们自动管理线程。
用线程池技术执行简单的计算限制操作
不多做解释,直接上代码更易懂,可对比上一章的用专用线程来进行计算限制的异步操作的代码,这样更易于理解:
static void Main(string[] args) { ThreadPool.QueueUserWorkItem(线程回调函数, "hello"); Console.WriteLine("记录项进入线程池队列"); Console.Read(); } private static void 线程回调函数(Object 状态参数) { Thread.Sleep(10000); if (状态参数.GetType() == typeof(string)) { Console.WriteLine("这是一个字符串"); } else { Console.WriteLine("未识别"); } }
执行上下文
每个线程都关联一个执行上下文数据结构。
执行上下文包括的东西有
安全设置(压缩站、Thread的Principal属性和Windows身份),
宿主设置(参见System.Threading.HostExecutionContextManager)
以及逻辑调用上下文数据(参见Sysyem.Runtime.Remoting.Messaging.CallContext的LogicalSetData和LogicalGetData方法)。
线程执行代码时,一些操作可能会用到执行上下文结构。
而每当一个线程使用另一个线程执行任务时,前者的执行上下文会复制给后者的执行上下文。这样就确保了执行任何操作都使用相同的安全设置和宿主设置。还确保了在线程中逻辑调用上下文中存储的任何数据都适用于被使用的另一个线程。
默认情况下,CLR自动造成初始线程的执行上下文复制到任何辅助线程。
这会造成性能影响,因为收集上下文信息并复制到辅助线程,花费不少时间,如果辅助线程中还有辅助线程,那么开销更大。
System.Threading命名空间有一个ExecutionContext类,也就是执行上下文类,它允许你控制线程的执行上下文是否复制到另一个线程。
常用的方法有三个,SuppressFlow(取消复制执行上下文),RestoreFlow(恢复复制执行上下文),IsFlowSuppressed(是否上下文复制被取消).
上代码,更简单:
static void Main(string[] args) { CallContext.LogicalSetData("操作", "将一个键值对放入执行上下文中"); ThreadPool.QueueUserWorkItem( state=>Console.WriteLine("第一次"+CallContext.LogicalGetData("操作")) ); ExecutionContext.SuppressFlow();//取消执行上下文在异步线程间的复制 ThreadPool.QueueUserWorkItem( state => Console.WriteLine("第二次" + CallContext.LogicalGetData("操作")) ); ExecutionContext.RestoreFlow();//恢复执行上下文在异步线程间的复制 ThreadPool.QueueUserWorkItem( state => Console.WriteLine("第三次" + CallContext.LogicalGetData("操作")) ); Console.Read(); }
代码运行结果如下:
因为是异步操作,所以执行顺序不同,但是我们这里仅仅关注执行结果就行了,第二次确实没有将执行上下文复制到另一个线程中。
另外这里不仅仅是指线程池,专用线程也是一样的。
协作式取消和超时
.NET提供了标准的协作式取消操作模式,意味着要取消的操作必须显示支持取消。
也就是说无论执行操作的代码,还是取消操作的代码,都必须使用本节提到的类型。
取消操作首先要创建一个CancellationTokenSource对象。
这个对象包含了和管理取消有关的所有状态。可从此对象的Token属性获取一个或多个CancellationToken实例,并传给操作,使操作可以取消。
而CancellationToken是轻量级的值类型,包含单个私有字段即对其CancellationTokenSource对象的引用。
在计算限制操作的循环中,可以定时调用CancellationToken的IsCancellationRequested属性,了解循环是否应该提前终止,从而终止计算限制的操作。
以下为演示代码:
static void Main(string[] args) { var cts = new CancellationTokenSource(); ThreadPool.QueueUserWorkItem(state => Farm(cts.Token, 850));//Farm一个治疗指环 Console.WriteLine("按回车取消Farm"); Console.ReadLine(); cts.Cancel();// 取消Farm操作 Console.Read(); } //Farm指定数量的钱就返回 private static void Farm(CancellationToken token,int money) { var currentMoney = 0; while (currentMoney < money) { if (token.IsCancellationRequested) { Console.WriteLine("确定取消Farm"); break; } currentMoney += 50; Console.WriteLine("Troy已经Farm了" + currentMoney + "金"); Thread.Sleep(1000);//一秒钟补一个兵 } }
上效果图:
而如果要Farm操作不允许被取消,可以传CancellationToken.None。
可通过调用CancellationToken的Register方法登记一个或多个在取消操作时调用的函数。、
可通过CancellationTokenSource的CreateLinkedTokenSource函数链接其他CancellationTokenSource对象来创建一个新的对象A,如果任意一个被链接的对象取消,那么A也会被取消。
可通过传给CancellationTokenSource的构造器延时变量,表示在指定的一段时间后CancellationTokenSource自动取消。
任务
ThreadPool的QueueUserWorkItem方法发起一次异步的计算限制操作,然而并没有机制让我们知道什么时候这个操作完成,也没有机制在操作完成时获取返回值。
为了克服这个限制并解决其它一些问题,微软引入了任务的概念。(通过System.Threading.Task命名空间中的类型来使用任务)
以下代码为线程池玩法和任务玩法的对比
ThreadPool.QueueUserWorkItem(线程回调函数, "hello");//线程池玩法 new Task(线程回调函数, "hello").Start();//任务的玩法1 Task.Run(() => 线程回调函数("hello"));//任务的玩法2
在构造Task对象的时候,还可以传递CancellationToken,用于任务取消,也可以传递TaskCreationOptions标志来控制Task的执行方式。
接下来就写段代码,看看人物是如何等待任务完成并获取结果的
static void Main(string[] args) { Task<Tuple<Boolean, String>> myTask = new Task<Tuple<bool, string>>(赏金任务, 100); myTask.Start(); Thread.Sleep(10000); Console.WriteLine("任务进行中"); myTask.Wait();//显示等待任务结束 Console.WriteLine("任务结果为:" + myTask.Result.Item2); Console.ReadLine(); } private static Tuple<Boolean, String> 赏金任务(object state) { Console.WriteLine("Troy接手了这个赏金任务,并获取了{0}金",state.ToString()); return new Tuple<bool, string>(true, "成功"); }
Tuple<Boolean, string>为任务返回的结果类型,给Task的泛型变量就应该和所调用函数的返回值一样。
结果如下:
从这个结果我们了解到任务确实是异步执行了,并且确实返回了正确的结果给myTask.Result。
当调用Wait()函数时当前线程会阻塞,直到任务结束。(如果没用start,直接wait,那么任务也会执行。只不过此时线程不会被阻塞,它会直接执行任务并立即返回)
除了等待单个任务,实际上Task还提供了WaitAny和WaitAll两个静态方法来阻塞线程,等待一个Task数组,直到数组中的所有Task完成。
取消任务
前面讲到任务也可以传CancellationToken,用于取消任务。
在其它的地方一样,不过在判断任务是否取消的地方,应该用CancellationToken对象的ThrowIfCancellationRequested()方法而不是用IsCancellationRequested进行判断。
原因是不像线程池的QueueUserWorkItem,任务有办法表示完成,也可以返回一个值,所以需要采用一种方式将已完成的任务和出错的任务区分开。
而让任务抛出异常,就可以知道任务没有一直运行到结束。
上代码:
static void Main(string[] args) { var cts = new CancellationTokenSource(); Task<Tuple<Boolean, String>> myTask = Task.Run(()=>赏金任务(cts.Token,100), cts.Token ); Thread.Sleep(5000); cts.Cancel(); try { Console.WriteLine("任务结果为:" + myTask.Result.Item2); } catch (AggregateException ex) { //将任何OperationCanceledException对象都视为已处理 //其他任何异常都造成抛出一个新的AggregateException //其中只包含未处理的异常 ex.Handle(e => e is OperationCanceledException);//对异常集合的每个异常都调用处理程序 Console.WriteLine("取消任务"); } catch { Console.WriteLine("未知异常"); } Console.Read(); } private static Tuple<Boolean, String> 赏金任务(CancellationToken ct,object state) { for (int i = 0; i < 100; i++) { ct.ThrowIfCancellationRequested(); Console.WriteLine("Troy接手了这个赏金任务,并获取了{0}金", state.ToString()); Thread.Sleep(1000); } return new Tuple<Boolean, String>(true, "成功"); }
上结果:
任务完成时自动启动新任务
伸缩性好的软件不应该使用线程阻塞。
调用Wait或者在任务尚未完成时查询任务的Result属性,极有可能造成线程池创建新的线程。
以下方法可以知道任务在什么时候结束,且不使用阻塞。
Task<Tuple<Boolean, String>> myTask = Task.Run(()=>赏金任务(cts.Token,100), cts.Token );//创建并启动一个任务 Task myTask1 = myTask.ContinueWith(task => Console.WriteLine("任务结果为:" + task.Result.Item2));
且还可以传给它TaskContinuationOptions位标志来控制继续的任务。默认情况下,不指定任何TaskContinuationOptions位标志,那么无论第一个任务取消还是失败,都会继续执行第二个任务。
任务调用的函数中创建的任务,被称为子任务,有一些关于父任务和子任务的处理,用TaskContinuationOptions或之前介绍的TaskCreationOptions来控制。
实际上Task对象内部有一个ContinueWith任务的集合,也就是说一个Task可以多次ContinueWith,这个Task在任务完成后会执行所有的ContinueWith任务。
任务的内部揭秘
每个Task对象内部都有一组字段,这些字段构成了任务的状态。
任务虽然很有用,但是它也是有代价的。必须为所有这些状态分配内存,如果不需要任务的附加功能(也就是知道何时结束且可以返回值),那么使用ThreadPool的QueueUserWorkItem能获得更好的资源利用率。
Task对象的只读属性Status返回一个TaskStatus枚举值,该枚举值表明了任务正处于一个怎样的状态。
当任务创建后,状态为Created,启动后为WatingToRun,实际在一个线程中运行后为Running,停止运行并等待任何子任务时为WaitingForChildrenToComplete。
完成时进入一下状态:RanToCompletion(完成),Canceled(取消),Faulted(出错)。
如果任务出错,可查询任务的Exception属性获取任务抛出的未处理异常。其总是返回一个AggregateException对象,其InnerExceptions集合包含了所有未处理异常。
调用ContinueWith,ContinueWhenAll,ContinueWhenAny或FromAsync等方法创建的Task对象处于WatingForActivation状态。该状态表示任务隐式创建,并会自动开始。
任务工厂
有时需要创建一组共享相同配置的Task对象。为避免机械地将相同的参数传递给每个Task的构造器,可创建一个任务工厂来封装通用配置。
而TaskFactory类型就是这个目的。
创工厂类时,要向构造器传递所有要创建的任务都具有的默认值,也就是CancellationToken,TaskScheduler,TaskCreationOption和TaskContinuationOptions。
来个简单演示:
var tf = new TaskFactory<Int32>( cts.Token, TaskCreationOptions.AttachedToParent, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default ); //用任务工厂创建三个任务 var childTasks = new[] { tf.StartNew(()=> { Console.WriteLine("任务1");return 1; }), tf.StartNew(()=> { Console.WriteLine("任务2");return 2; }), tf.StartNew(()=> { Console.WriteLine("任务3");return 3; }) }; tf.ContinueWhenAll(childTasks, completedTask => completedTask .Where(t => !t.IsFaulted && !t.IsCanceled).Max(t => t.Result), CancellationToken.None) .ContinueWith(t => Console.WriteLine("最后的任务返回结果为" + t.Result), TaskContinuationOptions.ExecuteSynchronously); Console.Read();
这只是最基础的用法,取消的时候只需要传给他一个Token,那么一旦取消,整个task数组中的任务都会取消。
任务调度器
任务基础结构非常灵活,其中TaskScheduler对象功不可没。
此对象负责执行被调度的任务。FCL提供了两个派生自TaskScheduler的类型:线程池任务调度器和同步上下文调度器。默认情况下所有应用程序使用的都是线程池任务调度器。
同步上下文任务调度器适合提供了图形用户界面的应用程序。它将所有任务都调度给程序的GUI线程,使所有任务代码都能成功更新UI组件。该调度不使用线程池。
可执行TaskScheduler的静态FromCurrentSynchronizationContext()方法来获取对同步上下文任务调度器的引用。
这个玩法貌似我用不到,做web的啊,而且看了例子很简单,所以这里就不写了。
Parallel的静态For,ForEach和Invoke方法
Parallel就是并行的意思。
主要是用于将一些常见的for或者foreach循环用任务进行多线程化,以提升性能。
System.Threading.Tasks.Parallel类封装了这些情形,例如下面的代码:
for (int i = 0; i < 1000; i++) DoSomething(i);//for循环做某事 Parallel.For(0, 1000, i => DoSomething(i));//Parallel替代方案,线程池并行处理工作 foreach (var item in collection) DoSomething(item);//foreach循环做某事 Parallel.ForEach(collection, l => DoSomething(l));//Parallel替代方案,线程池并行处理工作 //如果可以用For而不是ForEach,那么就用For,因为更快 //顺序执行所有方法 Method1(); Method2(); Method3(); //Parallel替代方案,顺序执行所有方法 Parallel.Invoke(() => Method1(), () => Method2(), () => Method3());
如果调用线程在线程池执行完任务之前执行完了自己的那部分工作,那么调用线程会挂起,等待任务完成。
然而调用Parallel的方法时,请注意所做的工作一定要能并行执行,如果必须要顺序执行,那么还是用原来的for循环比较好。
如果有大量的工作项(也就是循环次数很多),或者是每次循环做的事情涉及大量工作,那么用Parallel性能会得到很大提升,反之,性能可能得不偿失。
Parallel的方法都可以接收一个ParallelOptions对象,这个对象可以对Parallel的工作方式做一些配置。
还可以传递一个ParallelloopState对象来控制循环任务的执行。
此对象的Stop方法,让循环停止,Break让循环不再处理后面的工作。
并行语言集成查询(PLINQ)
LINQ提供了一简捷的语法来查询数据集合。然而其只能一个线程顺序处理数据集合中的所有项。这就是顺序查询。
而要提高性能,可以使用PLINQ,也就是并行LINQ。它将顺序查询转换为并行查询。
静态System.Linq.ParallelEnumerable类(在System.Core.dll中定义)实现了PLINQ的所有功能,所以必须通过C#的using指令将System.Linq命名空间导入源代码。
而所有的Where,Select之类方法的并行版本,都是System.Linq.ParallelQuery<T>类型的扩展方法。
下面举个简单的例子:
List<string> nameList = new List<string>() { "Troy", "小二", "小三", "小四" }; var query = from name in nameList.AsParallel()//启用查询的并行化,将其转换为ParallelQuery<string> let myName = "我叫" + name where name == "Troy" select myName; Parallel.ForEach(query, l => Console.WriteLine(l)); //query.ForAll(l => Console.WriteLine(l));//也可以用这行代码替代上一句,ParallelQuery有个ForAll方法,为每个查询的结果执行内容 Console.Read();
以上例子只是为了演示玩法,并不考虑效率。
通过上面的例子其实可以发现,PLINQ和LINQ没有任何区别,只要将集合调用AsParallel()即可。
而如果要讲并行查询再转换为并行查询,那么可以用AsSequential()。
上面的这个例子用顺序查询实际上快得多。而且Console内部会对线程同步,确保每次只有一个线程来访问控制台窗口,所以这里用并行操作实际上还会损坏性能。
用PLINQ因为是并行处理数据,所以返回的都是无序结构,如果要保持顺序,那么应该调用AsOrdered方法,调用后会成组处理数据,然后组合并后保持顺序,可以想象这也会损耗性能。
而且以下操作符也会声称不排序的操作:Distinct,Except,Intersect,Union,Join,GroupBy,GroupJoin和ToLookup.如果这些操作后还要排序,那么又要调用AsOrdered方法。
同事PLINQ提供了一些额外的方法:
WithCancellation(允许取消),
WithDegreeOfParallelism(指定最多的线程数),
WithExecutionMode(传递ParallelExecutionMode标志),
WithMergeOptions(PLINQ让多个线程处理数据后会合并,所以可传参ParallelMergeOptions位标志,控制结果的缓冲和合并方式。有缓冲倾向于加快速度,无缓冲倾向于节约内存)。
执行定时计算限制操作
System.Threading命名空间有一个Timer类,可执行定时操作。
在内部,线程池为所有Timer对象都只使用一个线程,此线程知道下一个Timer对象在什么时候到期。到期后,线程会被唤醒,在内部调用线程池的QueueUserWorkItem,将工作项加入线程池队列。
这个点就不多讲了,很常见。(垃圾回收那一章讲过如果Timer对象在代码上看起来没被使用会导致被回收,所以要有变量保持Timer对象存活)
如果要定时执行某操作,可以使用Task的静态Delay方法和C#的async和await关键字。(下一章会讲到,这里只给一个简单例子)
static void Main(string[] args) { asyncDoSomething(); Console.Read(); } private static async void asyncDoSomething() { while (true) { Console.WriteLine("time is {0}", DateTime.Now); //不阻塞线程的前提下延迟两秒 await Task.Delay(2000);//await允许线程返回 //2秒后某个线程会在await后介入并继续循环 } }
线程池如何管理线程
CLR允许开发人员设置线程池要创建的最大线程数。(然而如果设定了这个值,那么就可能发生饥饿和死锁)。
默认的最大线程数目前为1000个左右。
可通过Threadpool类的一些静态方法如GetMaxThreads,SetMinThreads来限制线程池的线程数,然而作者并不建议这么做。
Threadpool.QueueUserWorkItem方法和Timer类总是将工作项放在一个线程池全局队列中的(用的先入先出模式),所以多个工作者线程可能同时从这个队列中取工作项。为了保证多个工作者线程不会取到一个工作项,所以实际上所有工作者线程都竞争同一个线程同步锁。
而对于任务,非工作者线程调用一个任务时(用非默认的TaskScheduler任务调度器),任务被放进全局队列。
而工作者线程调度Task时,都有自己的本地队列。工作者线程准备处理工作项时,先检查本地队列,由于工作者线程是唯一允许访问自身的本地队列,所以这里不需要线程同步锁。(所以在本地队列删除和增加Task的速度很快,本地队列的处理用的是后入先出模式)
如果某个工作者线程的本地队列空了,那么它会从其它队列找工作项去执行,并要求获取一个线程同步锁。
如果所有工作者线程的本地队列都空了,那么这个时候才检查全局队列。
如果全局队列也空了,那么工作者线程会进入睡眠,等待事情发生。
如果睡眠事件太长,会自动唤醒,并销毁自身。