最近因为比较忙,好久没有写博客了,这篇主要给大家分享一下PLINQ中的分区。上一篇介绍了并行编程,这边详细介绍一下并行编程中的分区和自定义分区。
先做个假设,假设我们有一个200Mb的文本文件需要读取,怎么样才能做到最优的速度呢?对,很显然就是拆分,把文本文件拆分成很多个小文件,充分利用我们计算机中的多核cpu的优势,让每个cpu都充分的利用,达到效率的最大化。然而在PLINQ中也是,我们有一个数据源,如果想进行最大的并行化操作,那么就需要把其拆分为可以多个线程同时访问的多个部分,这就是PLINQ中的分区。当然,微软已经为我们想到了这一点,知道他的用户可能会有这个需求,所以就先说一下微软给我们提供的默认的一个分区程序吧。
微软提供的默认的分区程序又叫做任务并行库(TPL),其实就是当你用PLINQ的ForEach的时候,默认在其内部就会给我们进行分区。怎么样,是不是很方便。不过有时候,你可能会需要自己来进行拆分,那么就是另外一种跟高级一点的用法了,就是PLINQ的自定义分区。自定义分区有两种,一种是按照范围分区,另一种是按照区块分区。其中按照范围分区在针对链表集合能够提供非常好的性能,比如IList等,不过它也有一点缺点那就是如果一个线程提前完成,它将无法帮助其他线程完成它们的工作。按照区块分区是当我们不知道我们所要操作的集合的大小的时候,可以使用按照区块分区,在按区块分区中,并行循环或查询中的每个线程或任务都使用一个区块中一定数量的源元素,对它们进行处理,然后返回检索其他元素。分区程序可确保分发所有元素,并且没有重复项。区块可为任意大小。
通常,只有当委托的执行时间为较短到中等程度,源具有大量的元素,并且每个分区的总工作量大致相等时,按范围分区的速度才会较快。因此,按区块分区的速度在大多数情况下较快。对于元素数量很少或委托执行时间较长的源,则按区块分区和按范围分区的性能大致相等。
那么我们如何实现动态分区呢?下面有一个摘自MSDN的示例。
每次分区对枚举器调用 MoveNext 时,枚举器都会提供包含一个列表元素的分区。对于 PLINQ 和 ForEach,分区是一个 Task 实例。由于请求同时在多个线程上发生,因此对当前索引的访问是同步的。
1 // 2 // An orderable dynamic partitioner for lists 3 // 4 class OrderableListPartitioner<TSource> : OrderablePartitioner<TSource> 5 { 6 private readonly IList<TSource> m_input; 7 8 public OrderableListPartitioner(IList<TSource> input) 9 : base(true, false, true) 10 { 11 m_input = input; 12 } 13 14 // Must override to return true. 15 public override bool SupportsDynamicPartitions 16 { 17 get 18 { 19 return true; 20 } 21 } 22 23 public override IList<IEnumerator<KeyValuePair<long, TSource>>> 24 GetOrderablePartitions(int partitionCount) 25 { 26 var dynamicPartitions = GetOrderableDynamicPartitions(); 27 var partitions = 28 new IEnumerator<KeyValuePair<long, TSource>>[partitionCount]; 29 30 for (int i = 0; i < partitionCount; i++) 31 { 32 partitions[i] = dynamicPartitions.GetEnumerator(); 33 } 34 return partitions; 35 } 36 37 public override IEnumerable<KeyValuePair<long, TSource>> 38 GetOrderableDynamicPartitions() 39 { 40 return new ListDynamicPartitions(m_input); 41 } 42 43 private class ListDynamicPartitions 44 : IEnumerable<KeyValuePair<long, TSource>> 45 { 46 private IList<TSource> m_input; 47 private int m_pos = 0; 48 49 internal ListDynamicPartitions(IList<TSource> input) 50 { 51 m_input = input; 52 } 53 54 public IEnumerator<KeyValuePair<long, TSource>> GetEnumerator() 55 { 56 while (true) 57 { 58 // Each task gets the next item in the list. The index is 59 // incremented in a thread-safe manner to avoid races. 60 int elemIndex = Interlocked.Increment(ref m_pos) - 1; 61 62 if (elemIndex >= m_input.Count) 63 { 64 yield break; 65 } 66 67 yield return new KeyValuePair<long, TSource>( 68 elemIndex, m_input[elemIndex]); 69 } 70 } 71 72 IEnumerator IEnumerable.GetEnumerator() 73 { 74 return 75 ((IEnumerable<KeyValuePair<long, TSource>>)this) 76 .GetEnumerator(); 77 } 78 } 79 } 80 81 class ConsumerClass 82 { 83 static void Main() 84 { 85 var nums = Enumerable.Range(0, 10000).ToArray(); 86 OrderableListPartitioner<int> partitioner = new OrderableListPartitioner<int>(nums); 87 88 // Use with Parallel.ForEach 89 Parallel.ForEach(partitioner, (i) => Console.WriteLine(i)); 90 91 92 // Use with PLINQ 93 var query = from num in partitioner.AsParallel() 94 where num % 2 == 0 95 select num; 96 97 foreach (var v in query) 98 Console.WriteLine(v); 99 } 100 }
这是按区块分区的示例,其中每个区块都由一个元素组成。通过一次提供多个元素,您可以减少锁争用,并在理论上实现更快的性能。但是,有时较大的区块可能需要额外的负载平衡逻辑才能使所有线程在工作完成之前保持忙碌。