zoukankan      html  css  js  c++  java
  • 任务并行

    任务并行

          前面一篇提到例子都是数据并行,但这并不是并行化的唯一形式,在.Net4之前,必须要创建多个线程或者线程池来利用多核技术。现在只需要使用新的Task实例就可以通过更简单的代码解决命令式任务并行问题。

    1.Task及它的生命周期

          一个Task表示一个异步操作,它的创建和执行都是独立的,因此可以对相关操作的执行拥有完全的控制权;当有很多异步操作作为Task实例加载的时候,为了充分利用运行时的逻辑内核,任务调度器会尝试并行的运行这些任务,当然任务都是有额外的开销,虽然要小于添加线程的开销;

          对Task实例的生命周期的理解非常重要。一个Task的执行,取决于底层硬件和运行时可用的资源。因此Task实例的状态会不断的发生改变,而一个Task实例只会完成其生命周期一次,当Task到达它三种可能的最终状态只后,它就回不去之前的任何状态了。

          

          Task实例有三种可能的初始状态,Created是Task构造函数创建实例的初始状态,WaitForActivation是子任务依赖其他任务完成后等待调度的初始状态,WaitingToRun是通过TaskFactory.StartNew所创建任务的初始状态。表示正在等待调度器挑选自己并运行。

          任务开始执行,状态就变为TaskStatus.Runing。如果还有子任务,主任务的状态会转变到TaskStatus.WaitingForChildrenToComplete状态。并最终到达,Canceled,Faulted和RunToCompletion 三种状态。从字面理解就是任务取消,出错和完成。

    2.任务并行。

         前面我们通过Parallel.Invoke来并行加载方法。

      Parallel.Invoke(GenerateAESKeys,GenerateMD5Has);

         通过Task实例也能完成同样的工作。

          var t1 = new Task(GenerateAESKeys);
          var t2 = new Task(GenerateMD5Has);
          t1.Start();
          t2.Start();
          Task.WaitAll(t1, t2);

     Start方法对委托进行初始化。 WaitAll方法会等待两个任务的执行完成之后再往下走。

    可以看见,执行过程中,任务的状态不断的发生变化。可以给WaitFor方法加上毫秒数。看任务是否会在指定时间内完成。

    复制代码
      if(!Task.WaitAll(new[]{t1,t2},3000))
                {
                    Console.WriteLine("任务执行超过3秒");
                    Console.WriteLine(t1.Status.ToString());
                    Console.WriteLine(t2.Status.ToString());
     }
    复制代码

    即使到达了指定时间,任务还是继续执行。

    同样任务本身也是可以等待

      if (t1.Wait(3000))
       {
        Console.WriteLine("任务t1执行超过3秒");
        Console.WriteLine(t1.Status.ToString());
        }

    3.通过取消标记取消任务。

        可以通过CancellationToken 来中断任务的执行。这需要再委托中添加一些代码,创建可以取消的任务。

    复制代码
      private static void GenerateMD5HasCancel(CancellationToken ct)
            {
                ct.ThrowIfCancellationRequested();
                var sw = Stopwatch.StartNew();
                for (int i = 0; i < NUM_AES_KEYS; i++)
                {
                    var md5M = MD5.Create();
                    byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + i);
                    byte[] result = md5M.ComputeHash(data);
                    string hexString = ConverToHexString(result);
                    ct.ThrowIfCancellationRequested();
                }
                Console.WriteLine("MD5:" + sw.Elapsed.ToString());
            }
    复制代码
    复制代码
                Console.WriteLine("任务开始...");
                var cts = new CancellationTokenSource();
                var ct = cts.Token;
                var sw = Stopwatch.StartNew();
                var t1 = Task.Factory.StartNew(() => GenerateMD5HasCancel(ct), ct);
                var t2 = Task.Factory.StartNew(() => GenerateAESKeysCancel(ct), ct);
                
                //1秒后取消任务
                Thread.Sleep(1000);
    
                cts.Cancel();
    
                try
                {
                    if (!Task.WaitAll(new[] { t1,t2}, 1000))
                    {
                        Console.WriteLine("任务执行超过1秒");
                        Console.WriteLine(t1.Status.ToString());
                    }
                }
                catch (AggregateException ex)
                {
                    foreach (var exc in ex.InnerExceptions)
                    {
                        Console.WriteLine(exc.ToString());
                    }
                    if (t1.IsCanceled)
                    {
                        Console.WriteLine("任务1取消了...");
                    }
                    Console.WriteLine(sw.Elapsed.ToString());
                    Console.WriteLine("结束");
                }
    复制代码
    CancellationTokenSource能够初始化取消的请求,而CancellationToken能将这些请求传递给异步操作;上面的方法通过Task类的Factory方法得到一个TaskFactory实例,相比Task直接创建任务,这个实例可以使用更多的功能。而StartNew 等价于用Task构造函数创建一个Task并调用Start方法执行。

    直接在Debug下面运行,程序会在异常的地方中断。直接运行exe得到上面的结果。 

    ThrowIfCancellationRequested在每一次循环迭代都会执行,内部是判断任务取消后抛出一个OperationCanceledException的异常,来避免运行不必要的循环和其他命令。

    复制代码
      public void ThrowIfCancellationRequested()
            {
                if (IsCancellationRequested) 
                    ThrowOperationCanceledException();
            }
     private void ThrowOperationCanceledException()
            {
                throw new OperationCanceledException(Environment.GetResourceString("OperationCanceled"), this);
            }
    复制代码

    如果有代码正在等待取消,还会自动抛出一个TaskCanceledException异常。会包含在AggregateException中。

    4.处理异常。

      修改上面的方法抛出一个异常。

    复制代码
     private static void GenerateMD5HasCancel(CancellationToken ct)
            {
                ct.ThrowIfCancellationRequested();
                 //....if (sw.Elapsed.TotalSeconds > 0.5)
                    {
                        throw new TimeoutException("超时异常0.5秒");
                    }
                    ct.ThrowIfCancellationRequested();
                }
                Console.WriteLine("MD5:" + sw.Elapsed.ToString());
            }
    复制代码

    修改Main方法的Catch。

    复制代码
                    if (t1.IsFaulted)
                    {
                        foreach (var exc in ex.InnerExceptions)
                        {
                            Console.WriteLine(exc.ToString());
                        }
                        Console.WriteLine(t1.Status.ToString());
                    }
    复制代码

    执行结果:

    当出现异常时,任务的状态就会转换为Faulted。并不会影响另外一个任务的执行。

    5.从任务返回值。

    前面的方法都是没有返回值,得到任务的返回值需要使用Task<TResult>实例,TResult要替换为返回的类型。修改AES方法。返回一个指定前缀的List<String>

    GenerateMD5HasList:
     View Code
    复制代码
      Console.WriteLine("任务开始...");
                var cts = new CancellationTokenSource();
                var ct = cts.Token;
                
                var t1 = Task.Factory.StartNew(() => GenerateMD5HasList(ct,'A'), ct);
                //等待执行完成
                t1.Wait();
    
                var res = t1.Result;
                for (int i = 0; i < res.Count; i++)
                {
                    Console.WriteLine(res[i]);
                }
    复制代码

    而这时的StartNew创建的类型是Task<List<String>>.StartNew源码如下:

    复制代码
      public Task<TResult> StartNew<TResult>(Func<TResult> function)
            {
                StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
                Task currTask = Task.InternalCurrent;
                return Task<TResult>.StartNew(currTask, function, m_defaultCancellationToken,
                    m_defaultCreationOptions, InternalTaskOptions.None, GetDefaultScheduler(currTask), ref stackMark);
            }
    复制代码

    我们还可以将任务串联起来。比如上面的代码。避免写太多代码来检查前面一个任务是否完成。而ContinueWith这个方法可以用来串联多个任务。

    复制代码
                var t1 = Task.Factory.StartNew(() => GenerateMD5HasList(ct,'A'), ct);
                var t2 = t1.ContinueWith((t) =>
                {
                    for (int i = 0; i < t.Result.Count; i++)
                    {
                        Console.WriteLine(t.Result[i]);
                                        }
                });
                //可以等待t2执行完成
                t2.Wait();
    复制代码

    如果需要设置继续的条件,就要用到TaskContinuationOptions,它是一个枚举类型,用来控制另一个任务执行和调度的可选行为

     var t2 = t1.ContinueWith((t) => OtherMethod(t), TaskContinuationOptions.NotOnCanceled);

     NotOnCanceled,就是表示上个任务不取消的情况下执行。例如还有NotOnFaulted.如果上一个任务抛出了异常,那么就不会执行。这里就不一一例举了。

     小结:这一章主要是将了基于任务的编程模型,学习了任务的创建、状态,以及如何取消、捕获异常和获得返回值,并能串行任务,任务的延续不仅能简化代码,而且还能帮助调度器对很快就要执行的任务采取正确的操作。下一章学习并发集合。

    阅读书籍:《C#并行编程高级教程》 链接: 下载链: http://pan.baidu.com/s/1bn1BdBx  密码: fn2d

  • 相关阅读:
    nepenthes用法
    honeydctl命令
    honeyd路由拓扑
    Linux Samba服务器的安装
    honeyd使用
    FreeRTOS 事件标志组
    epoll函数
    Java程序:从命令行接收多个数字,求和并输出结果
    《大道至简》第一章读后感
    【诗词歌赋】 杂感- 贺小妹
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/4919814.html
Copyright © 2011-2022 走看看