zoukankan      html  css  js  c++  java
  • C#中的PLINQ(并行LINQ)

    .NET Framework 3.5 中引入了语言集成查询 (LINQ),它具有统一的模型,以类型安全方式查询任何 System.Collections.IEnumerable或 System.Collections.Generic.IEnumerable<T> 数据源。关于LINQ函数的使用,可以参考:https://www.cnblogs.com/zhaotianff/p/6236062.html

    并行 LINQ (PLINQ) 是 LINQ 模式的并行实现

    注意:以下示例代码仅是演示用,其运行速度可能不如等效的顺序LINQ查询快

    1、如何进行PLINQ查询

    ParallelEnumerable类提供了一个扩展方法AsParallel()可以并行执行LINQ查询

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.Text;
     5 using System.Threading.Tasks;
     6 
     7 namespace PLINQ_demo
     8 {
     9     class Program
    10     {
    11         static void Main(string[] args)
    12         {
    13             var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 };
    14 
    15             var result = list.AsParallel().Where(x => x > 30);
    16         }
    17     }
    18 }

    2、PLINQ查询的参数

    WithDegreeOfParallelism:指定 PLINQ 应用于并行化查询的处理器的最大数量。

    WithExecutionMode:指定 PLINQ 应如何并行化查询(即使是当默认行为是按顺序运行查询时)。

    WithMergeOptions:提供有关 PLINQ 应如何(如果可能)将并行结果合并回使用线程上的一个序列的提示。

    ParallelMergeOptions 枚举值如下:

    AutoBuffered 2

    利用系统选定大小的输出缓冲区进行合并。 在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。

    Default 0

    使用默认合并类型,即 AutoBuffered。

    FullyBuffered 3

    利用整个输出缓冲区进行合并。 在向查询使用者提供任何结果之前,系统会先累计所有结果。

    NotBuffered 1

    不利用输出缓冲区进行合并。 一旦计算出结果元素,就向查询使用者提供这些元素。

    WithCancellation:指定 PLINQ 应定期监视请求取消时所提供的取消标记的状态以及取消执行。(这个操作和线程的取消操作是一致的,如果不理解线程中的取消操作,可以访问:https://docs.microsoft.com/zh-cn/dotnet/standard/threading/cancellation-in-managed-threads

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.Text;
     5 using System.Threading.Tasks;
     6 
     7 namespace PLINQ_demo
     8 {
     9     class Program
    10     {
    11         static void Main(string[] args)
    12         {           
    13             var cts = new System.Threading.CancellationTokenSource();
    14 
    15             try
    16             {
    17                 var result2 = list.AsParallel().Where(x => x > 30)
    18                 .WithDegreeOfParallelism(Environment.ProcessorCount)
    19                 .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    20                 .WithMergeOptions(ParallelMergeOptions.Default)
    21                 .WithCancellation(cts.Token);
    22 
    23                 PrintResult(result2);
    24             }
    25             catch(OperationCanceledException)
    26             {
    27                 Console.WriteLine("并行查询已被取消");
    28             }
    29         }
    30 
    31 
    32         static void PrintResult(IEnumerable<int> collection)
    33         {
    34             foreach (var item in collection)
    35             {
    36                 Console.WriteLine(item);
    37             }
    38         }
    39     }
    40 }

    3、PLINQ查询中常用的函数

    AsOrdered:指定 PLINQ 应为查询的其余部分保留源序列的排序,或直到例如通过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。

    AsUnordered:指定保留源序列的排序不需要查询其余部分的 PLINQ。

    AsSequential:指定查询的其余部分应像非并行的 LINQ 查询一样按顺序运行。

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.Text;
     5 using System.Threading.Tasks;
     6 
     7 namespace PLINQ_demo
     8 {
     9     class Program
    10     {
    11         static Func<int, bool> query => x => x > 30;
    12 static void Main(string[] args) 13 { 14 var orderResult = list.AsParallel().AsOrdered().Where(query);//asc 15 //list.AsParallel().Where(query).OrderBy(x=>x);//asc 16 //list.AsParallel().Where(query).OrderByDescending(x => x);//desc 17 PrintResult(orderResult); 18 19 var unorderResult = list.AsParallel().AsUnordered().Where(query); 20 PrintResult(unorderResult); 21 22 list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x)); 23 Console.WriteLine(" "); 24 25 var linqResult = list.Where(query); 26 PrintResult(linqResult); 27 28 var pinqSequentialResult = list.AsParallel().AsSequential().Where(query); 29 PrintResult(pinqSequentialResult); 30 31 var forceExecutionMode = list.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).AsSequential().Where(query); 32 PrintResult(forceExecutionMode); 33 } 34 35 36 static void PrintResult(IEnumerable<int> collection) 37 { 38 foreach (var item in collection) 39 { 40 Console.WriteLine(item); 41 } 42 43 Console.WriteLine(" "); 44 } 45 } 46 }

    4、ForAll

    ForAll是一种多线程枚举方法,与循环访问查询结果不同,它允许在不首先合并回使用者线程的情况下并行处理结果。

    如果并行执行查询,PLINQ 对源序列进行分区,以便多个线程能够并发处理不同部分,通常是在不同的线程中。 如果要在一个线程(例如,foreach)中使用结果,必须将每个线程的结果合并回一个序列中。 PLINQ 执行的合并类型具体视查询中的运算符而定。 例如,对结果强制施加新顺序的运算符必须缓冲所有线程中的全部元素。 从使用线程(以及应用用户)的角度来看,完全缓冲查询可能会运行很长时间,才能生成第一个结果。 默认情况下,其他运算符进行部分缓冲,并分批生成结果。 默认不缓冲的一个运算符是 ForAll。 它会立即生成所有线程中的所有元素。

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.Text;
     5 using System.Threading.Tasks;
     6 
     7 namespace PLINQ_demo
     8 {
     9     class Program
    10     {
    11         static Func<int, bool> query => x => x > 30;
    12         static void Main(string[] args)
    13         {
    14             var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 };
    15             list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x));
    16             Console.WriteLine("
    
    ");        
    17         }
    18     }
    19 }

    5、处理PLINQ查询中常用的异常

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.Text;
     5 using System.Threading.Tasks;
     6 
     7 namespace PLINQ_demo
     8 {
     9     class Program
    10     {
    11         static void Main(string[] args)
    12         {
    13             //普通 LINQ
    14             try
    15             {
    16                 tempList.Select(x => 100 / x).ToList();
    17             }
    18             catch(DivideByZeroException ex)
    19             {
    20                 Console.WriteLine(ex.Message);
    21             }
    22 
    23             //PLINQ
    24             try
    25             {
    26                  tempList.AsParallel().Select(x => 100 / x).ToList();              
    27             }
    28             catch(DivideByZeroException ex)
    29             {
    30                 Console.WriteLine(ex.Message);
    31             }
    32             catch(AggregateException ex)
    33             {
    34                 Console.WriteLine(ex.Message);
    35             }
    36 
    37         }
    38     }
    39 }

    使用顺序LINQ查询:当除以0时,得到了DevideByZeroException异常 

    使用并行LINQ查询:当使用Asparallel()运行,除以0时,得到了AggregateException,因为现在是并行的方式运行,AggregateException将包含运行PLINQ查询期间的所有异常,可以使用Flatten和Handle方法来处理内部的DivideByZeroException

    示例如下:

     1 try
     2             {
     3                 tempList.AsParallel().Select(x => 100 / x).ToList();              
     4             }
     5             catch(DivideByZeroException ex)
     6             {
     7                 Console.WriteLine(ex.Message);
     8             }
     9             catch(AggregateException ex)
    10             {
    11                 var exceptions = ex.Flatten().InnerExceptions;
    12 
    13                 foreach (var item in exceptions)
    14                 {
    15                     Console.WriteLine(item);
    16                 }
    17             }

     6、数据分区

    若要并行执行对数据源的操作,关键步骤之一是,将数据源分区 成多个部分,以供多个线程同时访问。 PLINQ 和任务并行库 (TPL) 提供了默认分区程序,在用户编写并行查询或 ForEach 循环时透明运行。 对于更高级的方案,可以插入自己的分区程序。

    这里有点难,还需要再学习理解一下,后面再更新

  • 相关阅读:
    启动窗体的程序控制与动画效果
    在线程中使用定时器
    从oracle9i/92数据库中导出数据至 oracle 8.1.7 数据库中
    收集:PowerDesigner常见问题解决与设置集锦
    [转]C# 2.0新特性与C# 3.5新特性
    COM服务器的创建过程
    [原创] 为什么需要TLS(Thread Local Storage)?
    COM+服务器的.Net组件实现 客户端
    如何在客户端避免繁冗的服务器GUID定义及导入?
    进程、线程、套间和环境
  • 原文地址:https://www.cnblogs.com/zhaotianff/p/12658004.html
Copyright © 2011-2022 走看看