1、简介
关于Parallel不想说太多,因为它是Task的语法糖,至少我是这么理解的,官方文档也是这么说的,它本身就是基本Task的.假设我们有一个集合,不管是什么集合,我们要遍历它,首先想到的是For(如何涉及到修改或者读可以用for)或者Foreach(如果单纯的读),但是它两是同步的去操作集合,但是使用Parallel的静态For或者Foreach那就可以让多个线程参与这个工作,这样就能充分的利用CPU,但是你需要考虑CPU上下文产生的性能消耗,以及Parallel本身的性能消耗,所以,这也能解释为什么,你的循环里面执行的是不耗时的操作,使用for或者foreach的速度比使用Parallel的要快,所以使用Parallel还是要慎重.而且使用Parallel还需要注意的一点就是,不能有多线程争用问题,就是你的循环体里面不能有操作静态资源的操作.如果真的需要,那你可以加锁,但是那就失去它的优势了.
2、使用注意点
(1)、不能操作共享资源,代码如下:
static int shareData = 10; static void Main(string[] args) { Parallel.For(1, 100000, i => Add(i)); Console.Write(shareData); Console.ReadKey(); } static void Add(int i) { shareData += i; }
代码逻辑很简单,1+2+3+......+100000这个过程中每次都加一个10,怎么说都是正数,但是,信不信它能给你加出个负数来(可能是内存溢出了),而且每次的结果都不一样(重要)!自己试试吧!
解决方案很简单,加个锁,代码如下:
class ParallerStudy { static int shareData = 10; static object lockObj = new object(); static void Main(string[] args) { Parallel.For(1, 100000, i => Add(i)); Console.Write(shareData); Console.ReadKey(); } static void Add(int i) { lock (lockObj) { shareData += i; } } }
这个肯定是正确的值,因为每次的输出都是这个,这里因为如果给循环的最终值设小的话,他好像是同步去做了,不会有问题,所以这里给了个100000,这个时候它会开多个线程去做.
(2)、它可以向Task一样抛出异常,都是AggregateException,代码如下:
这里就给截图了,不写代码了.
(3)、性能开销
这个不用多说,它是基于Task,开销还是有的,如果不清楚,去看我前面的文章
(4)、支持取消
取消貌似只能取消整个Parallel运算,不支持取消内部的方法,我试了不行,而且必须在执行Parallel之前取消它,之后都不行.很其怪,可能我的调用方式有问题,如果你们有好的方法,欢迎在下面评论.
(4)、可以设置最多的线程数
实战中有演示
(5)、调度器
这里就不介绍了,后续的随笔中会介绍
(6)、三个重要的委托
实战中有演示
3、实战
(1)、下面写个使用Parallel多线程去读文件的例子
代码如下:
class ParallerStudy { static void Main(string[] args) { var targetPath = $"{AppDomain.CurrentDomain.BaseDirectory}Test"; var totalLength = DictionaryFilesContent(targetPath, "", SearchOption.TopDirectoryOnly); Console.WriteLine("{0}目录下所有的文件长度总和为:{1}", targetPath, totalLength); Console.ReadKey(); } /// <summary> /// 多线程读取多个文件的内容 /// </summary> /// <param name="i"></param> /// <returns></returns> static long DictionaryFilesContent(string path, string searchPattern, SearchOption searchOptions) { var opts = new ParallelOptions(); //只允许开六个线程去做这个事情 opts.MaxDegreeOfParallelism =3; var files = Directory.EnumerateFiles(path); long totalFileLength = 0; Parallel.ForEach<string, long>( files, opts, //初始委托,该方法会在线程执行主要任务前执行,可用于参数校验等操作 () => { Console.WriteLine("开启读取文件,当前线程Id为{0}", Thread.CurrentThread.ManagedThreadId); return 0; }, //主体委托,开始干正事 (file, loopstate, index, taskLocalCount) => { long fileLength = 0; FileStream fs = null; try { fs = File.OpenRead(file); fileLength = fs.Length; Console.WriteLine("当前线程Id为{1},当前文件名为{2}", index, Thread.CurrentThread.ManagedThreadId, fs.Name); } catch (IOException) { }//排除拒绝访问的文件 finally { if (fs != null) fs.Dispose(); } return taskLocalCount + fileLength; }, //终结委托,一般用于汇总主体委托的结果值,所以如果这里涉及访问共享资源的话,一般会用同步构造,也就是加锁等操作 taskLocalCount => { //taskLocalCount=taskLocalCount + fileLength,单个文件的长度 //同步构造,不需要加锁,当每个线程读取完对应文件的长度后,将长度加到totalFileLength中,这个时候多个线程访问这个变量可能会出现 //多线程争用问题,但是使用Interlocked.Add相当与给totalFileLength加锁 Interlocked.Add(ref totalFileLength, taskLocalCount); }); return totalFileLength; } }
看这个例子前,还在想真的有这么厉害吗?其实也就那样,根据输出可以发现,一个开了3个线程,去读10个文件,我还在想这里面会不会有多线程争用问题,但是没有,你看它怎么做的,每个线程只会去读一个文件,读的快的,立即去读另外的文件,我执行了N次,发现并没有一个文件多个线程读的问题,所以每个线程只会去读一个文件,自然就不会有多线程争用问题了.
(2)、关于ParallelLoopState的用法
Stop()和Break方法最常用,当子任务处理批量的任务时,如果满足某种条件,则告诉其余的任务不需要在处理了.
这个对象主要用于Parallel开启的子任务群,它们内部之间的交流,代码如下:
static void Main(string[] args) { var targetPath = $"{AppDomain.CurrentDomain.BaseDirectory}Test"; var totalLength = DictionaryFilesContent(targetPath, "", SearchOption.TopDirectoryOnly); Console.WriteLine("{0}目录下所有的文件长度总和为:{1}", targetPath, totalLength); Console.ReadKey(); } /// <summary> /// 多线程读取多个文件的内容 /// </summary> /// <param name="i"></param> /// <returns></returns> static long DictionaryFilesContent(string path, string searchPattern, SearchOption searchOptions) { var opts = new ParallelOptions(); //只允许开六个线程去做这个事情 opts.MaxDegreeOfParallelism =3; var files = Directory.EnumerateFiles(path); long totalFileLength = 0; Parallel.ForEach<string, long>( files, opts, //初始委托,该方法会在线程执行主要任务前执行,可用于参数校验等操作 () => { Console.WriteLine("开启读取文件,当前线程Id为{0}", Thread.CurrentThread.ManagedThreadId); return 0; }, //主体委托,开始干正事 (file, loopstate, index, taskLocalCount) => { long fileLength = 0; FileStream fs = null; try { //处理完第3项,就不要在处理了,这个第三项的意思是不是第三个文件,也可能是第五个文件 if (index == 3) { loopstate.Stop(); } fs = File.OpenRead(file); fileLength = fs.Length; Console.WriteLine("当前线程Id为{1},当前文件名为{2}", index, Thread.CurrentThread.ManagedThreadId, fs.Name); } catch (IOException) { }//排除拒绝访问的文件 finally { if (fs != null) fs.Dispose(); } return taskLocalCount + fileLength; }, //终结委托,一般用于汇总主体委托的结果值,所以如果这里涉及访问共享资源的话,一般会用同步构造,也就是加锁等操作 taskLocalCount => { //taskLocalCount=taskLocalCount + fileLength,单个文件的长度 //同步构造,不需要加锁,当每个线程读取完对应文件的长度后,将长度加到totalFileLength中,这个时候多个线程访问这个变量可能会出现 //多线程争用问题,但是使用Interlocked.Add相当与给totalFileLength加锁 Interlocked.Add(ref totalFileLength, taskLocalCount); }); return totalFileLength; }
还有其它的一些用法,这里就不介绍了,Api里面都有介绍.
(3)、Parallel的返回值
就说一个LowestBreakIteration,如果这个返回值为null,说明子任务群有个调用了Stop方法,如果不为null,说明有个调用了Break方法且值为调用Break任务对应的Index值.