zoukankan      html  css  js  c++  java
  • Parallel的使用

    C#并行编程之Parallel的使用

     

    前言:在C#的System.Threading.Tasks 命名空间中有一个静态的并行类:Parallel,封装了Task的使用,对于执行大量任务提供了非常简便的操作。下面对他的使用进行介绍。

    本篇内容:

    1.1、Parallel.For 使用
    1.2、Parallel.ForEach 使用
    1.3、Parallel.Invoke 使用
    1.4、ParallelOptions 选项配置
    1.5、ParallelLoopResult 执行结果
    1.6、ParallelLoopState 提前结束
    1.7、Parallel的使用场景分析

    1.1、Parallel.For 使用

    首先创建一个控制台程序,本案例使用的是.net core 3.1,引入命名空间 using System.Threading。假设某个操作需要执行10次,从0到9代码如下:

    static void ParallelFor(int num)
            {
                Console.WriteLine($"ParallelFor执行 {num} 次");
                var list = new List(num);
                ParallelLoopResult result = Parallel.For(0, num, i =>
                  {
                      list.Add(new Product { Id = i, Name = "TestName" });
                      Console.WriteLine($"Task Id:{Task.CurrentId},Thread: {Thread.CurrentThread.ManagedThreadId}");
                      Thread.Sleep(10);
                  });
            }

     执行结果如下:

     从打印信息可以看出,任务Id和线程都是无序的,在使用时需要注意。

    Parallel.For 还提供了很多重载本版:

     我们看一下带ParallelLoopState 参数的一个重载版本:ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)

    测试代码:

    /// <summary>
            /// 提前终止
            /// </summary>
            static void ParallelForAsyncAbort()
            {
                ParallelLoopResult result =
                    Parallel.For(10, 100, async (int index, ParallelLoopState pls) =>
                    {
                        Console.WriteLine($"index:{index} task:{Task.CurrentId},Thread:{Thread.CurrentThread.ManagedThreadId}");
                        await Task.Delay(10);
                        if (index > 30)
                            pls.Break();
                    });
                Console.WriteLine($"Is completed:{result.IsCompleted} LowestBreakIteration:{result.LowestBreakIteration}");
            }

    复制代码
            /// <summary>
            /// 提前终止
            /// </summary>
            static void ParallelForAsyncAbort()
            {
                ParallelLoopResult result =
                    Parallel.For(10, 100, async (int index, ParallelLoopState pls) =>
                    {
                        Console.WriteLine($"index:{index} task:{Task.CurrentId},Thread:{Thread.CurrentThread.ManagedThreadId}");
                        await Task.Delay(10);
                        if (index > 30)
                            pls.Break();
                    });
                Console.WriteLine($"Is completed:{result.IsCompleted} LowestBreakIteration:{result.LowestBreakIteration}");
            }
    复制代码

    执行结果:

     任务提前结束了,最小执行Break方法的索引为19

    带参数:ParallelOptions的重载版本:

    /// <summary>
    /// 执行500毫秒后取消
    /// </summary>
    static void ParalletForCancel()
    {
    var cts = new CancellationTokenSource();
    cts.Token.Register(() => { Console.WriteLine($"*** token canceled"); }
    );
    // send a cancel after 500ms
    cts.CancelAfter(500);
    try
    {
    ParallelLoopResult result =
    Parallel.For(0, 100, new ParallelOptions()
    {
    CancellationToken = cts.Token,
    }, x =>
    {
    Console.WriteLine($"loop {x} started");
    int sum = 0;
    for (int i = 0; i < 100; i++)
    {
    Thread.Sleep(2);
    sum += i;
    }
    Console.WriteLine($"loop {x} finished");
    });

    }
    catch (OperationCanceledException ex)
    {
    Console.WriteLine(ex.Message);
    }
    }

    复制代码
    /// <summary>
            /// 执行500毫秒后取消
            /// </summary>
            static void ParalletForCancel()
            {
                var cts = new CancellationTokenSource();
                cts.Token.Register(() => { Console.WriteLine($"*** token canceled"); }
                );
                // send a cancel after 500ms
                cts.CancelAfter(500);
                try
                {
                    ParallelLoopResult result =
                         Parallel.For(0, 100, new ParallelOptions()
                         {
                             CancellationToken = cts.Token,
                         }, x =>
                         {
                             Console.WriteLine($"loop {x} started");
                             int sum = 0;
                             for (int i = 0; i < 100; i++)
                             {
                                 Thread.Sleep(2);
                                 sum += i;
                             }
                             Console.WriteLine($"loop {x} finished");
                         });
    
                }
                catch (OperationCanceledException ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    复制代码

    任务执行一段时间后取消了

     1.2、Parallel.ForEach 使用

    ForEach方法可用于对集合,数组,或枚举进行循环操作,下面进行简单使用:

    //简单使用
    static void ParallelForEach()
    {
    string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten" };
    ParallelLoopResult result =
    Parallel.ForEach(data, a =>
    {
    Console.WriteLine(a);
    });
    }


    复制代码
            //简单使用
            static void ParallelForEach()
            {
                string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten" };
                ParallelLoopResult result =
                        Parallel.ForEach(data, a =>
                        {
                            Console.WriteLine(a);
                        });
            }
    复制代码

    执行结果:

     请注意循环的执行是无序的,我们打印出执行顺序:

    //带索引的循环操作
    static void ParallelForEach2()
    {
    string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten" };
    ParallelLoopResult result =
    Parallel.ForEach<string>(data, (str, psl, index) =>
    {
    Console.WriteLine($"str:{str} index:{index}");
    });
    }

    复制代码
       //带索引的循环操作
            static void ParallelForEach2()
            {
                string[] data = { "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten" };
                ParallelLoopResult result =
                        Parallel.ForEach<string>(data, (str, psl, index) =>
                        {
                            Console.WriteLine($"str:{str} index:{index}");
                        });
            }
    复制代码

    执行结果:

     ForEach同样支持提前结束和取消操作:

     1.3、Parallel.Invoke 使用

    Invoke主要用于操作(任务)并行,能同时执行多个操作,并尽可能的同时执行。

    简单使用:

    static void ParallelInvoke()
    {
    Parallel.Invoke(Foo, Bar);
    }

    static void Foo()
    {
    Console.WriteLine("Foo");
    }

    static void Bar()
    {
    Console.WriteLine("Bar");
    }

    复制代码
            static void ParallelInvoke()
            {
                Parallel.Invoke(Foo, Bar);
            }
    
            static void Foo()
            {
                Console.WriteLine("Foo");
            }
    
            static void Bar()
            {
                Console.WriteLine("Bar");
            }
    复制代码

    大于10个操作:

    static void ParallelInvoke2()
    {
    Action action = () =>
    {
    Console.WriteLine($"Thread Id:{Thread.CurrentThread.ManagedThreadId}");
    };
    Parallel.Invoke(action, action, action, action, action, action, action, action, action, action, action);
    Console.WriteLine("Parallel.Invoke 执行完毕");
    }

    复制代码
            static void ParallelInvoke2()
            {
                Action action = () =>
                {
                    Console.WriteLine($"Thread Id:{Thread.CurrentThread.ManagedThreadId}");
                };
                Parallel.Invoke(action, action, action, action, action, action, action, action, action, action, action);
                Console.WriteLine("Parallel.Invoke 执行完毕");
              }
    复制代码

    如果任务不超过10个,Invoke内部会使用Task.Factory.StartNew 创建任务,效率不高,不如直接使用Task。

     1.4、ParallelOptions 选项配置

     ParallelOptions是一个选项配置,有三个属性:

     1.4.1、CancellationToken-定义取消令牌,处理任务被取消后的一些操作

    1.4.2、MaxDegreeOfParallelism-设置最大并发限制,默认-1

    1.4.3、TaskScheduler 指定任务调度器

     1.5、ParallelLoopResult 执行结果

    ParallelLoopResult,并发循环结果,有两个属性:

    IsCompleted-任务是否执行完

    LowestBreakIteration-调用Break方法的最小任务的索引

    1.6、ParallelLoopState 提前结束

    ParallelLoopState 用于提前结束循环操作,比如搜索算法,已找到结果提前结束查询。

    有两个方法:

    Break: 告知 Parallel 循环应在系统方便的时候尽早停止执行当前迭代之外的迭代

    Stop:告知 Parallel 循环应在系统方便的时候尽早停止执行。

    如果循环之外还有需要执行的代码则用Break,否则使用Stop

     1.7、Parallel的使用场景分析

     1.7.1、Parallel.Invoke 使用特点:

         1、如果操作小于10个,使用Task.Factory.StartNew 或者Task.Run 效率更高

         2、适合用于执行大量操作且无需返回结果的场景

    1.7.2、Parallel.For 使用特点:

         1、带索引的大量循环操作

    1.7.3、Parallel.ForEach 使用特点:

        1、大数据集(数组,集合,枚举集)的循环执行

    1.7.4、注意事项:

        1、循环操作是无序的,如果需要顺序直接请使用同步执行

        2、如果涉及操作共享变量请使用线程同步锁

        3、如果是简单、量大且无等待的操作可能并不适用,同步执行可能更快

        4、注意错误的处理,如果是带数据库的操作请注意事务的使用

        5、个人测试,Parallel.ForEach 的使用效率比Parallel.For更高

    性能测试代码如下:

    #region 性能测试
    private static void TestPerformance()
    {
    int num = 10;
    Console.WriteLine($"测试执行:{num}次");
    Console.WriteLine();
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.Start();
    ParallelFor(num);
    stopwatch.Stop();
    Console.WriteLine($"耗时:{stopwatch.ElapsedMilliseconds}");
    Console.WriteLine();
    stopwatch.Restart();
    ParallelForEach(num);
    stopwatch.Stop();
    Console.WriteLine($"耗时:{stopwatch.ElapsedMilliseconds}");
    Console.WriteLine();
    stopwatch.Restart();
    Sync(num);
    stopwatch.Stop();
    Console.WriteLine($"耗时:{stopwatch.ElapsedMilliseconds}");
    Console.WriteLine();
    stopwatch.Restart();
    TaskTest(num);
    stopwatch.Stop();
    Console.WriteLine($"耗时:{stopwatch.ElapsedMilliseconds}");
    }

    static void ParallelFor(int num)
    {
    Console.WriteLine($"ParallelFor执行 {num} 次");
    var list = new List<Product>(num);
    ParallelLoopResult result = Parallel.For(0, num, i =>
    {
    list.Add(new Product { Id = i, Name = "TestName" });
    //去掉Thread的代码模拟简单业务操作
    Thread.Sleep(10);
    });
    }

    static void Sync(int num)
    {
    Console.WriteLine($"同步执行 {num} 次");
    var list = new List<Product>(num);
    for (int i = 0; i < num; i++)
    {
    list.Add(new Product { Id = i, Name = "TestName" });
    Thread.Sleep(10);
    }
    }

    static void ParallelForEach(int num)
    {
    string[] datas = new string[num];
    Console.WriteLine($"ParallelForEach执行 {num} 次");
    var list = new List<Product>(num);
    Parallel.ForEach(datas, (s, pls, i) =>
    {
    list.Add(new Product { Id = (int)i, Name = "TestName" });
    Thread.Sleep(10);
    });
    }
    static void TaskTest(int num)
    {
    Console.WriteLine($"Task 执行 {num} 次");
    var list = new List<Product>(num);
    while (num > 0)
    {
    Task.Run(() =>
    {
    list.Add(new Product { Id = num, Name = "TestName" });
    });
    Thread.Sleep(10);
    num--;
    }
    }

    #endregion

    复制代码
            #region 性能测试
            private static void TestPerformance()
            {
                int num = 10;
                Console.WriteLine($"测试执行:{num}次");
                Console.WriteLine();
                Stopwatch stopwatch = new Stopwatch();
                stopwatch.Start();
                ParallelFor(num);
                stopwatch.Stop();
                Console.WriteLine($"耗时:{stopwatch.ElapsedMilliseconds}");
                Console.WriteLine();
                stopwatch.Restart();
                ParallelForEach(num);
                stopwatch.Stop();
                Console.WriteLine($"耗时:{stopwatch.ElapsedMilliseconds}");
                Console.WriteLine();
                stopwatch.Restart();
                Sync(num);
                stopwatch.Stop();
                Console.WriteLine($"耗时:{stopwatch.ElapsedMilliseconds}");
                Console.WriteLine();
                stopwatch.Restart();
                TaskTest(num);
                stopwatch.Stop();
                Console.WriteLine($"耗时:{stopwatch.ElapsedMilliseconds}");
            }
    
            static void ParallelFor(int num)
            {
                Console.WriteLine($"ParallelFor执行 {num} 次");
                var list = new List<Product>(num);
                ParallelLoopResult result = Parallel.For(0, num, i =>
                {
                    list.Add(new Product { Id = i, Name = "TestName" });
                    //去掉Thread的代码模拟简单业务操作
                    Thread.Sleep(10);
                });
            }
    
            static void Sync(int num)
            {
                Console.WriteLine($"同步执行 {num} 次");
                var list = new List<Product>(num);
                for (int i = 0; i < num; i++)
                {
                    list.Add(new Product { Id = i, Name = "TestName" });
                    Thread.Sleep(10);
                }
            }
    
            static void ParallelForEach(int num)
            {
                string[] datas = new string[num];
                Console.WriteLine($"ParallelForEach执行 {num} 次");
                var list = new List<Product>(num);
                Parallel.ForEach(datas, (s, pls, i) =>
                {
                    list.Add(new Product { Id = (int)i, Name = "TestName" });
                    Thread.Sleep(10);
                });
            }
            static void TaskTest(int num)
            {
                Console.WriteLine($"Task 执行 {num} 次");
                var list = new List<Product>(num);
                while (num > 0)
                {
                    Task.Run(() =>
                    {
                        list.Add(new Product { Id = num, Name = "TestName" });
                    });
                    Thread.Sleep(10);
                    num--;
                }
            }
    
            #endregion
    复制代码

    简单任务性能测试:

     

     

     

     

     

     

     复杂任务性能测试模拟:

     

     

     

     以上是我对Parallel的学习和使用经验总结,欢迎大家一起交流和学习。

    参考:《C#高级编程第4版》、微软官网

    作者:Leo_wl
             
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
    版权信息
  • 相关阅读:
    POJ 2388(排序)
    优先队列(堆实现)
    POJ 3322(广搜)
    POJ 1190(深搜)
    POJ 1456(贪心)
    poj 2524 (并查集)
    poj 1611(并查集)
    poj 1521
    poj 1220(短除法)
    css 如何实现图片等比例缩放
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/15761331.html
Copyright © 2011-2022 走看看