zoukankan      html  css  js  c++  java
  • C# Task ContinueWith的实现

    看了上一篇C# Task 是什么?返回值如何实现? Wait如何实现 我们提到FinishContinuations方法中会调用TaskContinuation实例,那么我们的ContinueWith就应该非常简单,只需要把TASK放到TaskContinuation结合中就可以了,ContinueWith可以是 Action<Task<TResult>>也可以是 Func<Task<TResult>,TNewResult> ,其中Task<TResult>的实现如下:

     public class Task<TResult> : Task{
        //Creates a continuation that executes when the target Task{TResult}" completes
        public Task ContinueWith(Action<Task<TResult>> continuationAction)
        {
            StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
            return ContinueWith(continuationAction, TaskScheduler.Current, default(CancellationToken), TaskContinuationOptions.None, ref stackMark);
        }
        internal Task ContinueWith(Action<Task<TResult>> continuationAction, TaskScheduler scheduler, CancellationToken cancellationToken,TaskContinuationOptions continuationOptions, ref StackCrawlMark stackMark)
        {
            if (continuationAction == null)
            {
                throw new ArgumentNullException("continuationAction");
            }
            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }
            TaskCreationOptions creationOptions;
            InternalTaskOptions internalOptions;
            CreationOptionsFromContinuationOptions(continuationOptions,out creationOptions,out internalOptions);
            
            Task continuationTask = new ContinuationTaskFromResultTask<TResult>(this, continuationAction, null,    creationOptions, internalOptions,ref stackMark);
            ContinueWithCore(continuationTask, scheduler, cancellationToken, continuationOptions);
            return continuationTask;
        }
        
        public Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, Object, TNewResult> continuationFunction, Object state)
        {
            StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
            return ContinueWith<TNewResult>(continuationFunction, state, TaskScheduler.Current, default(CancellationToken), TaskContinuationOptions.None, ref stackMark);
        }
    
        // Same as the above overload, just with a stack mark.
        internal Task<TNewResult> ContinueWith<TNewResult>(Func<Task<TResult>, Object, TNewResult> continuationFunction, Object state,TaskScheduler scheduler, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions, ref StackCrawlMark stackMark)
        {
            if (continuationFunction == null)
            {
                throw new ArgumentNullException("continuationFunction");
            }
            if (scheduler == null)
            {
                throw new ArgumentNullException("scheduler");
            }
    
            TaskCreationOptions creationOptions;
            InternalTaskOptions internalOptions;
            CreationOptionsFromContinuationOptions(continuationOptions,out creationOptions,out internalOptions);
    
            Task<TNewResult> continuationFuture = new ContinuationResultTaskFromResultTask<TResult,TNewResult>(this, continuationFunction, state,creationOptions, internalOptions,ref stackMark);
            ContinueWithCore(continuationFuture, scheduler, cancellationToken, continuationOptions);
            return continuationFuture;
        }
     }

    ContinueWith的核心是调用Task的ContinueWithCore方法,这里把我们的Action或Fun包装成子的Task,比如这里的ContinuationResultTaskFromResultTask实现【很是标准】如下:

     internal sealed class ContinuationResultTaskFromResultTask<TAntecedentResult, TResult> : Task<TResult>
        {
            private Task<TAntecedentResult> m_antecedent;
            public ContinuationResultTaskFromResultTask(
                Task<TAntecedentResult> antecedent, Delegate function, object state, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark) :
                base(function, state, Task.InternalCurrentIfAttached(creationOptions), default(CancellationToken), creationOptions, internalOptions, null)
            {
                Contract.Requires(function is Func<Task<TAntecedentResult>, TResult> || function is Func<Task<TAntecedentResult>, object, TResult>, "Invalid delegate type in ContinuationResultTaskFromResultTask");
                m_antecedent = antecedent;
                PossiblyCaptureContext(ref stackMark);
            }
    
            internal override void InnerInvoke()
            {
                var antecedent = m_antecedent;
                Contract.Assert(antecedent != null,  "No antecedent was set for the ContinuationResultTaskFromResultTask.");
                m_antecedent = null;
                antecedent.NotifyDebuggerOfWaitCompletionIfNecessary();
    
                // Invoke the delegate
                Contract.Assert(m_action != null);
                var func = m_action as Func<Task<TAntecedentResult>, TResult>;
                if (func != null)
                {
                    m_result = func(antecedent);
                    return;
                }
                var funcWithState = m_action as Func<Task<TAntecedentResult>, object, TResult>;
                if (funcWithState != null)
                {
                    m_result = funcWithState(antecedent, m_stateObject);
                    return;
                }
                Contract.Assert(false, "Invalid m_action in ContinuationResultTaskFromResultTask");
            }
        }

    ContinuationResultTaskFromResultTask<TAntecedentResult, TResult> 就重写基类Task的InnerInvoke方法,现在回到Task的ContinueWithCore方法:

    public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable
    {
        /// Registers the continuation and possibly runs it (if the task is already finished).
        internal void ContinueWithCore(Task continuationTask, TaskScheduler scheduler,CancellationToken cancellationToken, TaskContinuationOptions options)
        {
            Contract.Requires(continuationTask != null, "Task.ContinueWithCore(): null continuationTask");
            Contract.Requires(scheduler != null, "Task.ContinueWithCore(): null scheduler");
            Contract.Requires(!continuationTask.IsCompleted, "Did not expect continuationTask to be completed");
    
            // Create a TaskContinuation
            TaskContinuation continuation = new StandardTaskContinuation(continuationTask, options, scheduler);
    
            // If cancellationToken is cancellable, then assign it.  
            if (cancellationToken.CanBeCanceled)
            {
                if (IsCompleted || cancellationToken.IsCancellationRequested)
                {
                    continuationTask.AssignCancellationToken(cancellationToken, null, null);
                }
                else
                {
                    continuationTask.AssignCancellationToken(cancellationToken, this, continuation);
                }
            }
    
            // In the case of a pre-canceled token, continuationTask will have been completed
            // in a Canceled state by now.  If such is the case, there is no need to go through
            // the motions of queuing up the continuation for eventual execution.
            if (!continuationTask.IsCompleted)
            {
                if ((this.Options & (TaskCreationOptions)InternalTaskOptions.PromiseTask) != 0 && !(this is ITaskCompletionAction))
                {
                    var etwLog = TplEtwProvider.Log;
                    if (etwLog.IsEnabled())
                    {
                        etwLog.AwaitTaskContinuationScheduled(TaskScheduler.Current.Id, Task.CurrentId ?? 0, continuationTask.Id);
                    }
                }
    
                // Attempt to enqueue the continuation
                bool continuationQueued = AddTaskContinuation(continuation, addBeforeOthers: false);
    
                // If the continuation was not queued (because the task completed), then run it now.
                if (!continuationQueued) continuation.Run(this, bCanInlineContinuationTask: true);
            }
        }
        
        private bool AddTaskContinuation(object tc, bool addBeforeOthers)
        {
            Contract.Requires(tc != null);
            if (IsCompleted) return false;
    
            // Try to just jam tc into m_continuationObject
            if ((m_continuationObject != null) || (Interlocked.CompareExchange(ref m_continuationObject, tc, null) != null))
            {            
                return AddTaskContinuationComplex(tc, addBeforeOthers);
            }
            else return true;
        }
        
         private bool AddTaskContinuationComplex(object tc, bool addBeforeOthers)
        {
            Contract.Requires(tc != null, "Expected non-null tc object in AddTaskContinuationComplex");
            object oldValue = m_continuationObject;
            // Logic for the case where we were previously storing a single continuation
            if ((oldValue != s_taskCompletionSentinel) && (!(oldValue is List<object>)))
            {
                List<object> newList = new List<object>();
                newList.Add(oldValue);
                Interlocked.CompareExchange(ref m_continuationObject, newList, oldValue);
            }
    
            // m_continuationObject is guaranteed at this point to be either a List or
            // s_taskCompletionSentinel.
            List<object> list = m_continuationObject as List<object>;
            Contract.Assert((list != null) || (m_continuationObject == s_taskCompletionSentinel),"Expected m_continuationObject to be list or sentinel");
            if (list != null)
            {
                lock (list)
                {
                    if (m_continuationObject != s_taskCompletionSentinel)
                    {
                        // Before growing the list we remove possible null entries that are the
                        // result from RemoveContinuations()
                        if (list.Count == list.Capacity)
                        {
                            list.RemoveAll(s_IsTaskContinuationNullPredicate);
                        }
    
                        if (addBeforeOthers)
                            list.Insert(0, tc);
                        else
                            list.Add(tc);
    
                        return true; // continuation successfully queued, so return true.
                    }
                }
            }
            // We didn't succeed in queuing the continuation, so return false.
            return false;
        }
            
        /// Handles everything needed for associating a CancellationToken with a task which is being constructed.
        /// This method is meant to be be called either from the TaskConstructorCore or from ContinueWithCore
        private void AssignCancellationToken(CancellationToken cancellationToken, Task antecedent, TaskContinuation continuation)
        {
            ContingentProperties props = EnsureContingentPropertiesInitialized(needsProtection: false);
            props.m_cancellationToken = cancellationToken;
    
            try
            {
                if (AppContextSwitches.ThrowExceptionIfDisposedCancellationTokenSource)
                {
                    cancellationToken.ThrowIfSourceDisposed();
                }                    
                if ((((InternalTaskOptions)Options & (InternalTaskOptions.QueuedByRuntime | InternalTaskOptions.PromiseTask | InternalTaskOptions.LazyCancellation)) == 0))
                {
                    if (cancellationToken.IsCancellationRequested)
                    {
                        // Fast path for an already-canceled cancellationToken
                        this.InternalCancel(false);
                    }
                    else
                    {
                        // Regular path for an uncanceled cancellationToken
                        CancellationTokenRegistration ctr;
                        if (antecedent == null)
                        {
                            ctr = cancellationToken.InternalRegisterWithoutEC(s_taskCancelCallback, this);
                        }
                        else
                        {
                            ctr = cancellationToken.InternalRegisterWithoutEC(s_taskCancelCallback,new Tuple<Task, Task, TaskContinuation>(this, antecedent, continuation));
                        }
                        props.m_cancellationRegistration = new Shared<CancellationTokenRegistration>(ctr);
                    }
                }
            }
            catch
            {
                if ((m_parent != null) &&((Options & TaskCreationOptions.AttachedToParent) != 0)&& ((m_parent.Options & TaskCreationOptions.DenyChildAttach) == 0))
                {
                    m_parent.DisregardChild();
                }
                throw;
            }
        }    
        
        private readonly static Action<Object> s_taskCancelCallback = new Action<Object>(TaskCancelCallback);
        private static void TaskCancelCallback(Object o)
        {
            var targetTask = o as Task;
            if (targetTask == null)
            {
                var tuple = o as Tuple<Task, Task, TaskContinuation>;
                if (tuple != null)
                {
                    targetTask = tuple.Item1;
    
                    Task antecedentTask = tuple.Item2;
                    TaskContinuation continuation = tuple.Item3;
                    antecedentTask.RemoveContinuation(continuation);
                }
            }
            Contract.Assert(targetTask != null,"targetTask should have been non-null, with the supplied argument being a task or a tuple containing one");
            targetTask.InternalCancel(false);
        }
    }

    ContinueWithCore实现也比较简单,首先把当前的continuationTask转换为StandardTaskContinuation,然后把CancellationToken赋给continuationTask,如果continuationTask没有完成, 那么调用AddTaskContinuation把continuationTask加到等待对象中,如果AddTaskContinuation添加失败,就直接调用continuationTask。 让我妈来看看StandardTaskContinuation的实现:

     internal abstract class TaskContinuation
        {
            internal abstract void Run(Task completedTask, bool bCanInlineContinuationTask);
    
            /// <summary>Tries to run the task on the current thread, if possible; otherwise, schedules it.</summary>
            protected static void InlineIfPossibleOrElseQueue(Task task, bool needsProtection)
            {
                Contract.Requires(task != null);
                Contract.Assert(task.m_taskScheduler != null);
                if (needsProtection)
                {
                    if (!task.MarkStarted())
                        return; // task has been previously started or canceled.  Stop processing.
                }
                else
                {
                    task.m_stateFlags |= Task.TASK_STATE_STARTED;
                }
    
                // Try to inline it but queue if we can't
                try
                {
                    if (!task.m_taskScheduler.TryRunInline(task, taskWasPreviouslyQueued: false))
                    {
                        task.m_taskScheduler.InternalQueueTask(task);
                    }
                }
                catch (Exception e)
                {
                   
                    if (!(e is ThreadAbortException && (task.m_stateFlags & Task.TASK_STATE_THREAD_WAS_ABORTED) != 0))    // this ensures TAEs from QueueTask will be wrapped in TSE
                    {
                        TaskSchedulerException tse = new TaskSchedulerException(e);
                        task.AddException(tse);
                        task.Finish(false);
                    }
                }
            }
            internal abstract Delegate[] GetDelegateContinuationsForDebugger();
        }
        
        /// <summary>Provides the standard implementation of a task continuation.</summary>
        internal class StandardTaskContinuation : TaskContinuation
        {
            internal readonly Task m_task;       
            internal readonly TaskContinuationOptions m_options;
            private readonly TaskScheduler m_taskScheduler;
            internal StandardTaskContinuation(Task task, TaskContinuationOptions options, TaskScheduler scheduler)
            {
                Contract.Requires(task != null, "TaskContinuation ctor: task is null");
                Contract.Requires(scheduler != null, "TaskContinuation ctor: scheduler is null");
                m_task = task;
                m_options = options;
                m_taskScheduler = scheduler;
                if (AsyncCausalityTracer.LoggingOn)
                    AsyncCausalityTracer.TraceOperationCreation(CausalityTraceLevel.Required, m_task.Id, "Task.ContinueWith: " + ((Delegate)task.m_action).Method.Name, 0);
    
                if (Task.s_asyncDebuggingEnabled)
                {
                    Task.AddToActiveTasks(m_task);
                }
            }
    
            /// <summary>Invokes the continuation for the target completion task.</summary>
            /// <param name="completedTask">The completed task.</param>
            /// <param name="bCanInlineContinuationTask">Whether the continuation can be inlined.</param>
            internal override void Run(Task completedTask, bool bCanInlineContinuationTask)
            {
                Contract.Assert(completedTask != null);
                Contract.Assert(completedTask.IsCompleted, "ContinuationTask.Run(): completedTask not completed");
    
                // Check if the completion status of the task works with the desired 
                // activation criteria of the TaskContinuationOptions.
                TaskContinuationOptions options = m_options;
                bool isRightKind =
                    completedTask.IsRanToCompletion ?
                        (options & TaskContinuationOptions.NotOnRanToCompletion) == 0 :
                        (completedTask.IsCanceled ?
                            (options & TaskContinuationOptions.NotOnCanceled) == 0 :
                            (options & TaskContinuationOptions.NotOnFaulted) == 0);
    
                // If the completion status is allowed, run the continuation.
                Task continuationTask = m_task;
                if (isRightKind)
                {
                    if (!continuationTask.IsCanceled && AsyncCausalityTracer.LoggingOn)
                    {
                        // Log now that we are sure that this continuation is being ran
                        AsyncCausalityTracer.TraceOperationRelation(CausalityTraceLevel.Important, continuationTask.Id, CausalityRelation.AssignDelegate);
                    }
                    continuationTask.m_taskScheduler = m_taskScheduler;
                  
                    if (bCanInlineContinuationTask && // inlining is allowed by the caller
                        (options & TaskContinuationOptions.ExecuteSynchronously) != 0) // synchronous execution was requested by the continuation's creator
                    {
                        InlineIfPossibleOrElseQueue(continuationTask, needsProtection: true);
                    }
                    else
                    {
                        try { continuationTask.ScheduleAndStart(needsProtection: true); }
                        catch (TaskSchedulerException)
                        {
                            // No further action is necessary -- ScheduleAndStart() already transitioned the 
                            // task to faulted.  But we want to make sure that no exception is thrown from here.
                        }
                    }
                }
                // Otherwise, the final state of this task does not match the desired
                // continuation activation criteria; cancel it to denote this.
                else continuationTask.InternalCancel(false);
            }
            internal override Delegate[] GetDelegateContinuationsForDebugger()
            {
                if (m_task.m_action == null)
                {
                    return m_task.GetDelegateContinuationsForDebugger();
                }
                return new Delegate[] { m_task.m_action as Delegate };
            }
        }

    StandardTaskContinuation的实现非常简单,而Task的AssignCancellationToken方法也没什么可以说的,只是需要注意下一下回调s_taskCancelCallback。Task的AddTaskContinuation方法首先检查当前Task是否结束,结束了就不用再调用AddTaskContinuationComplex方法了,直接调用continuation.Run方法,AddTaskContinuationComplex方法会把task添加到m_continuationObject中,最后FinishContinuations在调用m_continuationObject中的TaskContinuation.Run方法。

    总结一下:ContinueWith方法主要调用ContinueWithCore方法,ContinueWithCore方法主要是调用AddTaskContinuation,AddTaskContinuation方法把Task加到m_continuationObject,【如果主的Task已经完成,那么这里AddTaskContinuation返回false,则直接调用TaskContinuation.Run】,当主的Task完成时会调用FinishContinuations方法,FinishContinuations方法会检测m_continuationObject中TaskContinuation对象,一次调用它们的Run方法

  • 相关阅读:
    shell 遍历当前目录以及所有子目录下文件
    linux shell 字符串操作(长度,查找,替换)详解
    将自定义结构存入std::set 或者 std::map
    CentOS下搭建SVN服务器
    基于Debian的linux系统软件安装命令
    rm搭配grep删除符合条件的文件
    Centos系统环境
    怎么恢复用mysqldump备份数据和恢复数据
    mac 安装 node.js 的 canvas
    centos 6.5 安装 node.js 的 canvas模块
  • 原文地址:https://www.cnblogs.com/majiang/p/7903556.html
Copyright © 2011-2022 走看看