并行开发的概念
并行开发要做的事情就是将任务分摊给硬件线程去并行执行来达到负载和加速,传统的代码都是串行的,就一个主线程,当我们为了实现加速而开了很多工作线程,这些工作线程就是软件线程
Parallel的使用
Parallel类是对线程的抽象,位于System.Threading.Tasks名称空间下,提供了任务和数据并行性.在Parallel下有三个常用的方法Invoke,For和ForEach,其中Parallel.Invoke用于任务并行性,Parallel.ForEach/Parallel.For用于数据并行性
Parallel.Invoke
如果多个任务应并行运行,就可以使用Parallel.Invoke()方法最简单,最简洁的将串行的代码并行化
简单应用
class ThreadTest { static void Main(string[] args) { var watch = Stopwatch.StartNew(); watch.Start(); Run1(); Run2(); Run3(); watch.Stop(); Console.WriteLine("串行开发,总耗时{0}", watch.ElapsedMilliseconds); watch.Restart(); Parallel.Invoke(Run1, Run2, Run3); watch.Stop(); Console.WriteLine("并行开发,总耗时{0}", watch.ElapsedMilliseconds); Console.ReadKey(); } static void Run1() { Console.WriteLine("Run1,我需要1s"); Thread.Sleep(1000); } static void Run2() { Console.WriteLine("Run2,我需要3s"); Thread.Sleep(3000); } static void Run3() { Console.WriteLine("Run3,我需要4s"); Thread.Sleep(4000); } }
主程序启动时,先顺序调用Run1(),Run()2,Run3()方法,这是串行的,而后使用Parallel.Invoke()将三个方法并行调用,可见耗时是有明显下降的
执行顺序
static void Main(string[] args) { Console.WriteLine("主线程启动,线程ID:{0}", Thread.CurrentThread.ManagedThreadId); Parallel.Invoke(() => Run1("task1"), () => Run2("task2"), () => Run3("task3")); Console.WriteLine("主线程结束,线程ID:{0}", Thread.CurrentThread.ManagedThreadId); Console.ReadKey(); } static void Run1(string taskName) { Console.WriteLine("任务名:{0}线程ID:{1}", taskName, Thread.CurrentThread.ManagedThreadId); for (int i = 0; i < 5; i++) { Console.WriteLine("a"); } } static void Run2(string taskName) { Console.WriteLine("任务名:{0}线程ID:{1}", taskName, Thread.CurrentThread.ManagedThreadId); for (int i = 0; i < 5; i++) { Console.WriteLine("b"); } } static void Run3(string taskName) { Console.WriteLine("任务名:{0}线程ID:{1}", taskName, Thread.CurrentThread.ManagedThreadId); for (int i = 0; i < 5; i++) { Console.WriteLine("c"); } }
结果可知:
1、没有固定顺序,每个Task可能是不同的线程去执行,也可能是相同的
2、主线程必须等Invoke中的所有方法执行完成后返回才继续向下执行,以后设计并行的时候,要考虑每个Task任务尽可能差不多,如果相差很大,比如一个时间非常长,其他都比较短,这样一个线程可能会影响整个任务的性能。这点非常重要(就是说Invoke会阻塞主线程)
Parallel.For
Parallel.For是 for 的多线程实现,串行代码中也有一个for,但是那个for并没有用到多核,而Paraller.For它会在底层根据硬件线程的运行状况来充分的使用所有的可利用的硬件线程
static void Main(string[] args) { for (int i = 0; i < 3; i++) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); var watch = Stopwatch.StartNew(); watch.Start(); for (int j = 0; j < 20000000; j++) { bag.Add(i); } watch.Stop(); Console.WriteLine("串行添加,总数20000000,耗时{0}", watch.ElapsedMilliseconds); GC.Collect(); watch.Restart(); Parallel.For(0, 20000000, j => { bag.Add(j); }); watch.Stop(); Console.WriteLine("并行添加,总数20000000,耗时{0}", watch.ElapsedMilliseconds); Console.WriteLine("***********************************"); GC.Collect(); } Console.ReadKey(); }
向一个线程安全的集合插入数据,使用串行的for耗时与使用并行的Parallel.For差异
Parallel.ForEach
Parallel.ForEach 是 foreach 的多线程实现,他们都能对 IEnumerable<T> 类型对象进行遍历,Parallel.ForEach 的特殊之处在于它使用多线程来执行循环体内的代码段
static void Main(string[] args) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); Parallel.For(0, 10, j => { bag.Add(j); }); Console.WriteLine("集合总数:{0}", bag.Count); Parallel.ForEach(bag, item => { Console.WriteLine(item); }); Console.ReadKey(); }
Parallel.ForEach的分区
TODO
中断
Parallel.For:添加ParallelLoopState参数,该实例提供了Break和Stop方法来帮助实现
static void Main(string[] args) { ConcurrentBag<int> bag = new ConcurrentBag<int>(); var watch = Stopwatch.StartNew(); watch.Start(); Parallel.For(0, 2000, (j, state) => { if (bag.Count == 1000) { state.Break(); //return是必须的,否则依旧会继续执行 return; } bag.Add(j); }); watch.Stop(); Console.WriteLine("集合元素个数{0}", bag.Count); Console.ReadKey(); }
ParallelLoopState.Break():在完成当前的这轮工作之后,不再执行后继的工作,但在当前这轮工作开始之前“已经在执行”的工作,则必须完成。但并不能执行完所有的循环。
ParallelLoopState.Stop:不但不会再创建新的线程执行并行循环,而且当前“已经在执行”的工作也应该被中止。
注意:Stop仅仅通知其他迭代尽快结束,而Break不仅通知其他迭代尽快结束,同时还要保证退出之前要完成LowestBreakIteration之前的迭代。 例如,对于从 0 到 1000 并行迭代的 for 循环,如果从第 100 此迭代开始调用 Break,则低于 100 的所有迭代仍会运行,从 101 到 1000 的迭代则不必要。而调用Stop方法不保证低于 100 的所有迭代都会运行。
这里发现一个问题
ParallelLoopState.Break()
ConcurrentBag<int> bag = new ConcurrentBag<int>(); for (int j = 0; j < 5; j++) { bag = new ConcurrentBag<int>(); Parallel.For(0, 2000, (i, state) => { if (bag.Count == 1000) { state.Break(); return;//return是必须的,否则依旧会继续执行 } else { bag.Add(i); } }); Console.WriteLine("一共添加2000个元素,集合元素实际个数为:{0}", bag.Count); Console.WriteLine("*************************************************"); }
ParallelLoopState.Stop()
ConcurrentBag<int> bag = new ConcurrentBag<int>(); for (int j = 0; j < 5; j++) { bag = new ConcurrentBag<int>(); Parallel.For(0, 2000, (i, state) => { if (bag.Count == 1000) { state.Stop(); return;//return是必须的,否则依旧会继续执行 } else { bag.Add(i); } }); Console.WriteLine("一共添加2000个元素,集合元素实际个数为:{0}", bag.Count); Console.WriteLine("*************************************************"); }
TODO:两种方法都无法准确的在元素添加到1000时结束循环,这里需要后续好好查资料看看.
异常处理
任务是并行计算的,处理过程中可能会产生n多的异常
Exception
Exception是可以捕获到两个异常的,断点可见,但是遍历找不到InnerExceptions属性?
static void Main(string[] args) { Console.WriteLine("主线程启动,线程ID:{0}", Thread.CurrentThread.ManagedThreadId); try { Parallel.Invoke(() => Run1("task1"), () => Run2("task2"), () => Run3("task3")); } catch (Exception ex) { Console.WriteLine(ex.Message); } Console.WriteLine("主线程结束,线程ID:{0}", Thread.CurrentThread.ManagedThreadId); Console.ReadKey(); } static void Run1(string taskName) { Console.WriteLine("任务名:{0}线程ID:{1}", taskName, Thread.CurrentThread.ManagedThreadId); throw new Exception("Run1出现异常"); } static void Run2(string taskName) { Console.WriteLine("任务名:{0}线程ID:{1}", taskName, Thread.CurrentThread.ManagedThreadId); for (int i = 0; i < 5; i++) { Console.WriteLine("b"); } } static void Run3(string taskName) { Console.WriteLine("任务名:{0}线程ID:{1}", taskName, Thread.CurrentThread.ManagedThreadId); throw new Exception("Run3出现异常"); }
AggregateException
static void Main(string[] args) { Console.WriteLine("主线程启动,线程ID:{0}", Thread.CurrentThread.ManagedThreadId); try { Parallel.Invoke(() => Run1("task1"), () => Run2("task2"), () => Run3("task3")); } catch (AggregateException ex) { //AggregateException捕获并行产生的一组异常集合 foreach (var item in ex.InnerExceptions) { Console.WriteLine(item); }; } Console.WriteLine("主线程结束,线程ID:{0}", Thread.CurrentThread.ManagedThreadId); Console.ReadKey(); }
Invoke方法中调用了一个产生异常的方法,但是结果显示异常并不会影响其它方法及主线程的执行
ParallelOptions类
CancellationToken |
获取或设置与此 ParallelOptions 实例关联的 CancellationToken。 |
MaxDegreeOfParallelism |
获取或设置此 ParallelOptions 实例所允许的并发任务的最大数目。 |