zoukankan      html  css  js  c++  java
  • Parallel线程使用

    Parallel的静态For,ForEach和Invoke方法

     
        在一些常见的编程情形中,使用任务也许会提升性能。为了简化编程,静态类System.Threading.Tasks.Paraller封装了这些常见的情形,它内部使用Task对象。例如,不要像下面一样处理一个集合中的所有项:
    // 一个线程顺序执行这个工作(每次迭代调用一次DoWork)
    for (Int32 i = 0; i< 1000; i++ ) DoWork(i);

        相反,可以使用Parallel类型的For方法,让多个线程池治线程帮助执行这个工作:

    // 线程池的线程并行处理工作
    Parallel.For(0,1000,i=>DoWork(i));

        类似的,如果有一个集合,那么不要像下面这样写:

    // 一个线程顺序执行这个工作(每次迭代调用一次DoWork)
    foreach ( var item in conllection) DoWork(item);

        而是这样做:

    // 线程池的线程并行处理工作
    Parallel.ForEach(conllection,item=>DoWork(item));

        如果代码中既可以用For,也可以用ForEach,那么建议使用For,因为它执行的快一点。最后,如果要执行几个方法,那么可以顺序执行它们,如下所示:

    //  一个线程顺序执行所有方法    
    Method1();
    Method2();
    Method3();

        也可以并行执行它们: 

    复制代码
    // 线程池的线程并行执行
    parallel.Invoke(
                () => Method1(),
                () => Method2(),
                () => Method3());
     
    复制代码

        Parallel的所有方法都让调用线程参与处理。从资源利用的角度说,这是一件好事,因为我们不希望调用线程停下来,等待线程池做完所有工作后才继续。然而,如果调用线程在线程池完成自己的那一部分工作之前完成工作,调用程序就会将自己挂起,知道所有工作完成。这也是一件好事,因为这个提供了和普通for和foreach循环时相同的语义:线程要在所有工作后才继续运行。还要注意,如果任何操作抛出一个未处理的异常,你调用的paraller方法最后会抛出一个AggregateException。

        当然,这并不是说需要检查自己的所有源代码,将for循环替换成Parallel.For的调用。调用Parallel的方法时,有一个前提条件务必记住:工作项要能并行执行。因此,如果工作项必须顺序执行,就不要调用Parallel的方法。另外,要避免会修改任何共享数据的工作项,因为多个线程同时处理的数据可能损坏。为了解决这个问题,一般的方法就是围绕数据访问添加线程同步锁。但是这样一来,一次就只能有一个线程访问数据,无法享受并行处理多个想带来的好处。
        除此之外,Parallel的方法本身也有开销:委托对象必须分配,而针对每一个工作项,都要调用一次这些委托。如果有大量可由多个线程处理的工作项,那么也许会获得性能的提升。但是,如果只为区区几个工作项使用Parallel的方法,或者为处理得非常快的工作项使用Parallel就会得不偿失了。
        Parallel的For,ForEach和Invoke方法都能接受一个ParallelOptions对象的重载版本。这个对象的定义如下:
       
    复制代码
     // 存储用于配置 Parallel 类的方法的操作的选项。
         public class ParallelOptions
        {
            // 初始化 ParallelOptions 类的新实例
            public ParallelOptions();
            // 获取或设置与此 ParallelOptions 实例关联的 CancellationToken,运行取消操作
            public CancellationToken CancellationToken { get; set; }
            // 获取或设置此 ParallelOptions 实例所允许的最大并行度,默认为-1(可用CPU数)
            public int MaxDegreeOfParallelism { get; set; }
            // 获取或设置与此 ParallelOptions 实例关联的 TaskScheduler。默认为TaskScheduler.Default
            public TaskScheduler TaskScheduler { get; set; }
        }
    复制代码

        除此之外,For和ForEach方法有一些重载版本允许传递3个委托:

        任务局部初始化委托(localInit),为参与工作的每一个任务都调用一次委托。这个委托是在任务被要求处理一个工作项之前调用。
        主体委托(body),为参与工作的各个线程所处理的每一项都调用一次委托。
        任务局部终结委托(localFinally),为参与工作的每一个任务都调用一次委托。这个委托是在任务处理好派遣给它的所有工作之后调用。即使主体委托引发一个未处理的异常,也会调用它。
        以下示例代码演示了如何利用3个委托,计算一个目录中的所有文件的字节长度总计值:
    复制代码
    private static Int64 DirectoryBytes(String path, String searchPattern, SearchOption searchOption)
            {
                var files = Directory.EnumerateFiles(path, searchPattern, searchOption);
                Int64 masterTotal = 0;
     
                ParallelLoopResult result = Parallel.ForEach<String, Int64>(files,
                   () =>
                   {
                       // localInit: 每个任务开始之前调用一次
                       // 每个任务开始之前,总计值都初始化为0
                       return 0;
                   },
     
                   (file, parallelLoopState, index, taskLocalTotal) =>
                   {
                       // body: 每个任务调用一次
                       // 获得这个文件的大小,把它添加到这个任务的累加值上
                       Int64 fileLength = 0;
                       FileStream fs = null;
                       try
                       {
                           fs = File.OpenRead(file);
                           fileLength = fs.Length;
                       }
                       catch (IOException) { /* 忽略拒绝访问的文件 */ }
                       finally { if (fs != null) fs.Dispose(); }
                       return taskLocalTotal + fileLength;
                   },
     
                   taskLocalTotal =>
                   {
                       // localFinally: 每个任务完成后调用一次
                       // 将这个任务的总计值(taskLocalTotal)加到中的总计值(masterTotal)上去
                       Interlocked.Add(ref masterTotal, taskLocalTotal);
                   });
                return masterTotal;
            }
    复制代码

        每个任务都通过taskLocalTotal变量为分配给它的文件维护自己的总计值。每个任务完成工作之后,都调用Interlocked.Add方法[对两个 32 位整数进行求和并用和替换第一个整数],以一种线程安全的方式更新总的总计值。由于每个任务都有自己的总计值,可以在一个工作项处理期间,无需进行线程同步。由于线程同步会造成性能的损失,所以不需要线程同步是一件好事。只有在每个任务返回之后,masterTotal才需要以一种线程安全的方式更新materTotal变量。所以,因为调用Interlocked.Add方法而造成的性能损失每个任务只发生一次,而不会每个工作项都发生。

        注意,我们向主题委托传递一个ParallelLoopState对象,它的定义如下:
        
    复制代码
    // 可用来使 Parallel 循环的迭代与其他迭代交互
        public class ParallelLoopState
        {
            // 获取循环的任何迭代是否已引发相应迭代未处理的异常
            public bool IsExceptional { get; }
            // 获取循环的任何迭代是否已调用 Stop
            public bool IsStopped { get; }
            // 获取从中调用 Break 的最低循环迭代。
            public long? LowestBreakIteration { get; }
            // 获取循环的当前迭代是否应基于此迭代或其他迭代发出的请求退出。
            public bool ShouldExitCurrentIteration { get; }
            // 告知 Parallel 循环应在系统方便的时候尽早停止执行当前迭代之外的迭代。
            public void Break();
            // 告知 Parallel 循环应在系统方便的时候尽早停止执行。
            public void Stop();
    }
    复制代码

          参与工作的每一个任务都会获得它自己的ParallelState对象,并可通过这个对象和参与工作的其他任务进行交互。Stop方法告诉循环停止处理任何更多的工作,未来对IsStopped属性的查询会返回true。Break方法告诉循环不再继续处理当前项之后的项。例如,假如ForEach被告知要处理100项,并在第5项时调用了Break,那么循环会确保前5项处理好之后,ForEach才返回。但注意,这并不是说在这100项中,只有前5项被处理,也许第5项之后可能在以前已经处理过了。LowestBreakIteration属性返回在处理过程中调用过Break方法的最低的项。从来没有调用过Break,LowestBreakIteration会返回null。

        处理任何一项时,如果造成一个未处理的异常,IsExceptional属性会返回true。如果处理一项时会花费大量的时间,代码可查询ShouldExitCurrentIteration属性看它是否应该提前退出。如果调用过Stop,调用过Break,取消过CancellationTokenSource,或者处理一项时造成了未处理的异常,这个属性就会返回true。
        Parallel的For和ForEach方法都返回一个ParallelLoopResult实例,他看起来像下面这样:
       
    复制代码
    // 提供执行 System.Threading.Tasks.Parallel 循环的完成状态。
        public struct ParallelLoopResult
        {
            // 获取该循环是否已运行完成(即该循环的所有迭代均已执行,并且该循环没有收到提前结束的请求)。
            public bool IsCompleted { get; }
     
            // 获取从中调用 System.Threading.Tasks.ParallelLoopState.Break() 的最低迭代的索引。
            public long? LowestBreakIteration { get; }
        }
    复制代码

        可通过检查属性来了解循环的结果,如果IsCompleted返回true。表明循环运行完成,所有项都得到了处理。如果IsCompleted为false,而且LowestBreakIteration为null,表明参与工作的某个线程调用了Stop方法。如果LowestBreakIteration返回false,而且LowestBreakIteration不为null,表名参与工作的某个线程调用的Break方法,LowestBreakIteration返回的Int64值指明了保证已得到处理的最低一项的索引。

    场景

    public void run(Action<List<string>> onload)
            {
                List<string> directoryLists = new List<string>();
                directoryLists = Directory.GetDirectories(m_importPath).ToList();          
                Parallel.ForEach(directoryLists,new ParallelOptions { MaxDegreeOfParallelism=10}, i =>
                {
                    List<string> files = importer(i);

                    onload(files);

               }

         }

    Action<List<string>> onload = dir =>
                {
                    BeginInvoke(new EventHandler((obj, even) =>
                    {
                        var root = new TreeNode(dir[0]);
                        for (int i = 1; i < dir.Count; i++)
                        {
                            string fileName = dir[i].Substring(dir[i].LastIndexOf('\\') + 1);
                            root.Nodes.Add(fileName.Remove(fileName.IndexOf('.')));
                        }
                        this.treeView1.Nodes.Add(root);
                    }), null);

                };

    import.run(onload);

  • 相关阅读:
    Tensorflow基础教程4:卷积神经网络(CNN)介绍
    Keras之 cifar10数据集使用keras generator读取、模型训练、预测
    Tensorflow基础教程3:基础示例:多层感知机(MLP)
    Tensorflow基础教程2:Tensorflow模型建立与训练
    Chaquopy中不能导入64位Python训练的机器学习模型
    (转)使用SDWebImage和YYImage下载高分辨率图,导致内存暴增的解决办法
    C/C++ 递归与结束递归
    C/C++ 读取文件16进制格式
    C/C++ 打开外部程序
    C/C++ 遍历托盘图标
  • 原文地址:https://www.cnblogs.com/zhaowei303/p/3909522.html
Copyright © 2011-2022 走看看