zoukankan      html  css  js  c++  java
  • 看看Parallel中高度封装的三个方法,Invoke,For和ForEach

      说到.net中的并行编程,也许你的第一反应就是Task,确实Task是一个非常灵活的用于并行编程的一个专用类,不可否认越灵活的东西用起来就越

    复杂,高度封装的东西用起来很简单,但是缺失了灵活性,这篇我们就看看这些好用但灵活性不高的几个并行方法。

    一:Invoke

      现在电子商务的网站都少不了订单的流程,没有订单的话网站也就没有存活的价值了,往往在订单提交成功后,通常会有这两个操作,第一个:发起

    信用卡扣款,第二个:发送emial确认单,这两个操作我们就可以在下单接口调用成功后,因为两个方法是互不干扰的,所以就可以用invoke来玩玩了。

     1         static void Main(string[] args)
     2         {
     3             Parallel.Invoke(Credit, Email);
     4 
     5             Console.Read();
     6         }
     7 
     8         static void Credit()
     9         {
    10             Console.WriteLine("******************  发起信用卡扣款中  ******************");
    11 
    12             Thread.Sleep(2000);
    13 
    14             Console.WriteLine("扣款成功!");
    15         }
    16 
    17         static void Email()
    18         {
    19             Console.WriteLine("******************  发送邮件确认单!*****************");
    20 
    21             Thread.Sleep(3000);
    22 
    23             Console.WriteLine("email发送成功!");
    24         }

     

      怎么样,实现起来是不是很简单,只要把你需要的方法塞给invoke就行了,不过在这个方法里面有一个重载参数需要注意下,

    1  public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);

    有时候我们的线程可能会跑遍所有的内核,为了提高其他应用程序的稳定性,就要限制参与的内核,正好ParallelOptions提供了

    MaxDegreeOfParallelism属性。

    好了,下面我们大概翻翻invoke里面的代码实现,发现有几个好玩的地方:

    <1>: 当invoke中的方法超过10个话,我们发现它走了一个internal可见的ParallelForReplicatingTask的FCL内部专用类,而这个类是继承自

       Task的,当方法少于10个的话,才会走常规的Task.

    <2> 居然发现了一个装exception 的ConcurrentQueue<Exception>队列集合,多个异常入队后,再包装成AggregateException抛出来。

           比如:throw new AggregateException(exceptionQ);

    <3> 我们发现,不管是超过10个还是小于10个,都是通过WaitAll来等待所有的执行,所以缺点就在这个地方,如果某一个方法执行时间太长

       不能退出,那么这个方法是不是会长期挂在这里不能出来,也就导致了主流程一直挂起,然后页面就一直挂起,所以这个是一个非常危险

          的行为,如果我们用task中就可以在waitall中设置一个过期时间,但invoke却没法做到,所以在使用invoke的时候要慎重考虑。

      1     try
      2     {
      3         if (actionsCopy.Length > 10 || (parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length))
      4         {
      5             ConcurrentQueue<Exception> exceptionQ = null;
      6             try
      7             {
      8                 int actionIndex = 0;
      9                 ParallelForReplicatingTask parallelForReplicatingTask = new ParallelForReplicatingTask(parallelOptions, delegate
     10                 {
     11                     for (int l = Interlocked.Increment(ref actionIndex); l <= actionsCopy.Length; l = Interlocked.Increment(ref actionIndex))
     12                     {
     13                         try
     14                         {
     15                             actionsCopy[l - 1]();
     16                         }
     17                         catch (Exception item)
     18                         {
     19                             LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => new ConcurrentQueue<Exception>());
     20                             exceptionQ.Enqueue(item);
     21                         }
     22                         if (parallelOptions.CancellationToken.IsCancellationRequested)
     23                         {
     24                             throw new OperationCanceledException(parallelOptions.CancellationToken);
     25                         }
     26                     }
     27                 }, TaskCreationOptions.None, InternalTaskOptions.SelfReplicating);
     28                 parallelForReplicatingTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler);
     29                 parallelForReplicatingTask.Wait();
     30             }
     31             catch (Exception ex2)
     32             {
     33                 LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => new ConcurrentQueue<Exception>());
     34                 AggregateException ex = ex2 as AggregateException;
     35                 if (ex != null)
     36                 {
     37                     using (IEnumerator<Exception> enumerator = ex.InnerExceptions.GetEnumerator())
     38                     {
     39                         while (enumerator.MoveNext())
     40                         {
     41                             Exception current = enumerator.Current;
     42                             exceptionQ.Enqueue(current);
     43                         }
     44                         goto IL_264;
     45                     }
     46                 }
     47                 exceptionQ.Enqueue(ex2);
     48                 IL_264:;
     49             }
     50             if (exceptionQ != null && exceptionQ.Count > 0)
     51             {
     52                 Parallel.ThrowIfReducableToSingleOCE(exceptionQ, parallelOptions.CancellationToken);
     53                 throw new AggregateException(exceptionQ);
     54             }
     55         }
     56         else
     57         {
     58             Task[] array = new Task[actionsCopy.Length];
     59             if (parallelOptions.CancellationToken.IsCancellationRequested)
     60             {
     61                 throw new OperationCanceledException(parallelOptions.CancellationToken);
     62             }
     63             for (int j = 0; j < array.Length; j++)
     64             {
     65                 array[j] = Task.Factory.StartNew(actionsCopy[j], parallelOptions.CancellationToken, TaskCreationOptions.None, InternalTaskOptions.None, parallelOptions.EffectiveTaskScheduler);
     66             }
     67             try
     68             {
     69                 if (array.Length <= 4)
     70                 {
     71                     Task.FastWaitAll(array);
     72                 }
     73                 else
     74                 {
     75                     Task.WaitAll(array);
     76                 }
     77             }
     78             catch (AggregateException ex3)
     79             {
     80                 Parallel.ThrowIfReducableToSingleOCE(ex3.InnerExceptions, parallelOptions.CancellationToken);
     81                 throw;
     82             }
     83             finally
     84             {
     85                 for (int k = 0; k < array.Length; k++)
     86                 {
     87                     if (array[k].IsCompleted)
     88                     {
     89                         array[k].Dispose();
     90                     }
     91                 }
     92             }
     93         }
     94     }
     95     finally
     96     {
     97         if (TplEtwProvider.Log.IsEnabled())
     98         {
     99             TplEtwProvider.Log.ParallelInvokeEnd((task != null) ? task.m_taskScheduler.Id : TaskScheduler.Current.Id, (task != null) ? task.Id : 0, forkJoinContextID);
    100         }
    101     }

    二:For

       下面再看看Parallel.For,我们知道普通的For是一个串行操作,如果说你的for中每条流程都需要执行一个方法,并且这些方法可以并行操作且

    比较耗时,那么为何不尝试用Parallel.For呢,就比如下面的代码。

     1 class Program
     2     {
     3         static void Main(string[] args)
     4         {
     5             List<Action> actions = new List<Action>() { Credit, Email };
     6 
     7             var result = Parallel.For(0, actions.Count, (i) =>
     8             {
     9                 actions[i]();
    10             });
    11 
    12             Console.WriteLine("执行状态:" + result.IsCompleted);
    13 
    14             Console.Read();
    15         }
    16 
    17         static void Credit()
    18         {
    19             Console.WriteLine("******************  发起信用卡扣款中  ******************");
    20 
    21             Thread.Sleep(2000);
    22 
    23             Console.WriteLine("扣款成功!");
    24         }
    25 
    26         static void Email()
    27         {
    28             Console.WriteLine("******************  发送邮件确认单!*****************");
    29 
    30             Thread.Sleep(3000);
    31 
    32             Console.WriteLine("email发送成功!");
    33         }
    34     }


    下面我们再看看Parallel.For中的最简单的重载和最复杂的重载:

    1 public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body);
    2 
    3 public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);
    4  

    <1> 简单的重载不必多说,很简单,我上面的例子也演示了。

    <2> 最复杂的这种重载提供了一个AOP的功能,在每一个body的action执行之前会先执行localInit这个action,在body之后还会执行localFinally

           这个action,有没有感觉到已经把body切成了三块?好了,下面看一个例子。

     1     static void Main(string[] args)
     2         {
     3             var list = new List<int>() { 10, 20, 30, 40 };
     4 
     5             var options = new ParallelOptions();
     6 
     7             var total = 0;
     8 
     9             var result = Parallel.For(0, list.Count, () =>
    10             {
    11                 Console.WriteLine("------------  thead --------------");
    12 
    13                 return 1;
    14             },
    15               (i, loop, j) =>
    16               {
    17                   Console.WriteLine("------------  body --------------");
    18 
    19                   Console.WriteLine("i=" + list[i] + " j=" + j);
    20 
    21                   return list[i];
    22               },
    23               (i) =>
    24               {
    25                   Console.WriteLine("------------  tfoot --------------");
    26 
    27                   Interlocked.Add(ref total, i);
    28 
    29                   Console.WriteLine("total=" + total);
    30               });
    31 
    32             Console.WriteLine("iscompleted:" + result.IsCompleted);
    33             Console.Read();
    34         }
    View Code

    接下来我们再翻翻它的源代码,由于源码太多,里面神乎其神,我就找几个好玩的地方。

    <1>  我在里面找到了一个rangeManager分区函数,代码复杂看不懂,貌似很强大。

     1         internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
     2         {
     3             this.m_nCurrentIndexRangeToAssign = 0;
     4             this.m_nStep = nStep;
     5             if (nNumExpectedWorkers == 1)
     6             {
     7                 nNumExpectedWorkers = 2;
     8             }
     9             ulong num = (ulong)(nToExclusive - nFromInclusive);
    10             ulong num2 = num / (ulong)((long)nNumExpectedWorkers);
    11             num2 -= num2 % (ulong)nStep;
    12             if (num2 == 0uL)
    13             {
    14                 num2 = (ulong)nStep;
    15             }
    16             int num3 = (int)(num / num2);
    17             if (num % num2 != 0uL)
    18             {
    19                 num3++;
    20             }
    21             long num4 = (long)num2;
    22             this.m_indexRanges = new IndexRange[num3];
    23             long num5 = nFromInclusive;
    24             for (int i = 0; i < num3; i++)
    25             {
    26                 this.m_indexRanges[i].m_nFromInclusive = num5;
    27                 this.m_indexRanges[i].m_nSharedCurrentIndexOffset = null;
    28                 this.m_indexRanges[i].m_bRangeFinished = 0;
    29                 num5 += num4;
    30                 if (num5 < num5 - num4 || num5 > nToExclusive)
    31                 {
    32                     num5 = nToExclusive;
    33                 }
    34                 this.m_indexRanges[i].m_nToExclusive = num5;
    35             }
    36         }

    <2> 我又找到了这个神奇的ParallelForReplicatingTask类。

    那么下面问题来了,在单线程的for中,我可以continue,可以break,那么在Parallel.For中有吗?因为是并行,所以continue基本上就没有

    存在价值,break的话确实有价值,这个就是委托中的ParallelLoopState做到的,并且还新增了一个Stop。

     

    三:ForEach

    其实ForEach和for在本质上是一样的,你在源代码中会发现在底层都是调用一个方法的,而ForEach会在底层中调用for共同的函数之前还会执行

    其他的一些逻辑,所以这就告诉我们,能用Parallel.For的地方就不要用Parallel.ForEach,其他的都一样了,这里就不赘述了。

  • 相关阅读:
    C++-POJ1020-Anniversary Cake[搜索][dfs]
    C++-POJ1988-Cube Stacking[数据结构][并查集]
    大佬的代码
    C++-POJ3349-Snowflake Snow Snowflakes[STL][set][hash未写]
    C++-POJ3274-Gold Balanced Lineup[hash]
    ListView 在设备切换横竖屏时保存状态
    Android Studio 常见命令
    android textView 总是有paddingtop怎么解决
    ionic build Android错误记录 error in opening zip file
    git grep 或者 ag 进行快速代码搜索
  • 原文地址:https://www.cnblogs.com/huangxincheng/p/4067967.html
Copyright © 2011-2022 走看看