.NET Framework 3.5 中引入了语言集成查询 (LINQ),它具有统一的模型,以类型安全方式查询任何 System.Collections.IEnumerable或 System.Collections.Generic.IEnumerable<T> 数据源。关于LINQ函数的使用,可以参考:https://www.cnblogs.com/zhaotianff/p/6236062.html
并行 LINQ (PLINQ) 是 LINQ 模式的并行实现
注意:以下示例代码仅是演示用,其运行速度可能不如等效的顺序LINQ查询快
1、如何进行PLINQ查询
ParallelEnumerable类提供了一个扩展方法AsParallel()可以并行执行LINQ查询
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 }; 14 15 var result = list.AsParallel().Where(x => x > 30); 16 } 17 } 18 }
2、PLINQ查询的参数
WithDegreeOfParallelism:指定 PLINQ 应用于并行化查询的处理器的最大数量。
WithExecutionMode:指定 PLINQ 应如何并行化查询(即使是当默认行为是按顺序运行查询时)。
WithMergeOptions:提供有关 PLINQ 应如何(如果可能)将并行结果合并回使用线程上的一个序列的提示。
ParallelMergeOptions 枚举值如下:
AutoBuffered | 2 |
利用系统选定大小的输出缓冲区进行合并。 在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。 |
Default | 0 |
使用默认合并类型,即 AutoBuffered。 |
FullyBuffered | 3 |
利用整个输出缓冲区进行合并。 在向查询使用者提供任何结果之前,系统会先累计所有结果。 |
NotBuffered | 1 |
不利用输出缓冲区进行合并。 一旦计算出结果元素,就向查询使用者提供这些元素。 |
WithCancellation:指定 PLINQ 应定期监视请求取消时所提供的取消标记的状态以及取消执行。(这个操作和线程的取消操作是一致的,如果不理解线程中的取消操作,可以访问:https://docs.microsoft.com/zh-cn/dotnet/standard/threading/cancellation-in-managed-threads)
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static void Main(string[] args) 12 { 13 var cts = new System.Threading.CancellationTokenSource(); 14 15 try 16 { 17 var result2 = list.AsParallel().Where(x => x > 30) 18 .WithDegreeOfParallelism(Environment.ProcessorCount) 19 .WithExecutionMode(ParallelExecutionMode.ForceParallelism) 20 .WithMergeOptions(ParallelMergeOptions.Default) 21 .WithCancellation(cts.Token); 22 23 PrintResult(result2); 24 } 25 catch(OperationCanceledException) 26 { 27 Console.WriteLine("并行查询已被取消"); 28 } 29 } 30 31 32 static void PrintResult(IEnumerable<int> collection) 33 { 34 foreach (var item in collection) 35 { 36 Console.WriteLine(item); 37 } 38 } 39 } 40 }
3、PLINQ查询中常用的函数
AsOrdered:指定 PLINQ 应为查询的其余部分保留源序列的排序,或直到例如通过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。
AsUnordered:指定保留源序列的排序不需要查询其余部分的 PLINQ。
AsSequential:指定查询的其余部分应像非并行的 LINQ 查询一样按顺序运行。
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading.Tasks;
6
7 namespace PLINQ_demo
8 {
9 class Program
10 {
11 static Func<int, bool> query => x => x > 30;
12 static void Main(string[] args)
13 {
14 var orderResult = list.AsParallel().AsOrdered().Where(query);//asc
15 //list.AsParallel().Where(query).OrderBy(x=>x);//asc
16 //list.AsParallel().Where(query).OrderByDescending(x => x);//desc
17 PrintResult(orderResult);
18
19 var unorderResult = list.AsParallel().AsUnordered().Where(query);
20 PrintResult(unorderResult);
21
22 list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x));
23 Console.WriteLine("
");
24
25 var linqResult = list.Where(query);
26 PrintResult(linqResult);
27
28 var pinqSequentialResult = list.AsParallel().AsSequential().Where(query);
29 PrintResult(pinqSequentialResult);
30
31 var forceExecutionMode = list.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).AsSequential().Where(query);
32 PrintResult(forceExecutionMode);
33 }
34
35
36 static void PrintResult(IEnumerable<int> collection)
37 {
38 foreach (var item in collection)
39 {
40 Console.WriteLine(item);
41 }
42
43 Console.WriteLine("
");
44 }
45 }
46 }
4、ForAll
ForAll是一种多线程枚举方法,与循环访问查询结果不同,它允许在不首先合并回使用者线程的情况下并行处理结果。
如果并行执行查询,PLINQ 对源序列进行分区,以便多个线程能够并发处理不同部分,通常是在不同的线程中。 如果要在一个线程(例如,foreach)中使用结果,必须将每个线程的结果合并回一个序列中。 PLINQ 执行的合并类型具体视查询中的运算符而定。 例如,对结果强制施加新顺序的运算符必须缓冲所有线程中的全部元素。 从使用线程(以及应用用户)的角度来看,完全缓冲查询可能会运行很长时间,才能生成第一个结果。 默认情况下,其他运算符进行部分缓冲,并分批生成结果。 默认不缓冲的一个运算符是 ForAll。 它会立即生成所有线程中的所有元素。
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 namespace PLINQ_demo 8 { 9 class Program 10 { 11 static Func<int, bool> query => x => x > 30; 12 static void Main(string[] args) 13 { 14 var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 }; 15 list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x)); 16 Console.WriteLine(" "); 17 } 18 } 19 }
5、处理PLINQ查询中常用的异常
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading.Tasks;
6
7 namespace PLINQ_demo
8 {
9 class Program
10 {
11 static void Main(string[] args)
12 {
13 //普通 LINQ
14 try
15 {
16 tempList.Select(x => 100 / x).ToList();
17 }
18 catch(DivideByZeroException ex)
19 {
20 Console.WriteLine(ex.Message);
21 }
22
23 //PLINQ
24 try
25 {
26 tempList.AsParallel().Select(x => 100 / x).ToList();
27 }
28 catch(DivideByZeroException ex)
29 {
30 Console.WriteLine(ex.Message);
31 }
32 catch(AggregateException ex)
33 {
34 Console.WriteLine(ex.Message);
35 }
36
37 }
38 }
39 }
使用顺序LINQ查询:当除以0时,得到了DevideByZeroException异常
使用并行LINQ查询:当使用Asparallel()运行,除以0时,得到了AggregateException,因为现在是并行的方式运行,AggregateException将包含运行PLINQ查询期间的所有异常,可以使用Flatten和Handle方法来处理内部的DivideByZeroException类
示例如下:
1 try 2 { 3 tempList.AsParallel().Select(x => 100 / x).ToList(); 4 } 5 catch(DivideByZeroException ex) 6 { 7 Console.WriteLine(ex.Message); 8 } 9 catch(AggregateException ex) 10 { 11 var exceptions = ex.Flatten().InnerExceptions; 12 13 foreach (var item in exceptions) 14 { 15 Console.WriteLine(item); 16 } 17 }
6、数据分区
若要并行执行对数据源的操作,关键步骤之一是,将数据源分区 成多个部分,以供多个线程同时访问。 PLINQ 和任务并行库 (TPL) 提供了默认分区程序,在用户编写并行查询或 ForEach 循环时透明运行。 对于更高级的方案,可以插入自己的分区程序。
这里有点难,还需要再学习理解一下,后面再更新