.NET 4.0中的Task Parallel Library(TPL)已经不是什么新鲜事了,相信很多朋友也阅读过不少有关TPL的书籍资料。而另一方面,能够将TPL合理地运用在实际项目开发过程中,以提高程序的执行效率,这种情况也并不多见。本文就以实际项目中的一个程序功能为例,简要讨论一下TPL的应用。在此我不打算对TPL的相关基础知识做过多讨论,这些内容在网上应该有不少的文章资料可供参考;同时读者朋友还可以阅读一些有关TPL的经典书籍,以便加深对TPL的理解。文章最后我会推荐几本不错的有关.NET 4.0下TPL的书籍资料。
案例:批量对象的XML序列化
在某个项目中,需要对一大批相同类型的对象进行XML序列化操作,在序列化工作完成后,程序会把序列化所得的XML字符串根据对象的ID值保存到一个字典(Dictionary)的对象中,以便后续的程序逻辑能够使用这些序列化后的XML。为了简化起见,我定义了一个Customer类来模拟这些对象的类型(实际项目中的对象类型要比这个Customer复杂一些),这个Customer类仅包含两个属性:ID和Name。下图大致描述了这个处理过程:
现在让我们先定义这个Customer类,以便为接下来的实验作准备。Customer类的定义如下:
public class Customer { public long ID { get; set; } public string Name { get; set; } public override string ToString() { return Name; } }
下面,我们分别使用传统的方式和基于TPL的并行处理方式来实现这个程序,然后比较一下这两种方式产生的效果差异。
传统的实现方式
传统的实现方式很简单,基本思路就是对每一个Customer对象,使用XmlSerializer对其进行序列化操作,然后把产生的XML字符串保存到字典中。代码如下:
static IEnumerable<KeyValuePair<long, string>> SerializeCustomers(Customer[] customers) { var dict = new Dictionary<long, string>(); var xmlSerializer = new XmlSerializer(typeof(Customer)); foreach (var customer in customers) { using (var ms = new MemoryStream()) { xmlSerializer.Serialize(ms, customer); dict.Add(customer.ID, Encoding.ASCII.GetString(ms.ToArray())); } } return dict; }
基于TPL的并行处理方式
在采用这种方式之前,需要对我们的应用场景进行分析。今后在项目中打算使用TPL之前,都应该进行这样的分析。主要目的就是为了讨论目前我们所面对的场景,是否可以使用并行计算。目前我们的应用场景是可以采用TPL的并行处理方式的。因为首先,针对每个Customer对象的序列化操作都相对独立,没有先后顺序之分,即各操作之间是可替换的,比如计算a+b+c,可以先计算a+b(也就是(a+b)+c),也可以先计算b+c(也就是a+(b+c));其次,虽然在最后整合结果的时候需要访问跨线程的共享资源,也就是在最后整合结果的时候产生了资源的依赖关系,但对于整个计算的过程,各个任务都是可以互不干扰地执行的。在运用TPL的时候,我觉得应该尽可能地降低各个任务之间的依赖关系,因为TPL中的任务有可能会被分配到不同的线程去执行,如果任务之间有资源的相互依赖的话,线程同步将降低任务执行的效率。
以下是此案例的TPL版本:
static IEnumerable<KeyValuePair<long, string>> ParallelSerializeCustomers(Customer[] customers) { var dict = new Dictionary<long, string>(); var xmlSerializer = new XmlSerializer(typeof(Customer)); object lockObj = new object(); Parallel.ForEach(customers, () => new Dictionary<long, string>(), (customer, loopState, single) => { using (var ms = new MemoryStream()) { xmlSerializer.Serialize(ms, customer); single.Add(customer.ID, Encoding.ASCII.GetString(ms.ToArray())); } return single; }, (single) => { lock (lockObj) { single.ToList().ForEach(p => dict.Add(p.Key, p.Value)); } }); return dict; }
在ParallelSerializeCustomers方法中,采用了foreach循环的并行版本:Parallel.ForEach方法。这个方法与foreach类似,会逐个轮询给定的IEnumerable对象中的没一个值,不过Parallel.ForEach方法会将这个轮询的过程分配到多个Task上执行,因此对于Parallel.ForEach,执行过程的中断(break)以及异常处理都与foreach完全不同。在这个例子中,我们使用的是Parallel.ForEach方法的其中一个重载版本,在这个方法重载中,首先我们将需要轮询的IEnumerable对象(也就是这里的customers数组)传递给该方法;之后有一个Func<TLocal>的委托参数,这个委托参数的作用是为了对Task执行线程范围内的局部变量进行初始化,在这里我们直接使用Lambda表达式返回了一个新建的Dictionary<long, string>对象,表示需要对线程范围内的局部变量(其实就是第三个参数中的那个single变量)初始化成一个新的Dictionary<long, string>实例;第三个参数也是一个委托,用于对当前的枚举对象执行真正的处理逻辑,然后将处理结果返回;第四个参数则是用来整合每个任务的处理结果,以得到最终结果。不难看出,在整合最终结果的时候,多个线程需要同时访问dict变量,因此需要使用lock关键字以保证线程同步。
执行效果对比
以下是在一台具有4核CPU的计算机上,处理十万(100000)个Customer对象的执行效果,可见基于TPL的实现效率要比传统的实现方式高很多。值得一提的是,传统方式所产生的dict是有序的,而基于TPL的方式所产生的dict则是无序的,但这并不影响结果,因为程序并不会关心dict中的值是否有序。
以下是传统实现方式下,CPU的利用率。我们可以看到,基本上CPU的利用率只能达到20%-30%左右,大部分CPU资源都没有利用到:
以下是基于TPL方式下,CPU的利用率,基本上能达到85%以上(估计剩下的部分由于IO的原因,所以没有达到更高的CPU利用率):
参考书籍
- 《Parallel Programming with Microsoft Visual Studio 2010 Step by Step》
- 《Professional Parallel Programming with C#: Master Parallel Extensions with .NET 4》
- 《Parallel Programming with Microsoft .NET: Design Patterns for Decomposition and Coordination on Multicore Architectures》