zoukankan      html  css  js  c++  java
  • .Net多线程编程—Parallel LINQ、线程池

    Parallel LINQ

    1 System.Linq.ParallelEnumerable

    重要方法概览:

    1)public static ParallelQuery<TSource> AsParallel<TSource>(this IEnumerable<TSource> source);启用查询的并行化

    2)public static ParallelQuery<TSource> AsOrdered<TSource>(this ParallelQuery<TSource> source);启用将数据源视为“已经排序”的处理方法,重写默认的将数据源视为“未经排序”的处理方法。只可以对由 AsParallel、ParallelEnumerable.Range和 ParallelEnumerable.Repeat 返回的泛型序列调用 AsOrdered。

    3) public static ParallelQuery<TSource> WithExecutionMode<TSource>(this ParallelQuery<TSource> source, ParallelExecutionMode executionMode);设置查询的执行模式

    4)public static double Average(this ParallelQuery<double> source);计算序列平均值

    5) public static decimal? Max(this ParallelQuery<decimal?> source);计算序列最大值

    6) public static decimal? Min(this ParallelQuery<decimal?> source);计算序列中最小值

    7)public static decimal? Sum(this ParallelQuery<decimal?> source);求和

    8)public static TResult Aggregate<TSource, TAccumulate, TResult>(this ParallelQuery<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> func, Func<TAccumulate, TResult> resultSelector);对一个序列并行应用累加器函数。 将指定的种子值用作累加器的初始值,并使用指定的函数选择结果值。

    9)public static ParallelQuery<TSource> WithCancellation<TSource>(this ParallelQuery<TSource> source, CancellationToken cancellationToken);设置要与查询关联的 System.Threading.CancellationToken。

    10)public static ParallelQuery<TSource> WithDegreeOfParallelism<TSource>(this ParallelQuery<TSource> source, int degreeOfParallelism);设置要在查询中使用的并行度。

    11)public static void ForAll<TSource>(this ParallelQuery<TSource> source, Action<TSource> action);对 source 中的每个元素并行调用指定的操作。

    12)public static ParallelQuery<TSource> WithMergeOptions<TSource>(this ParallelQuery<TSource> source, ParallelMergeOptions mergeOptions);设置此查询的合并选项,它指定查询对输出进行缓冲处理的方式。

    说明:

    1)PLINQ实现了全部的LINQ操作符,并添加了部分并行操作符。

    2)不论是并发集合或传统集合都可使用PLINQ。

    3)默认情况下,执行PLINQ时,.NET尽量避免高开销并行化算法;若想强制并行执行,可使用ParallelExecutionMode.ForceParallelism

    4)根据可用内核数,PLINQ将接受的数据源分解为多份,然后在不同的内核上处理每一份。且对每一份的执行没有固定顺序。

    5)PLINQ查询有延缓执行的效果,因此要捕获查询所产生的结果在被消费者消费时产生的异常。

    6)Aggregate的重载方法之一可以将数据源序列分区成几个子序列(分区)。 对分区内的每个元素执行 updateAccumulatorFunc,得到每个分区的单个累积结果。 然后,在每个分区的结果上调用 combineAccumulatorsFunc 来产生一个元素。 最后,combineAccumulatorsFunc 产生的元素通过 resultSelector 函数进行转换即可获得最终结果。

    2 使用示例

    定义List<T> list = ......,代码中的condition为筛选条件。

    1)排序

     

     1 //确保运算获得的数据输出顺序与原集合一致
     2 var resLinq1 = from item in list.AsParallel().AsOrdered()
     3                           where (condition)
     4                           select item;
     5 var res1 = list.AsParallel().AsOrdered().Where(m=>m>4);
     6 //按升序排列使用ascending,降序排列使用descending
     7 var resLinq2 = from item in list.AsParallel()
     8                            where (condition)
     9                            orderby item descending
    10                            select item;
    11 
    12 //升序排列使用OrderBy,降序排列使用OrderByDescending
    13 var res2 = list.AsParallel().Where(item =>condition).OrderByDescending(m => m);

     

    2)设置查询的执行模式

     1 //强制并行化整个查询
     2 var resLinq3 = from item in list.AsParallel().
     3 WithExecutionMode(ParallelExecutionMode.ForceParallelism)
     4                            where (condition)
     5                            orderby item descending
     6                            select item;
     7 
     8 var res3 = list.AsParallel().
     9 WithExecutionMode(ParallelExecutionMode.ForceParallelism).
    10 Where(condition).OrderByDescending(m => m);

    3)规约操作

    假定这里的list中的元素为数字。

    1 var resLinq4 = (from item in list.AsParallel()
    2                             where (condition)
    3                             select item).Average();
    4 var res4 = list.AsParallel().Where(item => condition).Average();
    5 
    6 var resLinq5 = (from item in list.AsParallel()
    7                             where (condition)
    8                             select item).Max();
    9 var res5 = list.AsParallel().Where(item => condition).Max();

    4)自定义聚集函数

    假定这里的list中的元素为数字。

     1 //方差计算公式:S2 = ((X1-A)2+(X2-A)2+...+(Xn-A)2)/N,其中A为平均值,N为序列中元素个数,Xi为序列中第i个元素
     2 //sum 求和部分结果,item:集合list中的元素,result:经计算后得到的方差值。
     3 var average = list.AsParallel().Average();
     4 var res6 = list.AsParallel().Aggregate(
     5                 0d,
     6                 (sum, item) => sum + Math.Pow((item - average),2),
     7                 (result)=>result/list.Count
     8                 );
     9 //与上面结果相同,只不过多了combineAccumulatorsFunc函数
    10 var res7 = list.AsParallel().Aggregate(
    11                 0d,
    12                 (sum, item) => sum + Math.Pow((item - average), 2),
    13                 (total,thisTask)=>total+thisTask,
    14                 (result) => result / list.Count

    5)取消并行操作

     1 CancellationTokenSource cts = new CancellationTokenSource();
     2 CancellationToken ct = cts.Token;           
     3 CancelParallel(ct,list);
     4 
     5 private static void CancelParallel(CancellationToken ct,List<int> list)
     6 {
     7             //conditionExec此条件为真时才取消并行操作
     8             if (conditionExec)
     9             {
    10                 ct.ThrowIfCancellationRequested();
    11             }
    12             var average = list.AsParallel().Average();
    13             list.AsParallel().WithCancellation(ct).Aggregate(
    14                            0d,
    15                            (sum, item) => sum + Math.Pow((item - average), 2),
    16                            (total, thisTask) => total + thisTask,
    17                            (result) => result / list.Count
    18                            );
    19 }

    6)指定并行度

     

    1 int maxDegreeOfParallelism = Environment.ProcessorCount;
    2 var res8 = list.AsParallel().WithDegreeOfParallelism(maxDegreeOfParallelism).Aggregate(
    3                 0d,
    4                 (sum, item) => sum + Math.Pow((item - average), 2),
    5                 (result) => result / list.Count
    6                 );

     

    7)使用ForAll

    1 ConcurrentBag<T> bag = new ConcurrentBag<T>();
    2 list.AsParallel().ForAll(item => 
    3 {
    4         //对元素处理后加如集合
    5         bag.Add(itemAfter);
    6 });

    8)异常处理

     

    使用AggregateException处理异常,具体示例见Tasks.Parallel部分。

    线程池

    1 CLR 4线程池引擎与线程

    • CLR线程池引擎管理着一个池的线程,这些线程可以处理工作项。线程池引擎会每隔一段时间创建出额外的空闲线程,这些空闲线程以FIFO的顺序将工作项从队列中取出,并且开始执行这些工作项。
    • CLR线程池引擎创建一个托管线程需要数千CPU周期,并且消耗内存。
    • CLR线程池引擎维护了最低数量的闲置工作线程,通常等于逻辑内核数。
    • CLR线程池引擎管理的都是后台线程,即所有前台线程都退出了,后台线程不会维持应用程序继续运行。

    2 全局队列与局部队列

    • 使用使用TPL创建任务时,一个新的工作项会被加入到线程池全局队列中,当线程池中所有可用的工作线程都在执行工作项时,新加入线程池全局队列的工作相必须等待,直到有可用的工作项。
    • 线程池中每一个分配给了任务的线程都有自己的局部队列,这样可以减少对全局队列的争用。局部对列通常以LIFO的顺序抽取任务并执行。

    3 Threading.ThreadPool

    与使用任务将工作项加入队列相比,创建Task实例有一定的开销,但可以利用一些取消标记等。

    //使用QueueUserWorkItem方法将任务加入队列中。

    ThreadPool.QueueUserWorkItem((state) => { //具体业务 });

     

    //workerThreads线程池中辅助线程的最大数目,completionPortThreads线程池中异步 I/O 线程的最大数目

    ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);

     

    //workerThreads线程池根据需要创建的最少数量的辅助线程

    //completionPortThreads线程池根据需要创建的最少数量的异步 I/O 线程

    ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);

    -----------------------------------------------------------------------------------------

    转载与引用请注明出处。

    时间仓促,水平有限,如有不当之处,欢迎指正。

     

     

     

  • 相关阅读:
    layui 自定义统一监听事件(大范围)
    layui 自定义个别事件
    Django layui {{ }}冲突解决方法
    sudo apt install ...
    Field XXX in XXXX required a bean of type XXXX that could not be found
    Springboot2+bootstrap-table1.12.1+MybatisPlus3.0 后台物理分页实现
    springboot2在后台打印系统执行的SQL
    @Service注解让spring找到你的Service bean
    接受参数的包装类的数据类型写错报错
    Java 日期转字符串
  • 原文地址:https://www.cnblogs.com/hdwgxz/p/6261560.html
Copyright © 2011-2022 走看看