  • C#: What is a Task? How is its return value implemented? How is Wait implemented?

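    There are a great many Task APIs, and plenty of examples online. Here we look at what a Task actually is (tasks are generally used for multithreading, so they must be related to threads in some way) and at how its return value is produced.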

    Let's start with one of the simplest APIs, the StartNew<TResult> method of TaskFactory, in TaskFactory.cs:

    public Task<TResult> StartNew<TResult>(Func<Object, TResult> function, Object state)
    {
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        Task currTask = Task.InternalCurrent;
        return Task<TResult>.StartNew(currTask, function, state, m_defaultCancellationToken,
            m_defaultCreationOptions, InternalTaskOptions.None, GetDefaultScheduler(currTask), ref stackMark);
    }
    private TaskScheduler GetDefaultScheduler(Task currTask)
    {
        if (m_defaultScheduler != null) return m_defaultScheduler;
        else if ((currTask != null)&& ((currTask.CreationOptions & TaskCreationOptions.HideScheduler) == 0))
            return currTask.ExecutingTaskScheduler;
        else return TaskScheduler.Default;
    }

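    So the call is ultimately equivalent to calling Task<TResult>.StartNew. In the common case (the factory has no scheduler of its own and the call is not made from inside a running task), GetDefaultScheduler returns TaskScheduler.Default. TaskScheduler.cs implements it as follows: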

    public abstract class TaskScheduler
    {
        private static readonly TaskScheduler s_defaultTaskScheduler = new ThreadPoolTaskScheduler();
        public static TaskScheduler Default
        {
            get
            {
                return s_defaultTaskScheduler;
            }
        }
        internal void InternalQueueTask(Task task)
        {
            Contract.Requires(task != null);
            task.FireTaskScheduledIfNeeded(this);
            this.QueueTask(task);
        }
    }
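
    As a quick check of the path above, here is a minimal sketch that calls the same Func<object, TResult> overload through Task.Factory and reports whether the delegate ran on a thread-pool thread. StartNewDemo and Run are illustrative names, not part of the framework, and the sketch assumes the caller is not itself running inside a task on a custom scheduler.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, written only for illustration.
public static class StartNewDemo
{
    // Runs a delegate through Task.Factory.StartNew with a state object and
    // returns (length of the state string, ran on a thread-pool thread?).
    public static Tuple<int, bool> Run()
    {
        Task<Tuple<int, bool>> task = Task.Factory.StartNew(
            state => Tuple.Create(((string)state).Length,
                                  Thread.CurrentThread.IsThreadPoolThread),
            "hello");

        // Reading Result blocks until the delegate has produced a value.
        return task.Result;
    }
}
```

    Calling StartNewDemo.Run() from a console Main should give the length of the state string together with true, because the default scheduler hands ordinary tasks to the thread pool.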

    The default scheduler, TaskScheduler.Default, is a ThreadPoolTaskScheduler instance. Now let's look at the implementation of Task<TResult>, in Future.cs:

     public class Task<TResult> : Task
     {
       internal static Task<TResult> StartNew(Task parent, Func<object, TResult> function, object state, CancellationToken cancellationToken,
                TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler, ref StackCrawlMark stackMark)
        {
            if (function == null)
            {
                throw new ArgumentNullException("function");
            }
            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }
            if ((internalOptions & InternalTaskOptions.SelfReplicating) != 0)
            {
                throw new ArgumentOutOfRangeException("creationOptions", Environment.GetResourceString("TaskT_ctor_SelfReplicating"));
            }
    
            // Create and schedule the future.
            Task<TResult> f = new Task<TResult>(function, state, parent, cancellationToken, creationOptions, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler, ref stackMark);
    
            f.ScheduleAndStart(false);
            return f;
        }
       internal void ScheduleAndStart(bool needsProtection)
        {
            Contract.Assert(m_taskScheduler != null, "expected a task scheduler to have been selected");
            Contract.Assert((m_stateFlags & TASK_STATE_STARTED) == 0, "task has already started");
    
            // Set the TASK_STATE_STARTED bit
            if (needsProtection)
            {
                if (!MarkStarted())
                {
                    // A cancel has snuck in before we could get started.  Quietly exit.
                    return;
                }
            }
            else
            {
                m_stateFlags |= TASK_STATE_STARTED;
            }
    
            if (s_asyncDebuggingEnabled)
            {
                AddToActiveTasks(this);
            }
    
            if (AsyncCausalityTracer.LoggingOn && (Options & (TaskCreationOptions)InternalTaskOptions.ContinuationTask) == 0)
            {
                //For all other task than TaskContinuations we want to log. TaskContinuations log in their constructor
                AsyncCausalityTracer.TraceOperationCreation(CausalityTraceLevel.Required, this.Id, "Task: "+((Delegate)m_action).Method.Name, 0);
            }
            try
            {
                m_taskScheduler.InternalQueueTask(this);
            }
            catch (ThreadAbortException tae)
            {
                AddException(tae);
                FinishThreadAbortedTask(true, false);
            }
            catch (Exception e)
            {
                TaskSchedulerException tse = new TaskSchedulerException(e);
                AddException(tse);
                Finish(false);
                if ((Options & (TaskCreationOptions)InternalTaskOptions.ContinuationTask) == 0)
                {
                    // m_contingentProperties.m_exceptionsHolder *should* already exist after AddException()
                    Contract.Assert(
                        (m_contingentProperties != null) &&
                        (m_contingentProperties.m_exceptionsHolder != null) &&
                        (m_contingentProperties.m_exceptionsHolder.ContainsFaultList),
                            "Task.ScheduleAndStart(): Expected m_contingentProperties.m_exceptionsHolder to exist " +
                            "and to have faults recorded.");
    
                    m_contingentProperties.m_exceptionsHolder.MarkAsHandled(false);
                }
                // re-throw the exception wrapped as a TaskSchedulerException.
                throw tse;
            }
        }
      
            
      internal Task(
            Func<object, TResult> valueSelector, object state, Task parent, CancellationToken cancellationToken,
            TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler, ref StackCrawlMark stackMark) :
            this(valueSelector, state, parent, cancellationToken, creationOptions, internalOptions, scheduler)
        {
            PossiblyCaptureContext(ref stackMark);
        }
        internal void PossiblyCaptureContext(ref StackCrawlMark stackMark)
        {
            Contract.Assert(m_contingentProperties == null || m_contingentProperties.m_capturedContext == null, "Captured an ExecutionContext when one was already captured.");
            CapturedContext = ExecutionContext.Capture(ref stackMark, ExecutionContext.CaptureOptions.IgnoreSyncCtx | ExecutionContext.CaptureOptions.OptimizeDefaultCase);
        }
        internal override void InnerInvoke()
        {
            // Invoke the delegate
            Contract.Assert(m_action != null);
            var func = m_action as Func<TResult>;
            if (func != null)
            {
                m_result = func();
                return;
            }
            var funcWithState = m_action as Func<object, TResult>;
            if (funcWithState != null)
            {
                m_result = funcWithState(m_stateObject);
                return;
            }
            Contract.Assert(false, "Invalid m_action in Task<TResult>");
        }
    
        public TResult Result
        {
            get { return IsWaitNotificationEnabledOrNotRanToCompletion ? GetResultCore(waitCompletionNotification: true) : m_result; }
        }
       
     }
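
    The effect of PossiblyCaptureContext can be observed from user code. The following is a minimal sketch, assuming .NET Framework 4.6 or later (where AsyncLocal<T> is available); ContextFlowDemo, Ambient and Run are names made up for this example. A value set before StartNew is visible inside the task, while a change made inside the task does not leak back to the caller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, written only for illustration.
public static class ContextFlowDemo
{
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // Returns { value seen inside the task, value seen by the caller afterwards }.
    public static string[] Run()
    {
        Ambient.Value = "caller";

        Task<string> task = Task.Factory.StartNew(() =>
        {
            string seen = Ambient.Value; // flowed in with the captured ExecutionContext
            Ambient.Value = "task";      // this change stays inside the task's context
            return seen;
        });

        string inside = task.Result;
        return new[] { inside, Ambient.Value };
    }
}
```

    Both array elements should read "caller": the first because the context captured at construction time is restored when the delegate runs, the second because changes made under ExecutionContext.Run are discarded when it returns.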

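    The StartNew method of Task<TResult> first calls the constructor, which calls PossiblyCaptureContext; that method in turn calls ExecutionContext.Capture to capture the current execution context. Control then returns to StartNew, which calls ScheduleAndStart (a method inherited from the Task base class). ScheduleAndStart mainly calls the scheduler's InternalQueueTask, and InternalQueueTask mainly calls QueueTask. QueueTask is overridden in subclasses, so here the call lands in ThreadPoolTaskScheduler.QueueTask, in ThreadPoolTaskScheduler.cs: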

    internal sealed class ThreadPoolTaskScheduler: TaskScheduler
    {
       private static readonly ParameterizedThreadStart s_longRunningThreadWork = new ParameterizedThreadStart(LongRunningThreadWork);
    
        private static void LongRunningThreadWork(object obj)
        {
            Contract.Requires(obj != null, "TaskScheduler.LongRunningThreadWork: obj is null");
            Task t = obj as Task;
            Contract.Assert(t != null, "TaskScheduler.LongRunningThreadWork: t is null");
            t.ExecuteEntry(false);
        }
        protected internal override void QueueTask(Task task)
        {
            if ((task.Options & TaskCreationOptions.LongRunning) != 0)
            {
                // Run LongRunning tasks on their own dedicated thread.
                Thread thread = new Thread(s_longRunningThreadWork);
                thread.IsBackground = true; // Keep this thread from blocking process shutdown
                thread.Start(task);
            }
            else
            {
                // Normal handling for non-LongRunning tasks.
                bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
                ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
            }
        }
    }

    If the Task was created with TaskCreationOptions.LongRunning, a new dedicated thread is created to run it; otherwise it is queued to the thread pool.
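
    This branch is easy to observe. The sketch below is only an illustration (LongRunningDemo and RanOnPoolThread are hypothetical names); it passes TaskScheduler.Default explicitly so that the ThreadPoolTaskScheduler shown above is the one doing the queueing.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, written only for illustration.
public static class LongRunningDemo
{
    // Returns true if the task's delegate ran on a thread-pool thread.
    public static bool RanOnPoolThread(bool longRunning)
    {
        TaskCreationOptions options = longRunning
            ? TaskCreationOptions.LongRunning
            : TaskCreationOptions.None;

        Task<bool> task = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            options,
            TaskScheduler.Default); // force the default (thread-pool) scheduler

        return task.Result;
    }
}
```

    With the default scheduler, RanOnPoolThread(false) should report true and RanOnPoolThread(true) should report false, matching the two branches of QueueTask.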

    The dedicated thread invokes the s_longRunningThreadWork callback (which calls the task's ExecuteEntry). On the thread pool, the Task's ExecuteWorkItem method is called instead (which also just calls ExecuteEntry), in Task.cs:

    public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable
    {
        [ThreadStatic]
        internal static Task t_currentTask; 
        internal static Task InternalCurrent
        {
            get { return t_currentTask; }
        }
       void IThreadPoolWorkItem.ExecuteWorkItem()
        {
            ExecuteEntry(false);
        }
        internal bool ExecuteEntry(bool bPreventDoubleExecution)
        {
            if (bPreventDoubleExecution || ((Options & (TaskCreationOptions)InternalTaskOptions.SelfReplicating) != 0))
            {
                int previousState = 0;
    
                // Do atomic state transition from queued to invoked. If we observe a task that's already invoked,
                // we will return false so that TaskScheduler.ExecuteTask can throw an exception back to the custom scheduler.
                // However we don't want this exception to be thrown if the task was already canceled, because it's a
                // legitimate scenario for custom schedulers to dequeue a task and mark it as canceled (example: throttling scheduler)
                if (!AtomicStateUpdate(TASK_STATE_DELEGATE_INVOKED,
                                       TASK_STATE_DELEGATE_INVOKED | TASK_STATE_COMPLETED_MASK,
                                       ref previousState) && (previousState & TASK_STATE_CANCELED) == 0)
                {
                    // This task has already been invoked.  Don't invoke it again.
                    return false;
                }
            }
            else
            {
                // Remember that we started running the task delegate.
                m_stateFlags |= TASK_STATE_DELEGATE_INVOKED;
            }
    
            if (!IsCancellationRequested && !IsCanceled)
            {
                ExecuteWithThreadLocal(ref t_currentTask);
            }
            else if (!IsCanceled)
            {
                int prevState = Interlocked.Exchange(ref m_stateFlags, m_stateFlags | TASK_STATE_CANCELED);
                if ((prevState & TASK_STATE_CANCELED) == 0)
                {
                    CancellationCleanupLogic();
                }
            }
    
            return true;
        }
       private void ExecuteWithThreadLocal(ref Task currentTaskSlot)
        {
            // Remember the current task so we can restore it after running, and then
            Task previousTask = currentTaskSlot;
    
            // ETW event for Task Started
            var etwLog = TplEtwProvider.Log;
            Guid savedActivityID = new Guid();
            bool etwIsEnabled = etwLog.IsEnabled();
            if (etwIsEnabled)
            {
                if (etwLog.TasksSetActivityIds)
                    EventSource.SetCurrentThreadActivityId(TplEtwProvider.CreateGuidForTaskID(this.Id), out savedActivityID);
                // previousTask holds the actual "current task" we want to report in the event
                if (previousTask != null)
                    etwLog.TaskStarted(previousTask.m_taskScheduler.Id, previousTask.Id, this.Id);
                else
                    etwLog.TaskStarted(TaskScheduler.Current.Id, 0, this.Id);
            }
    
            if (AsyncCausalityTracer.LoggingOn)
                AsyncCausalityTracer.TraceSynchronousWorkStart(CausalityTraceLevel.Required, this.Id, CausalitySynchronousWork.Execution);
    
    
            try
            {
                // place the current task into TLS.
                currentTaskSlot = this;
    
                ExecutionContext ec = CapturedContext;
                if (ec == null)
                {
                    // No context, just run the task directly.
                    Execute();
                }
                else
                {
                    if (IsSelfReplicatingRoot || IsChildReplica)
                    {
                        CapturedContext = CopyExecutionContext(ec);
                    }
    
                    // Run the task.  We need a simple shim that converts the
                    // object back into a Task object, so that we can Execute it.
    
                    // Lazily initialize the callback delegate; benign race
                    var callback = s_ecCallback;
                    if (callback == null) s_ecCallback = callback = new ContextCallback(ExecutionContextCallback);
    #if PFX_LEGACY_3_5
                    ExecutionContext.Run(ec, callback, this);
    #else
                    ExecutionContext.Run(ec, callback, this, true);
    #endif
                }
    
                if (AsyncCausalityTracer.LoggingOn)
                    AsyncCausalityTracer.TraceSynchronousWorkCompletion(CausalityTraceLevel.Required, CausalitySynchronousWork.Execution);
    
                Finish(true);
            }
            finally
            {
                currentTaskSlot = previousTask;
                
                // ETW event for Task Completed
                if (etwIsEnabled)
                {
                    // previousTask holds the actual "current task" we want to report in the event
                    if (previousTask != null)
                        etwLog.TaskCompleted(previousTask.m_taskScheduler.Id, previousTask.Id, this.Id, IsFaulted);
                    else
                        etwLog.TaskCompleted(TaskScheduler.Current.Id, 0, this.Id, IsFaulted);
    
                    if (etwLog.TasksSetActivityIds)
                        EventSource.SetCurrentThreadActivityId(savedActivityID);
                }
            }
        }
       private static void ExecutionContextCallback(object obj)
        {
            Task task = obj as Task;
            Contract.Assert(task != null, "expected a task object");
            task.Execute();
        }
        private void Execute()
        {
            if (IsSelfReplicatingRoot)
            {
                ExecuteSelfReplicating(this);
            }
            else
            {
                try
                {
                    InnerInvoke();
                }
                catch (ThreadAbortException tae)
                {
                    // Don't record the TAE or call FinishThreadAbortedTask for a child replica task --
                    // it's already been done downstream.
                    if (!IsChildReplica)
                    {
                        // Record this exception in the task's exception list
                        HandleException(tae);
    
                        // This is a ThreadAbortException and it will be rethrown from this catch clause, causing us to 
                        // skip the regular Finish codepath. In order not to leave the task unfinished, we now call 
                        // FinishThreadAbortedTask here.
                        FinishThreadAbortedTask(true, true);
                    }
                }
                catch (Exception exn)
                {
                    // Record this exception in the task's exception list
                    HandleException(exn);
                }
            }
        }
    }
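
    Two things in this listing are visible from the outside: ExecuteWithThreadLocal publishes the running task as the current task, and Execute records any exception thrown by InnerInvoke instead of letting it escape. A minimal sketch, using only public APIs; ExecuteDemo and its method names are made up for this example.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper class, written only for illustration.
public static class ExecuteDemo
{
    // Returns true when Task.CurrentId, read inside the delegate, equals the task's own Id.
    public static bool CurrentIdMatches()
    {
        Task<int?> task = Task.Factory.StartNew(() => Task.CurrentId);
        return task.Result == task.Id;
    }

    // Describes what the caller observes when the delegate throws.
    public static string FaultDescription()
    {
        Task<int> task = Task.Factory.StartNew<int>(
            () => { throw new InvalidOperationException("boom"); });
        try
        {
            int unused = task.Result; // rethrows the recorded exception, wrapped
            return "no exception";
        }
        catch (AggregateException ex)
        {
            return task.Status + ":" + ex.InnerException.GetType().Name;
        }
    }
}
```

    CurrentIdMatches() should return true, and FaultDescription() should return "Faulted:InvalidOperationException": the exception was stored on the task by HandleException and only surfaces when the result is requested.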

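    At this point it is clear what a Task is. Reading on: ExecuteEntry, through ExecuteWithThreadLocal, mainly calls ExecutionContext.Run(ec, callback, this, true). The callback here is a ContextCallback instance whose main job is to call the Task's Execute method, and Execute mainly calls InnerInvoke, which is overridden in the subclass Task<TResult>. Two statements in InnerInvoke matter most: m_result = func() and m_result = funcWithState(m_stateObject). This is where our own delegate is actually invoked and its return value is stored, so the return value is now explained too. Once our delegate has finished, Finish is called to mark the task as complete: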

        /// <summary>
        /// Signals completion of this particular task.
        /// The bUserDelegateExecuted parameter indicates whether this Finish() call comes following the
        /// full execution of the user delegate. 
        /// If bUserDelegateExecuted is false, it mean user delegate wasn't invoked at all (either due to
        /// a cancellation request, or because this task is a promise style Task). In this case, the steps
        /// involving child tasks (i.e. WaitForChildren) will be skipped.
        /// 
        /// </summary>
        internal void Finish(bool bUserDelegateExecuted)
        {
            if (!bUserDelegateExecuted)
            {
                // delegate didn't execute => no children. We can safely call the remaining finish stages
                FinishStageTwo();
            }
            else
            {
                var props = m_contingentProperties;
    
                if (props == null || // no contingent properties means no children, so it's safe to complete ourselves
                    (props.m_completionCountdown == 1 && !IsSelfReplicatingRoot) ||
                    // Count of 1 => either all children finished, or there were none. Safe to complete ourselves 
                    // without paying the price of an Interlocked.Decrement.
                    // However we need to exclude self replicating root tasks from this optimization, because
                    // they can have children joining in, or finishing even after the root task delegate is done.
                    Interlocked.Decrement(ref props.m_completionCountdown) == 0) // Reaching this sub clause means there may be remaining active children,
                // and we could be racing with one of them to call FinishStageTwo().
                // So whoever does the final Interlocked.Dec is responsible to finish.
                {
                    FinishStageTwo();
                }
                else
                {
                    // Apparently some children still remain. It will be up to the last one to process the completion of this task on their own thread.
                    // We will now yield the thread back to ThreadPool. Mark our state appropriately before getting out.
    
                    // We have to use an atomic update for this and make sure not to overwrite a final state, 
                    // because at this very moment the last child's thread may be concurrently completing us.
                    // Otherwise we risk overwriting the TASK_STATE_RAN_TO_COMPLETION, _CANCELED or _FAULTED bit which may have been set by that child task.
                    // Note that the concurrent update by the last child happening in FinishStageTwo could still wipe out the TASK_STATE_WAITING_ON_CHILDREN flag, 
                    // but it is not critical to maintain, therefore we don't need to introduce a full atomic update into FinishStageTwo
    
                    AtomicStateUpdate(TASK_STATE_WAITING_ON_CHILDREN, TASK_STATE_FAULTED | TASK_STATE_CANCELED | TASK_STATE_RAN_TO_COMPLETION);
                }
    
                // Now is the time to prune exceptional children. We'll walk the list and removes the ones whose exceptions we might have observed after they threw.
                // we use a local variable for exceptional children here because some other thread may be nulling out m_contingentProperties.m_exceptionalChildren 
                List<Task> exceptionalChildren = props != null ? props.m_exceptionalChildren : null;
    
                if (exceptionalChildren != null)
                {
                    lock (exceptionalChildren)
                    {
                        exceptionalChildren.RemoveAll(s_IsExceptionObservedByParentPredicate); // RemoveAll has better performance than doing it ourselves
                    }
                }
            }
        }
        internal void FinishStageTwo()
        {
            AddExceptionsFromChildren();
            // At this point, the task is done executing and waiting for its children,
            // we can transition our task to a completion state.  
            int completionState;
            if (ExceptionRecorded)
            {
                completionState = TASK_STATE_FAULTED;
                if (AsyncCausalityTracer.LoggingOn)
                    AsyncCausalityTracer.TraceOperationCompletion(CausalityTraceLevel.Required, this.Id, AsyncCausalityStatus.Error);
    
                if (Task.s_asyncDebuggingEnabled)
                {
                    RemoveFromActiveTasks(this.Id);
                }
            }
            else if (IsCancellationRequested && IsCancellationAcknowledged)
            {
                // We transition into the TASK_STATE_CANCELED final state if the task's CT was signalled for cancellation, 
                // and the user delegate acknowledged the cancellation request by throwing an OCE, 
                // and the task hasn't otherwise transitioned into faulted state. (TASK_STATE_FAULTED trumps TASK_STATE_CANCELED)
                //
                // If the task threw an OCE without cancellation being requested (while the CT not being in signaled state),
                // then we regard it as a regular exception
    
                completionState = TASK_STATE_CANCELED;
                if (AsyncCausalityTracer.LoggingOn)
                    AsyncCausalityTracer.TraceOperationCompletion(CausalityTraceLevel.Required, this.Id, AsyncCausalityStatus.Canceled);
    
                if (Task.s_asyncDebuggingEnabled)
                {
                    RemoveFromActiveTasks(this.Id);
                }
            }
            else
            {
                completionState = TASK_STATE_RAN_TO_COMPLETION;
                if (AsyncCausalityTracer.LoggingOn)
                    AsyncCausalityTracer.TraceOperationCompletion(CausalityTraceLevel.Required, this.Id, AsyncCausalityStatus.Completed);
    
                if (Task.s_asyncDebuggingEnabled)
                {
                    RemoveFromActiveTasks(this.Id);
                }
            }
    
            // Use Interlocked.Exchange() to effect a memory fence, preventing
            // any SetCompleted() (or later) instructions from sneaking back before it.
            Interlocked.Exchange(ref m_stateFlags, m_stateFlags | completionState);
    
            // Set the completion event if it's been lazy allocated.
            // And if we made a cancellation registration, it's now unnecessary.
            var cp = m_contingentProperties;
            if (cp != null)
            {
                cp.SetCompleted();
                cp.DeregisterCancellationCallback();
            }
    
            // ready to run continuations and notify parent.
            FinishStageThree();
        }
    
        internal bool AtomicStateUpdate(int newBits, int illegalBits)
        {
            // This could be implemented in terms of:
            //     internal bool AtomicStateUpdate(int newBits, int illegalBits, ref int oldFlags);
            // but for high-throughput perf, that delegation's cost is noticeable.
            SpinWait sw = new SpinWait();
            do
            {
                int oldFlags = m_stateFlags;
                if ((oldFlags & illegalBits) != 0) return false;
                if (Interlocked.CompareExchange(ref m_stateFlags, oldFlags | newBits, oldFlags) == oldFlags)
                {
                    return true;
                }
                sw.SpinOnce();
            } while (true);
        }
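
    The AtomicStateUpdate loop above is an ordinary compare-and-swap retry loop. The following standalone sketch reproduces the pattern outside of Task; the StateFlags class and its bit values are invented for illustration and are not the runtime's own constants.

```csharp
using System.Threading;

// Hypothetical class, written only to illustrate the compare-and-swap pattern.
public sealed class StateFlags
{
    public const int Started = 0x1;   // illustrative bit values, not the runtime's
    public const int Completed = 0x2;

    private int _flags;

    public int Value
    {
        get { return Volatile.Read(ref _flags); }
    }

    // Atomically sets newBits unless any of illegalBits is already set.
    public bool TrySet(int newBits, int illegalBits)
    {
        SpinWait spinner = new SpinWait();
        while (true)
        {
            int old = Volatile.Read(ref _flags);
            if ((old & illegalBits) != 0) return false;

            // Publish old | newBits only if nobody changed the flags in the meantime.
            if (Interlocked.CompareExchange(ref _flags, old | newBits, old) == old)
                return true;

            spinner.SpinOnce(); // lost the race with another thread; retry
        }
    }
}
```

    For example, TrySet(Started, Completed) succeeds on a fresh instance, but fails once Completed has been set, which is the same guard Finish relies on to avoid overwriting a final state.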

    After FinishStageTwo, FinishStageThree is called, and it calls FinishContinuations. FinishContinuations fetches the registered continuations with object continuationObject = Interlocked.Exchange(ref m_continuationObject, s_taskCompletionSentinel); and then runs each one in the way appropriate to its type:

    internal void FinishStageThree()
    {
        // Release the action so that holding this task object alive doesn't also
        // hold alive the body of the task.  We do this before notifying a parent,
        // so that if notifying the parent completes the parent and causes
        // its synchronous continuations to run, the GC can collect the state
        // in the interim.  And we do it before finishing continuations, because
        // continuations hold onto the task, and therefore are keeping it alive.
        m_action = null;
    
        // Notify parent if this was an attached task
        if (m_parent != null
             && ((m_parent.CreationOptions & TaskCreationOptions.DenyChildAttach) == 0)
             && (((TaskCreationOptions)(m_stateFlags & OptionsMask)) & TaskCreationOptions.AttachedToParent) != 0)
        {
            m_parent.ProcessChildCompletion(this);
        }
    
        // Activate continuations (if any).
        FinishContinuations();
    }
    internal void FinishContinuations()
    {
        // Atomically store the fact that this task is completing.  From this point on, the adding of continuations will
        // result in the continuations being run/launched directly rather than being added to the continuation list.
        object continuationObject = Interlocked.Exchange(ref m_continuationObject, s_taskCompletionSentinel);
        TplEtwProvider.Log.RunningContinuation(Id, continuationObject);
    
        // If continuationObject == null, then we don't have any continuations to process
        if (continuationObject != null)
        {
    
            if (AsyncCausalityTracer.LoggingOn)
                AsyncCausalityTracer.TraceSynchronousWorkStart(CausalityTraceLevel.Required, this.Id, CausalitySynchronousWork.CompletionNotification);
    
            // Skip synchronous execution of continuations if this task's thread was aborted
            bool bCanInlineContinuations = !(((m_stateFlags & TASK_STATE_THREAD_WAS_ABORTED) != 0) ||
                                              (Thread.CurrentThread.ThreadState == ThreadState.AbortRequested) ||
                                              ((m_stateFlags & (int)TaskCreationOptions.RunContinuationsAsynchronously) != 0));
    
            // Handle the single-Action case
            Action singleAction = continuationObject as Action;
            if (singleAction != null)
            {
                AwaitTaskContinuation.RunOrScheduleAction(singleAction, bCanInlineContinuations, ref t_currentTask);
                LogFinishCompletionNotification();
                return;
            }
    
            // Handle the single-ITaskCompletionAction case
            ITaskCompletionAction singleTaskCompletionAction = continuationObject as ITaskCompletionAction;
            if (singleTaskCompletionAction != null)
            {
                if (bCanInlineContinuations)
                {
                    singleTaskCompletionAction.Invoke(this);
                }
                else
                {
                    ThreadPool.UnsafeQueueCustomWorkItem(new CompletionActionInvoker(singleTaskCompletionAction, this), forceGlobal: false);
                }
                LogFinishCompletionNotification();
                return;
            }
    
            // Handle the single-TaskContinuation case
            TaskContinuation singleTaskContinuation = continuationObject as TaskContinuation;
            if (singleTaskContinuation != null)
            {
                singleTaskContinuation.Run(this, bCanInlineContinuations);
                LogFinishCompletionNotification();
                return;
            }
    
            // Not a single; attempt to cast as list
            List<object> continuations = continuationObject as List<object>;
    
            if (continuations == null)
            {
                LogFinishCompletionNotification();
                return;  // Not a single or a list; just return
            }
    
            //
            // Begin processing of continuation list
            //
    
            // Wait for any concurrent adds or removes to be retired
            lock (continuations) { }
            int continuationCount = continuations.Count;
    
            // Fire the asynchronous continuations first ...
            for (int i = 0; i < continuationCount; i++)
            {
                // Synchronous continuation tasks will have the ExecuteSynchronously option,
                // and we're looking for asynchronous tasks...
                var tc = continuations[i] as StandardTaskContinuation;
                if (tc != null && (tc.m_options & TaskContinuationOptions.ExecuteSynchronously) == 0)
                {
                    TplEtwProvider.Log.RunningContinuationList(Id, i, tc);
                    continuations[i] = null; // so that we can skip this later
                    tc.Run(this, bCanInlineContinuations);
                }
            }
    
            // ... and then fire the synchronous continuations (if there are any).
            // This includes ITaskCompletionAction, AwaitTaskContinuations, and
            // Action delegates, which are all by default implicitly synchronous.
            for (int i = 0; i < continuationCount; i++)
            {
                object currentContinuation = continuations[i];
                if (currentContinuation == null) continue;
                continuations[i] = null; // to enable free'ing up memory earlier
                TplEtwProvider.Log.RunningContinuationList(Id, i, currentContinuation);
    
                // If the continuation is an Action delegate, it came from an await continuation,
                // and we should use AwaitTaskContinuation to run it.
                Action ad = currentContinuation as Action;
                if (ad != null)
                {
                    AwaitTaskContinuation.RunOrScheduleAction(ad, bCanInlineContinuations, ref t_currentTask);
                }
                else
                {
                    // If it's a TaskContinuation object of some kind, invoke it.
                    TaskContinuation tc = currentContinuation as TaskContinuation;
                    if (tc != null)
                    {
                        // We know that this is a synchronous continuation because the
                        // asynchronous ones have been weeded out
                        tc.Run(this, bCanInlineContinuations);
                    }
                    // Otherwise, it must be an ITaskCompletionAction, so invoke it.
                    else
                    {
                        Contract.Assert(currentContinuation is ITaskCompletionAction, "Expected continuation element to be Action, TaskContinuation, or ITaskContinuationAction");
                        var action = (ITaskCompletionAction)currentContinuation;
    
                        if (bCanInlineContinuations)
                        {
                            action.Invoke(this);
                        }
                        else
                        {
                            ThreadPool.UnsafeQueueCustomWorkItem(new CompletionActionInvoker(action, this), forceGlobal: false);
                        }
                    }
                }
            }
    
            LogFinishCompletionNotification();
        }
    }
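
    From the caller's side, the continuation machinery above is what makes ContinueWith work, including for continuations that are registered after the antecedent has already completed. A minimal sketch with illustrative names (ContinuationDemo, Run):

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper class, written only for illustration.
public static class ContinuationDemo
{
    // Returns { result of a continuation registered right after start,
    //           result of a continuation registered after completion }.
    public static int[] Run()
    {
        Task<int> antecedent = Task.Factory.StartNew(() => 20);

        // May be registered before or after the antecedent completes; both paths run it.
        Task<int> first = antecedent.ContinueWith(t => t.Result + 1);

        antecedent.Wait(); // the antecedent is now definitely complete

        // Registered after completion: it is run or scheduled directly
        // rather than being added to the continuation list.
        Task<int> late = antecedent.ContinueWith(
            t => t.Result * 2,
            TaskContinuationOptions.ExecuteSynchronously);

        return new[] { first.Result, late.Result };
    }
}
```

    Run() should return 21 and 40, regardless of whether the first continuation was registered before or after the antecedent finished.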

    When we read the Result property of Task<TResult> and the Task has not finished yet, we have to wait for it: the getter calls Task<TResult>.GetResultCore(waitCompletionNotification: true), which eventually calls Task's InternalWait method.
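
    The blocking behaviour can be sketched with public APIs only. In this illustration (WaitDemo and Run are hypothetical names) a ManualResetEventSlim keeps the delegate from finishing, so a Wait with a timeout gives up, and Result only returns once the gate has been opened.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, written only for illustration.
public static class WaitDemo
{
    // Returns (did a 50 ms Wait succeed while the task was blocked?, final result).
    public static Tuple<bool, int> Run()
    {
        using (var gate = new ManualResetEventSlim(false))
        {
            Task<int> task = Task.Factory.StartNew(() =>
            {
                gate.Wait(); // the delegate cannot finish until the gate opens
                return 42;
            });

            bool finishedEarly = task.Wait(50); // times out, because the gate is closed
            gate.Set();
            int result = task.Result;           // blocks until the task completes

            return Tuple.Create(finishedEarly, result);
        }
    }
}
```

    Run() should return false together with 42: the timed Wait cannot succeed while the gate is closed, and Result waits for completion before returning m_result.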

        /// The core wait function, which is only accessible internally. It's meant to be used in places in TPL code where
        /// the current context is known or cached.
        internal bool InternalWait(int millisecondsTimeout, CancellationToken cancellationToken)
        {
            // ETW event for Task Wait Begin
            var etwLog = TplEtwProvider.Log;
            bool etwIsEnabled = etwLog.IsEnabled();
            if (etwIsEnabled)
            {
                Task currentTask = Task.InternalCurrent;
                etwLog.TaskWaitBegin(
                    (currentTask != null ? currentTask.m_taskScheduler.Id : TaskScheduler.Default.Id), (currentTask != null ? currentTask.Id : 0),
                    this.Id, TplEtwProvider.TaskWaitBehavior.Synchronous, 0, System.Threading.Thread.GetDomainID());
            }
    
            bool returnValue = IsCompleted;
    
            // If the event hasn't already been set, we will wait.
            if (!returnValue)
            {
                // Alert a listening debugger that we can't make forward progress unless it slips threads.
                // We call NOCTD for two reasons:
                //    1. If the task runs on another thread, then we'll be blocked here indefinitely.
                //    2. If the task runs inline but takes some time to complete, it will suffer ThreadAbort with possible state corruption,
                //       and it is best to prevent this unless the user explicitly asks to view the value with thread-slipping enabled.
                Debugger.NotifyOfCrossThreadDependency();
    
                // We will attempt inline execution only if an infinite wait was requested
                // Inline execution doesn't make sense for finite timeouts and if a cancellation token was specified
                // because we don't know how long the task delegate will take.
                if (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled &&
                    WrappedTryRunInline() && IsCompleted) // TryRunInline doesn't guarantee completion, as there may be unfinished children.
                {
                    returnValue = true;
                }
                else
                {
                    returnValue = SpinThenBlockingWait(millisecondsTimeout, cancellationToken);
                }
            }
    
            Contract.Assert(IsCompleted || millisecondsTimeout != Timeout.Infinite);
    
            // ETW event for Task Wait End
            if (etwIsEnabled)
            {
                Task currentTask = Task.InternalCurrent;
                if (currentTask != null)
                {
                    etwLog.TaskWaitEnd(currentTask.m_taskScheduler.Id, currentTask.Id, this.Id);
                }
                else
                {
                    etwLog.TaskWaitEnd(TaskScheduler.Default.Id, 0, this.Id);
                }
                // logically the continuation is empty so we immediately fire
                etwLog.TaskWaitContinuationComplete(this.Id);
            }
    
            return returnValue;
        }
      /// Waits for the task to complete, for a timeout to occur, or for cancellation to be requested.
      /// The method first spins and then falls back to blocking on a new event.
       private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken cancellationToken)
        {
            bool infiniteWait = millisecondsTimeout == Timeout.Infinite;
            uint startTimeTicks = infiniteWait ? 0 : (uint)Environment.TickCount;
            bool returnValue = SpinWait(millisecondsTimeout);
            if (!returnValue)
            {
                var mres = new SetOnInvokeMres();
                try
                {
                    AddCompletionAction(mres, addBeforeOthers: true);
                    if (infiniteWait)
                    {
                        returnValue = mres.Wait(Timeout.Infinite, cancellationToken);
                    }
                    else
                    {
                        uint elapsedTimeTicks = ((uint)Environment.TickCount) - startTimeTicks;
                        if (elapsedTimeTicks < millisecondsTimeout)
                        {
                            returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken);
                        }
                    }
                }
                finally
                {
                    if (!IsCompleted) RemoveContinuation(mres);
                    // Don't Dispose of the MRES, because the continuation off of this task may
                    // still be running.  This is ok, however, as we never access the MRES' WaitHandle,
                    // and thus no finalizable resources are actually allocated.
                }
            }
            return returnValue;
        }
        private void AddCompletionAction(ITaskCompletionAction action, bool addBeforeOthers)
        {
            if (!AddTaskContinuation(action, addBeforeOthers))
                action.Invoke(this); // run the action directly if we failed to queue the continuation (i.e., the task completed)
        }
        
       // Record a continuation task or action.
       // Return true if and only if we successfully queued a continuation.
       private bool AddTaskContinuation(object tc, bool addBeforeOthers)
        {
            Contract.Requires(tc != null);
    
            // Make sure that, if someone calls ContinueWith() right after waiting for the predecessor to complete,
            // we don't queue up a continuation.
            if (IsCompleted) return false;
    
            // Try to just jam tc into m_continuationObject
            if ((m_continuationObject != null) || (Interlocked.CompareExchange(ref m_continuationObject, tc, null) != null))
            {
                // If we get here, it means that we failed to CAS tc into m_continuationObject.
                // Therefore, we must go the more complicated route.
                return AddTaskContinuationComplex(tc, addBeforeOthers);
            }
            else return true;
        }
        private sealed class SetOnInvokeMres : ManualResetEventSlim, ITaskCompletionAction
        {
            internal SetOnInvokeMres() : base(false, 0) { }
            public void Invoke(Task completingTask) { Set(); }
        }
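
The two branches of SpinThenBlockingWait above (finite timeout and infinite wait) can be observed through the public Task.Wait(int) overload. The following is a minimal sketch, not part of the original article: the class name WaitTimeoutDemo and the gate variable are made up for illustration, and it assumes that Task.Wait(int) reaches InternalWait as in the source listed here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, for illustration only.
public static class WaitTimeoutDemo
{
    // Returns { result of the timed wait, result of the infinite wait, whether Result == 42 }.
    public static bool[] Run()
    {
        using (var gate = new ManualResetEventSlim(false))
        {
            // The delegate cannot finish until the gate is opened.
            Task<int> task = Task.Factory.StartNew(state =>
            {
                gate.Wait();
                return (int)state * 2;
            }, 21);

            // Finite timeout: the task is still blocked on the gate, so this returns false.
            bool timedWait = task.Wait(50);

            // Let the delegate finish, then wait without a timeout.
            gate.Set();
            bool infiniteWait = task.Wait(Timeout.Infinite);

            return new[] { timedWait, infiniteWait, task.Result == 42 };
        }
    }

    public static void Main()
    {
        bool[] r = Run();
        Console.WriteLine("{0} {1} {2}", r[0], r[1], r[2]); // prints "False True True"
    }
}
```

The gate makes the outcome independent of timing: the first Wait can only time out, and the second can only succeed.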

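    SpinThenBlockingWait first spins to check whether the task has completed. If it has not, it blocks on a SetOnInvokeMres instance. SetOnInvokeMres is a subclass of ManualResetEventSlim, and the blocking happens in mres.Wait(Timeout.Infinite, cancellationToken) or mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken). Before blocking, it calls AddCompletionAction, which in turn calls AddTaskContinuation. The key line in AddTaskContinuation is【if ((m_continuationObject != null) || (Interlocked.CompareExchange(ref m_continuationObject, tc, null) != null))】. Put simply, when no continuation has been registered yet, it stores the SetOnInvokeMres instance in m_continuationObject; otherwise it falls back to AddTaskContinuationComplex. Because the continuation was queued successfully, the action.Invoke(this) call inside AddCompletionAction is not executed here. Instead, when the task finishes, FinishContinuations calls singleTaskCompletionAction.Invoke(this), which sets the event (an event, not a semaphore) and releases the waiter. If m_continuationObject is a real TaskContinuation instance, FinishContinuations calls【singleTaskContinuation.Run(this, bCanInlineContinuations)】instead.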
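
The mechanism described above can be modeled with a much smaller class. This is only a sketch under stated assumptions, not the runtime's implementation: MiniTask, ICompletionAction, SetOnInvoke and MiniTaskDemo are hypothetical names, the spinning phase is omitted, and the sketch supports a single waiter, whereas the real Task falls back to AddTaskContinuationComplex to keep a list of continuations.

```csharp
using System;
using System.Threading;

// Hypothetical types for illustration only; they are not part of the .NET runtime.
public interface ICompletionAction { void Invoke(MiniTask completed); }

public sealed class MiniTask
{
    private static readonly object s_completedSentinel = new object();
    private object _continuation;          // plays the role of m_continuationObject
    private volatile bool _isCompleted;

    public bool IsCompleted { get { return _isCompleted; } }

    // An event that is also a completion action: invoking it sets the event.
    private sealed class SetOnInvoke : ManualResetEventSlim, ICompletionAction
    {
        public SetOnInvoke() : base(false, 0) { }
        public void Invoke(MiniTask completed) { Set(); }
    }

    // Returns true only if the action was stored and will be invoked by Complete().
    private bool TryAddContinuation(ICompletionAction action)
    {
        if (_isCompleted) return false;
        object previous = Interlocked.CompareExchange(ref _continuation, action, null);
        if (previous == null) return true;                                  // CAS succeeded
        if (ReferenceEquals(previous, s_completedSentinel)) return false;   // already completed
        throw new InvalidOperationException("This sketch supports a single waiter only.");
    }

    // Called by the producer: mark completion, close the slot, run the stored action.
    public void Complete()
    {
        _isCompleted = true;
        object stored = Interlocked.Exchange(ref _continuation, s_completedSentinel);
        var action = stored as ICompletionAction;
        if (action != null) action.Invoke(this);
    }

    // Called by the consumer: register the event as a continuation, then block on it.
    public bool Wait(int millisecondsTimeout)
    {
        if (_isCompleted) return true;
        var mres = new SetOnInvoke();
        if (!TryAddContinuation(mres)) mres.Invoke(this);   // completed in the meantime
        bool signaled = mres.Wait(millisecondsTimeout);
        // On timeout, unregister so that Complete() no longer sees this waiter.
        if (!signaled) Interlocked.CompareExchange(ref _continuation, null, mres);
        return signaled;
    }
}

public static class MiniTaskDemo
{
    public static bool WaitTimesOutBeforeCompletion()
    {
        return new MiniTask().Wait(20);   // nobody completes it, so the wait times out
    }

    public static bool WaitSucceedsAfterCompletion()
    {
        var task = new MiniTask();
        var worker = new Thread(() => { Thread.Sleep(50); task.Complete(); });
        worker.Start();
        bool completed = task.Wait(Timeout.Infinite);
        worker.Join();
        return completed;
    }

    public static void Main()
    {
        Console.WriteLine(WaitTimesOutBeforeCompletion()); // False
        Console.WriteLine(WaitSucceedsAfterCompletion());  // True
    }
}
```

The waiter never polls once it is blocked: the only thing that wakes it is the completing thread invoking the action it left in the continuation slot.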

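    To summarize the call sequence:

    TaskFactory.StartNew
    ->Task<TResult>.StartNew【first calls the constructor, which calls PossiblyCaptureContext to capture the execution context, then calls its own ScheduleAndStart method】
    ->TaskScheduler.InternalQueueTask
    ->ThreadPoolTaskScheduler.QueueTask【this is where the task is handed to a thread; by default it is queued to the thread pool as a work item】
    ->Task.ExecuteEntry【on the thread pool this is entered through IThreadPoolWorkItem.ExecuteWorkItem】
    ->Task.ExecuteWithThreadLocal【first copies the captured execution context, then calls ExecutionContext.Run with that context and the callback】
    ->Task.ExecutionContextCallback
    ->Task.Execute
    ->Task<TResult>.InnerInvoke【this is where our own delegate is actually invoked and its return value is stored as the result】
    ->Task.Finish【marks the task as finished】
    ->Task.FinishStageTwo
    ->Task.FinishStageThree
    ->Task.FinishContinuations【triggers Set on the event a waiter is blocked on】
    -------------------------
    Task<TResult>.Result is the task's return value. If the task has already completed, it is returned directly; otherwise Task<TResult>.GetResultCore is called
    ->Task.InternalWait
    ->Task.SpinThenBlockingWait【calls Wait on the SetOnInvokeMres instance】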
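
Seen from user code, the sequence above looks roughly like the following sketch. The names CallSequenceDemo and s_ambient are made up for illustration. It uses AsyncLocal<T> (available from .NET Framework 4.6) to make the captured execution context visible: the delegate sees the value that was current when the task was created, not the value assigned afterwards.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, for illustration only.
public static class CallSequenceDemo
{
    // An ambient value that flows with the ExecutionContext.
    private static readonly AsyncLocal<string> s_ambient = new AsyncLocal<string>();

    public static string Run()
    {
        s_ambient.Value = "set-by-caller";

        // StartNew creates the task (capturing the current context) and queues it.
        Task<string> task = Task.Factory.StartNew(state =>
        {
            // The delegate runs under the captured context.
            return (string)state + ":" + s_ambient.Value;
        }, "state");

        // Changing the value now does not affect the context the task already captured.
        s_ambient.Value = "changed-after-start";

        // Result returns immediately if the task is done; otherwise it waits for completion.
        return task.Result;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "state:set-by-caller"
    }
}
```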

  • Original article: https://www.cnblogs.com/majiang/p/7899202.html