  • A few things about multithreading: Parallel.For

    Let's start with a piece of code:

    for (int i = 0; i < 10; i++)
    {
        Task.Factory.StartNew(() => Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"));
    }

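    Reading the code, we would expect it to print the numbers 0 through 9 (the left column is the managed thread ID), but the actual output is: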

    7 ~ 10
    4 ~ 10
    10 ~ 10
    9 ~ 10
    4 ~ 10
    3 ~ 10
    5 ~ 10
    9 ~ 10
    6 ~ 10
    8 ~ 10

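    This does not match the expectation: we expected the numbers 0 through 9, but 10 was printed ten times. The lambda expressions all use the same variable i, and the anonymous functions share that one variable. By the time the tasks actually run, the loop has usually finished and i is already 10.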

    Now look at the following code:

    Action<int> displayNumber = n => Console.WriteLine(n);
    int i = 5;
    Task taskOne = Task.Factory.StartNew(() => displayNumber(i));
    i = 7;
    Task taskTwo = Task.Factory.StartNew(() => displayNumber(i));
    Task.WaitAll(taskOne, taskTwo);

    Output (typical; the result depends on whether taskOne happens to start before i is reassigned):

    7
    7

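    When a closure captures a mutable variable through a lambda expression, the lambda captures the variable itself (a reference to it), not the value the variable holds at that moment. So if the task runs after the variable has been changed, it sees the latest value in memory rather than the value at the time of capture.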
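
    The capture behavior described above can also be sidestepped without any extra library support by copying the loop variable into a fresh local inside the loop body, so that each lambda captures its own variable. The following is a minimal sketch of that idea; the names CaptureDemo, RunWithLocalCopy and copy are made up for illustration, and the results are collected and sorted only to make the output deterministic.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class CaptureDemo
{
    // Hypothetical helper: starts one task per index and returns the
    // values the tasks observed, sorted ascending.
    public static int[] RunWithLocalCopy(int count)
    {
        var seen = new ConcurrentBag<int>();
        var tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            // A new variable is created on every iteration and never
            // modified afterwards, so each lambda sees its own value.
            int copy = i;
            tasks[i] = Task.Factory.StartNew(() => seen.Add(copy));
        }
        Task.WaitAll(tasks);
        return seen.OrderBy(n => n).ToArray();
    }

    static void Main()
    {
        // Prints: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
        Console.WriteLine(string.Join(", ", RunWithLocalCopy(10)));
    }
}
```

    The tasks still run in an unspecified order on pool threads; only the captured values are fixed.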

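    To get around this problem, we can bring in the Parallel class. Parallel.For passes the current index to the loop body as a parameter, so every invocation receives its own value instead of sharing a captured variable:

    Parallel.For(0, 10, i => Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"));

    Output: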

    1 ~ 0
    1 ~ 2
    3 ~ 1
    3 ~ 4
    3 ~ 7
    3 ~ 8
    3 ~ 9
    1 ~ 3
    5 ~ 5
    4 ~ 6

    The Parallel class provides support for parallel loops and regions. Now let's look at an excerpt of the code behind Parallel.For:

    // this needs to be in try-block because it can throw in BuggyScheduler.MaxConcurrencyLevel
    rootTask = new ParallelForReplicatingTask(
        parallelOptions,
        delegate
        {
            //
            // first thing we do upon enterying the task is to register as a new "RangeWorker" with the
            // shared RangeManager instance.
            //
            // If this call returns a RangeWorker struct which wraps the state needed by this task
            //
            // We need to call FindNewWork32() on it to see whether there's a chunk available.
            //
            // Cache some information about the current task
            Task currentWorkerTask = Task.InternalCurrent;
            bool bIsRootTask = (currentWorkerTask == rootTask);
            RangeWorker currentWorker = new RangeWorker();
            Object savedStateFromPreviousReplica = currentWorkerTask.SavedStateFromPreviousReplica;
            if (savedStateFromPreviousReplica is RangeWorker)
                currentWorker = (RangeWorker)savedStateFromPreviousReplica;
            else
                currentWorker = rangeManager.RegisterNewWorker();
            // These are the local index values to be used in the sequential loop.
            // Their values filled in by FindNewWork32
            int nFromInclusiveLocal;
            int nToExclusiveLocal;
            if (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) == false ||
                sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true)
            {
                return; // no need to run
            }
            // ETW event for ParallelFor Worker Fork
            if (TplEtwProvider.Log.IsEnabled())
            {
                TplEtwProvider.Log.ParallelFork((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0),
                                                 forkJoinContextID);
            }
            TLocal localValue = default(TLocal);
            bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't
            try
            {
                // Create a new state object that references the shared "stopped" and "exceptional" flags
                // If needed, it will contain a new instance of thread-local state by invoking the selector.
                ParallelLoopState32 state = null;
                if (bodyWithState != null)
                {
                    Contract.Assert(sharedPStateFlags != null);
                    state = new ParallelLoopState32(sharedPStateFlags);
                }
                else if (bodyWithLocal != null)
                {
                    Contract.Assert(sharedPStateFlags != null);
                    state = new ParallelLoopState32(sharedPStateFlags);
                    if (localInit != null)
                    {
                        localValue = localInit();
                        bLocalValueInitialized = true;
                    }
                }
                // initialize a loop timer which will help us decide whether we should exit early
                LoopTimer loopTimer = new LoopTimer(rootTask.ActiveChildCount);
                // Now perform the loop itself.
                do
                {
                    if (body != null)
                    {
                        for (int j = nFromInclusiveLocal;
                             j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
                                                       || !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state
                             j += 1)
                        {
                            body(j);
                        }
                    }
                    else if (bodyWithState != null)
                    {
                        for (int j = nFromInclusiveLocal;
                             j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
                                                       || !sharedPStateFlags.ShouldExitLoop(j));
                             j += 1)
                        {
                            state.CurrentIteration = j;
                            bodyWithState(j, state);
                        }
                    }
                    else
                    {
                        for (int j = nFromInclusiveLocal;
                             j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE // fast path check as SEL() doesn't inline
                                                       || !sharedPStateFlags.ShouldExitLoop(j));
                             j += 1)
                        {
                            state.CurrentIteration = j;
                            localValue = bodyWithLocal(j, state, localValue);
                        }
                    }
                    // Cooperative multitasking hack for AppDomain fairness.
                    // Check if allowed loop time is exceeded, if so save current state and return. The self replicating task logic
                    // will detect this, and queue up a replacement task. Note that we don't do this on the root task.
                    if (!bIsRootTask && loopTimer.LimitExceeded())
                    {
                        currentWorkerTask.SavedStateForNextReplica = (object)currentWorker;
                        break;
                    }
                }
                // Exit if we can't find new work, or if the loop was stoppped.
                while (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) &&
                       ((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE) ||
                        !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal)));
            }
            catch
            {
                // if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow
                sharedPStateFlags.SetExceptional();
                throw;
            }
            finally
            {
                // If a cleanup function was specified, call it. Otherwise, if the type is
                // IDisposable, we will invoke Dispose on behalf of the user.
                if (localFinally != null && bLocalValueInitialized)
                {
                    localFinally(localValue);
                }
                // ETW event for ParallelFor Worker Join
                if (TplEtwProvider.Log.IsEnabled())
                {
                    TplEtwProvider.Log.ParallelJoin((currentWorkerTask != null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (currentWorkerTask != null ? currentWorkerTask.Id : 0),
                                                     forkJoinContextID);
                }
            }
        },
        creationOptions, internalOptions);
    rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler); // might throw TSE
    rootTask.Wait();
    // If we made a cancellation registration, we need to clean it up now before observing the OCE
    // Otherwise we could be caught in the middle of a callback, and observe PLS_STOPPED, but oce = null
    if (parallelOptions.CancellationToken.CanBeCanceled)
    {
        ctr.Dispose();
    }
    // If we got through that with no exceptions, and we were canceled, then
    // throw our cancellation exception
    if (oce != null) throw oce;
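
    The core pattern in the excerpt is that each worker repeatedly asks a shared manager for the next half-open chunk [from, to) and then runs an ordinary sequential loop over it. The sketch below is a deliberately simplified model of that pattern, not the real RangeManager/RangeWorker: SimpleRangeSource, TryTakeChunk and ChunkDemo are hypothetical names, the chunk size is fixed, and there is no support for Break, Stop, exceptions or task replication. It also assumes ranges small enough that the shared counter cannot overflow.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the shared range manager: hands out
// fixed-size chunks of [fromInclusive, toExclusive) via an atomic counter.
sealed class SimpleRangeSource
{
    private readonly int _toExclusive;
    private readonly int _chunkSize;
    private int _next;

    public SimpleRangeSource(int fromInclusive, int toExclusive, int chunkSize)
    {
        _next = fromInclusive;
        _toExclusive = toExclusive;
        _chunkSize = chunkSize;
    }

    // Atomically claims the next chunk; returns false once the range is exhausted.
    public bool TryTakeChunk(out int chunkStart, out int chunkEnd)
    {
        int end = Interlocked.Add(ref _next, _chunkSize); // returns the new value
        chunkStart = end - _chunkSize;
        chunkEnd = Math.Min(end, _toExclusive);
        return chunkStart < _toExclusive;
    }
}

static class ChunkDemo
{
    // Runs `workers` tasks over [0, length) and returns how often each index was visited.
    public static int[] Run(int length, int workers, int chunkSize)
    {
        var visits = new int[length];
        var source = new SimpleRangeSource(0, length, chunkSize);
        var tasks = new Task[workers];
        for (int w = 0; w < workers; w++)
        {
            tasks[w] = Task.Run(() =>
            {
                // Same shape as the do/while in the excerpt: claim a chunk, loop over it, repeat.
                while (source.TryTakeChunk(out int chunkStart, out int chunkEnd))
                    for (int j = chunkStart; j < chunkEnd; j++)
                        Interlocked.Increment(ref visits[j]);
            });
        }
        Task.WaitAll(tasks);
        return visits;
    }

    static void Main()
    {
        int[] visits = Run(100, 4, 8);
        Console.WriteLine(visits.All(v => v == 1)); // every index visited exactly once
    }
}
```

    Because chunks are claimed atomically, no index is processed twice, while the order in which indices are processed across workers remains unspecified, which matches the interleaved output shown earlier.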

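    The body delegate is invoked once for each value in the iteration range [fromInclusive, toExclusive). In the overload that takes an Action<int, ParallelLoopState>, it is given two parameters:

    1. An Int32 value: the index of the current iteration.

    2. A ParallelLoopState instance that can be used to break out of the loop early. The ParallelLoopState object is created by the Parallel class itself (see the new ParallelLoopState32(sharedPStateFlags) calls in the source above); it cannot be instantiated in user code.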

    Let's continue:

    Parallel.For(0, 10, (i, state) =>
    {
        if (i > 5)
            state.Break();
        Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}");
    });

    Output:

    1 ~ 0
    1 ~ 1
    1 ~ 2
    1 ~ 3
    1 ~ 4
    1 ~ 5
    1 ~ 6

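    The code above uses the Break method.

    Calling Break tells the for operation that iterations after the current one do not need to run. However, all iterations before the current one must still be executed if they have not run already. Note also that Break does not abort the iteration that calls it, which is why the line for 6 is still printed.

    Calling Break is therefore similar to the break statement inside a conventional for loop in languages such as C#, but it is not a perfect substitute: for example, there is no guarantee that iterations after the current one will not execute.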
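
    One way to observe these guarantees is through the ParallelLoopResult that Parallel.For returns: its LowestBreakIteration property reports the lowest index from which Break was called. The following is a small sketch; BreakDemo and Run are hypothetical names, and the executed indices are collected in a ConcurrentBag purely so they can be inspected afterwards.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class BreakDemo
{
    // Runs the same loop as above and returns the lowest break index
    // together with the sorted list of iterations that actually ran.
    public static (long? lowestBreak, int[] executed) Run()
    {
        var executed = new ConcurrentBag<int>();
        ParallelLoopResult result = Parallel.For(0, 10, (i, state) =>
        {
            executed.Add(i);
            if (i > 5)
                state.Break(); // request that no iterations beyond i be started
        });
        return (result.LowestBreakIteration, executed.OrderBy(n => n).ToArray());
    }

    static void Main()
    {
        var (lowestBreak, executed) = Run();
        Console.WriteLine($"LowestBreakIteration = {lowestBreak}");
        Console.WriteLine($"Executed: {string.Join(", ", executed)}");
    }
}
```

    LowestBreakIteration comes out as 6 here: every iteration below a break point must still run, so iteration 6 always executes and calls Break itself. The executed list always contains 0 through 6, and may additionally contain 7, 8 or 9 if those iterations had already started on other threads.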

    That's all for today.


  • Original article: https://www.cnblogs.com/xtt321/p/14223636.html