zoukankan      html  css  js  c++  java
  • C# Parallel用法

    1、Parallel.Invoke 主要用于任务的并行
      这个函数的功能和Task有些相似,就是并发执行一系列任务,然后等待所有完成。和Task比起来,省略了Task.WaitAll这一步,自然也缺少了Task的相关管理功能。它有两种形式:
      Parallel.Invoke( params Action[] actions);
      Parallel.Invoke(Action[] actions,TaskManager manager,TaskCreationOptions options);

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
                var actions = new Action[]
                {
                    () => ActionTest("test 1"),
                    () => ActionTest("test 2"),
                    () => ActionTest("test 3"),
                    () => ActionTest("test 4")
                };
    
                Console.WriteLine("Parallel.Invoke 1 Test");
                Parallel.Invoke(actions);
    
                Console.WriteLine("结束!");
            }
    
            static void ActionTest(object value)
            {
                Console.WriteLine(">>> thread:{0}, value:{1}",
                Thread.CurrentThread.ManagedThreadId, value);
            }
        }
    }
    Program

    2、For方法,主要用于处理针对数组元素的并行操作(数据的并行) 

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
                int[] nums = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
                Parallel.For(0, nums.Length, (i) =>
                {
                    Console.WriteLine("针对数组索引{0}对应的那个元素{1}的一些工作代码……ThreadId={2}", i, nums[i], Thread.CurrentThread.ManagedThreadId);
                });
                Console.ReadKey();
            }
        }
    }
    Program

    3、Foreach方法,主要用于处理泛型集合元素的并行操作(数据的并行)

    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
                List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
                Parallel.ForEach(nums, (item) =>
                {
                    Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", item, Thread.CurrentThread.ManagedThreadId);
                });
                Console.ReadKey();
            }
        }
    }
    Program

      数据的并行的方式二(AsParallel()):

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    
    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
                List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
                var evenNumbers = nums.AsParallel().Select(item => Calculate(item));
                //注意这里是个延迟加载,也就是不用集合的时候 这个Calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果;
                Console.WriteLine(evenNumbers.Count());
                //foreach (int item in evenNumbers)
                //    Console.WriteLine(item);
                Console.ReadKey();
            }
    
            static int Calculate(int number)
            {
                Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
                return number * 2;
            }
        }
    }
    Program

      .AsOrdered() 对结果进行排序:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApp
    {
    
        class Program
        {
            static void Main(string[] args)
            {
                List<int> nums = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
                var evenNumbers = nums.AsParallel().AsOrdered().Select(item => Calculate(item));
                //注意这里是个延迟加载,也就是不用集合的时候 这个Calculate里面的算法 是不会去运行 可以屏蔽下面的代码看效果;
                //Console.WriteLine(evenNumbers.Count());
                foreach (int item in evenNumbers)
                    Console.WriteLine(item);
                Console.ReadKey();
            }
    
            static int Calculate(int number)
            {
                Console.WriteLine("针对集合元素{0}的一些工作代码……ThreadId={1}", number, Thread.CurrentThread.ManagedThreadId);
                return number * 2;
            }
        }
    }
    Program

      ForEach的独到之处就是可以将数据进行分区,每一个小区内实现串行计算,分区采用Partitioner.Create实现

    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
                for (int j = 1; j < 4; j++)
                {
                    ConcurrentBag<int>  bag = new ConcurrentBag<int>();
                    var watch = Stopwatch.StartNew();
                    watch.Start();
                    Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
                    {
                        for (int m = i.Item1; m < i.Item2; m++)
                        {
                            bag.Add(m);
                        }
                    });
                    Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
                    GC.Collect();
    
                }
            }
        }
    }
    Program

      ParallelOptions类
      ParallelOptions options = new ParallelOptions();
      //指定使用的硬件线程数为4
      options.MaxDegreeOfParallelism = 4;
      有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好ParallelOptions提供了MaxDegreeOfParallelism属性。

    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        public class Student
        {
            public int ID { get; set; }
            public string Name { get; set; }
            public int Age { get; set; }
            public DateTime CreateTime { get; set; }
        }
    
        class Program
        {
            static void Main(string[] args)
            {
                var dic = LoadData();
                Stopwatch watch = new Stopwatch();
                watch.Start();
                var query2 = (from n in dic.Values.AsParallel()
                              where n.Age > 20 && n.Age < 25
                              select n).ToList();
                watch.Stop();
                Console.WriteLine("并行计算耗费时间:{0}", watch.ElapsedMilliseconds);
    
                Console.Read();
            }
    
            public static ConcurrentDictionary<int, Student> LoadData()
            {
                ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
                ParallelOptions options = new ParallelOptions();
                //指定使用的硬件线程数为4
                options.MaxDegreeOfParallelism = 4;
                //预加载1500w条记录
                Parallel.For(0, 15000000, options, (i) =>
                {
                    var single = new Student()
                    {
                        ID = i,
                        Name = "hxc" + i,
                        Age = i % 151,
                        CreateTime = DateTime.Now.AddSeconds(i)
                    };
                    dic.TryAdd(i, single);
                });
    
                return dic;
            }
        }
    }
    Program

    常见问题的处理

      <1> 如何中途退出并行循环?
      是的,在串行代码中我们break一下就搞定了,但是并行就不是这么简单了,不过没关系,在并行循环的委托参数中提供了一个ParallelLoopState,该实例提供了Break和Stop方法来帮我们实现。
      Break: 当然这个是通知并行计算尽快的退出循环,比如并行计算正在迭代100,那么break后程序还会迭代所有小于100的。
      Stop:这个就不一样了,比如正在迭代100突然遇到stop,那它啥也不管了,直接退出。

    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
                ConcurrentBag<int> bag = new ConcurrentBag<int>();
    
                Parallel.For(0, 20000000, (i, state) =>
                {
                    if (bag.Count == 1000)
                    {
                        //state.Break();
                        state.Stop();
                        return;
                    }
                    bag.Add(i);
                });
    
                Console.WriteLine("当前集合有{0}个元素。", bag.Count);
    
            }
        }
    }
    Program

      取消(cancel)

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        class Program
        {
            public static void Main()
            {
    
                var cts = new CancellationTokenSource();
                var ct = cts.Token;
                Task.Factory.StartNew(() => fun(ct));
                Console.ReadKey();
                //Thread.Sleep(3000);
                cts.Cancel();
                Console.WriteLine("任务取消了!");
    
            }
    
            static void fun(CancellationToken token)
            {
                Parallel.For(0, 100000,
                            new ParallelOptions { CancellationToken = token },
                            (i) =>
                            {
                                Console.WriteLine("针对数组索引{0}的一些工作代码……ThreadId={1}", i, Thread.CurrentThread.ManagedThreadId);
                            });
            }
        }
    }
    Program

      <2> 并行计算中抛出异常怎么处理?
      首先任务是并行计算的,处理过程中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的Exception并不能获取到异常,然而为并行诞生的AggregateExcepation就可以获取到一组异常。

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        class Program
        {
            static void Main(string[] args)
            {
                try
                {
                    Parallel.Invoke(Run1, Run2);
                }
                catch (AggregateException ex)
                {
                    foreach (var single in ex.InnerExceptions)
                    {
                        Console.WriteLine(single.Message);
                    }
                }
                Console.WriteLine("结束了!");
                //Console.Read();
            }
    
            static void Run1()
            {
                Thread.Sleep(3000);
                throw new Exception("我是任务1抛出的异常");
            }
    
            static void Run2()
            {
                Thread.Sleep(5000);
                throw new Exception("我是任务2抛出的异常");
            }
        }
    }
    Program

      注意Parallel里面 不建议抛出异常 因为在极端的情况下比如进去的第一批线程先都抛异常了 此时AggregateExcepation就只能捕获到这一批的错误,然后程序就结束了

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        public class TestClass
        {
            public static List<int> NumberList = null;
            private static readonly object locker = new object();
            public void Test(int Number)
            {
                throw new Exception("1111");
                //lock (locker)
                //{
                //    if (NumberList == null)
                //    {
                //        Console.WriteLine("执行添加");
                //        NumberList = new List<int>();
                //        NumberList.Add(1);
                //        //Thread.Sleep(1000);
                //    }
                //}
                //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
                //Console.WriteLine(Number);
            }
        }
    
        class Program
        {
            private static readonly object locker = new object();
            static void Main(string[] args)
            {
                List<string> errList = new List<string>();
                try
                {
                    Parallel.For(0, 10, (i) =>
                    {
                        try
                        {
                            TestClass a = new TestClass();
                            a.Test(i);
                        }
                        catch (Exception ex)
                        {
                            lock (locker)
                            {
                                errList.Add(ex.Message);
                                throw ex;
                            }
                        }
                    });
                }
                catch (AggregateException ex)
                {
                    foreach (var single in ex.InnerExceptions)
                    {
                        Console.WriteLine(single.Message);
                    }
                }
                int Index = 1;
                foreach (string err in errList)
                {
                    Console.WriteLine("{0}、的错误:{1}", Index++, err);
                }
            }
        }
    }
    Program

      可以向下面这样来处理一下
      不在AggregateExcepation中来处理 而是在Parallel里面的try catch来记录错误,或处理错误

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    
    namespace ConsoleApp1
    {
        public class TestClass
        {
            public static List<int> NumberList = null;
            private static readonly object locker = new object();
            public void Test(int Number)
            {
                throw new Exception("1111");
                //lock (locker)
                //{
                //    if (NumberList == null)
                //    {
                //        Console.WriteLine("执行添加");
                //        NumberList = new List<int>();
                //        NumberList.Add(1);
                //        //Thread.Sleep(1000);
                //    }
                //}
                //if (Number == 5 || Number == 7) throw new Exception(string.Format("NUmber{0}Boom!", Number));
                //Console.WriteLine(Number);
            }
        }
    
        class Program
        {
            private static readonly object locker = new object();
            static void Main(string[] args)
            {
                List<string> errList = new List<string>();
                Parallel.For(0, 10, (i) =>
                {
                    try
                    {
                        TestClass a = new TestClass();
                        a.Test(i);
                    }
                    catch (Exception ex)
                    {
                        lock (locker)
                        {
                            errList.Add(ex.Message);
                        }
                        //Console.WriteLine(ex.Message);
                        //注:这里不再将错误抛出.....
                        //throw ex;
                    }
                });
    
                int Index = 1;
                foreach (string err in errList)
                {
                    Console.WriteLine("{0}、的错误:{1}", Index++, err);
                }
            }
        }
    }
    Program
  • 相关阅读:
    怎样提高js的编程能力
    如何提升自己
    利用nginx做反向代理解决前端跨域问题
    vue项目中使用组件化开发
    vue中refs的使用
    vue项目使用keep-alive的作用
    hadoop安装、使用过程的异常汇总
    CSS选择器优先级总结
    为什么CSS选择器是从右往左解析
    MySql中游标的定义与使用方式
  • 原文地址:https://www.cnblogs.com/scmail81/p/9521096.html
Copyright © 2011-2022 走看看