zoukankan      html  css  js  c++  java
  • [翻译]剖析C#中的异步方法

    翻译自一篇博文,原文:Dissecting the async methods in C#

    有些括号里的是译注或我自己的理解。

    异步系列

    • 剖析C#中的异步方法
    • 扩展C#中的异步方法
    • C#中异步方法的性能特点。
    • 用一个用户场景来掌握它们

    C#这门语言对开发者的生产效率有很大帮助,我很高兴最近的推动让它变得对高性能应用更加合适。

    举例来说:C# 5引入了“async”方法(async表示异步,也是关键字)。这个特性从用户的角度看是很实用的,因为它能将几个基于Task的操作合并为一个。但是这种抽象是需要代价的。Task是引用类型,每次实例化的时候都会造成堆上的内存分配,就算是“async”方法同步地执行完毕的情况下也不例外。有了C# 7,在某些场景下,异步方法可以返回类似Task的类型,比如ValueTask,来减少或避免在堆上的内存分配。

    为了理解如何将上述一切变为可能,我们需要看看异步方法在底层是如何实现的。

    但首先,先来回顾一点历史。

    TaskTask<T>都是.Net 4.0时引入的,在我看来,这对.Net的异步和并行编程带来了巨大的观念性的改变。不像早期的异步模式,如.Net 1.0的BeginXXX/EndXXX模式(也叫异步编程模型),或是来自.Net 2.0的基于事件的异步模式,如BackgroundWorker,任务(即Task实例)是可以组合的。

    一个任务代表一个单位的工作(或者说一件事,可能完成了,也可能还没完成),它承诺会在将来把这个工作的结果给你。这个承诺可以是基于IO操作,或计算密集型(computation-intensive)操作,但这不重要,重要的是这个操作的结果是“自给自足”的(早期的异步模型做不到这点),是一等公民。你可以传递一个“未来”:你可以将它存储在一个变量中,从一个方法返回它,或者将它传递给另一个方法。你可以把两个“未来”合并,形成另一个新的,你可以给这个“未来”添加continuation(就是这个任务完成之后的回调,或者说“任务完成后的延续”),然后同步地等待(即await,也是关键字)结果。仅仅依靠一个任务实例,你就可以根据操作是成功了还是失败了,或是被取消了,来决定下一步执行什么。

    任务并行库(Task Parallel Library)(TPL)改变了我们对并行的思考方式,C# 5通过引入async/await而向前迈进了一步。Async/await能帮我们将任务组合起来,让我们能使用像try/catchusing等著名的结构。但正如其他任何抽象,async/await这个特性是有代价的。要了解这个代价是什么,我们必须去底层看看。

    异步方法的本质

    通常来说一个方法只有一个进入点,一个出口点(它确实可以有多个return语句,但是在运行时,一次调用只有一个出口点)。但是异步方法和迭代器(有yield return的方法)却不同。就异步方法来说,调用方几乎能立即得到结果(也就是TaskTask<T>),然后通过这个得到的任务,等待(await)实际的结果。

    让我们将“异步方法”定义为一个被上下文(contextual)关键字async所标记的方法。这并不意味着这个方法异步地执行。甚至这并不意味着这个方法是异步的。这个关键字的意思只是:编译器会对这个方法进行一些特殊的转换处理。

    让我们考虑下面这个异步方法:

    class StockPrices
    {
        private Dictionary<string, decimal> _stockPrices;
        public async Task<decimal> GetStockPriceForAsync(string companyId)
        {
            await InitializeMapIfNeededAsync();
            _stockPrices.TryGetValue(companyId, out var result);
            return result;
        }
     
        private async Task InitializeMapIfNeededAsync()
        {
            if (_stockPrices != null)
                return;
     
            await Task.Delay(42);
            // 从外部数据源或内存中的缓存得到股票价格
            _stockPrices = new Dictionary<string, decimal> { { "MSFT", 42 } };
        }
    }
    
    

    GetStockPriceForAsync方法保证了_stockPrices这个map被初始化,然后从缓存(即_stockPrices)中获得结果。

    为了更好地理解编译器做了或能做什么,让我们试着手写一个转换。

    手动转换一个异步方法

    TPL提供了两个主要的构建快,帮助我们构建和连接任务:Task.ContinueWith用于任务继续,TaskCompletionSource<T>用户手动构建任务。

    class GetStockPriceForAsync_StateMachine
    {
        enum State { Start, Step1, }
        private readonly StockPrices @this;
        private readonly string _companyId;
        private readonly TaskCompletionSource<decimal> _tcs;
        private Task _initializeMapIfNeededTask;
        private State _state = State.Start;
     
        public GetStockPriceForAsync_StateMachine(StockPrices @this, string companyId)
        {
            this.@this = @this;
            _companyId = companyId;
        }
     
        public void Start()
        {
            try
            {
                if (_state == State.Start)
                {
                    // 从方法的开始到第一个“await”的代码
     
                    if (string.IsNullOrEmpty(_companyId))
                        throw new ArgumentNullException();
     
                    _initializeMapIfNeededTask = @this.InitializeMapIfNeeded();
     
                    // 更新状态并注册回调函数
                    _state = State.Step1;
                    _initializeMapIfNeededTask.ContinueWith(_ => Start());
                }
                else if (_state == State.Step1)
                {
                    // 需要先检查错误和是否被取消
                    if (_initializeMapIfNeededTask.Status == TaskStatus.Canceled)
                        _tcs.SetCanceled();
                    else if (_initializeMapIfNeededTask.Status == TaskStatus.Faulted)
                        _tcs.SetException(_initializeMapIfNeededTask.Exception.InnerException);
                    else
                    {
                        // 从第一个await到方法结束的代码
     
                        @this._store.TryGetValue(_companyId, out var result);
                        _tcs.SetResult(result);
                    }
                }
            }
            catch (Exception e)
            {
                _tcs.SetException(e);
            }
        }
     
        public Task<decimal> Task => _tcs.Task;
    }
     
    public Task<decimal> GetStockPriceForAsync(string companyId)
    {
        var stateMachine = new GetStockPriceForAsync_StateMachine(this, companyId);
        stateMachine.Start();
        return stateMachine.Task;
    }
    

    这段代码有些冗长但相对好懂。GetStockPriceForAsync中的所有逻辑都被移到了使用了 "continuation passing style"GetStockPriceForAsync_StateMachine.Start方法。我们的异步转换的主要思想就是按“await边界”来划分原来的方法。划分的第一块代码段就是方法的开始到第一个await。第二个代码段——从第一个await到第二个await。第三个代码段——从第二个await到第三个await或是方法的结尾,以此类推:

    // 生成的状态机的第一步:
     
    if (string.IsNullOrEmpty(_companyId)) throw new ArgumentNullException();
    _initializeMapIfNeededTask = @this.InitializeMapIfNeeded();
    

    每一个被等待的任务现在都变成了状态机的一个字段,Start方法将自己注册为这些任务的continuation:

    _state = State.Step1;
    _initializeMapIfNeededTask.ContinueWith(_ => Start());
    

    然后,当任务完成时,Start方法被回调,_state字段被检查从而知道我们进行到哪一步了。然后的逻辑就是检查任务是否成功,或被取消。如果成功,状态机就继续执行下一段代码段。当一切都完成后,状态机设置TaskCompletionSource<T>实例的结果,让GetStockPricesForAsync返回的任务变成“已完成”的状态。

    // 从第一个await到方法结束的代码
     
    @this._stockPrices.TryGetValue(_companyId, out var result);
    _tcs.SetResult(result); // 让调用者得到结果
    

    这个“实现”有一些缺陷:

    • 有很多堆分配:一次对状态机的分配,一次对TaskCompletionSource<T>的分配,一次对TaskCompletionSource<T>内部的任务实例的分配,一次对continuation委托的分配。
    • 缺少“热路径优化”("hot path optimizations"):如果被等待的任务已经完成了,那么就没有理由再创建一个continuation。
    • 缺少可扩展性:这个实现与基于任务的类紧密耦合,所以不可能用于其他场合,比如等待其他非TaskTask<T>的类型或返回类型。

    现在让我们看一下实际的异步状态机是如何解决上述问题的。

    异步状态机

    编译器对异步方法的转换总得来说和上面我们的手动转换很相似。为了得到正确的行为,编译器依赖于以下类型:

    1. 生成的状态机,包含了所有原始的异步方法的逻辑,就像是一个异步方法的堆栈帧(stack frame)。
    2. 包含着完成的任务的AsyncTaskMethodBuilder(十分类似于 TaskCompletionSource<T>),它管理状态机的状态转换。
    3. 装饰(wrap)着一个任务的TaskAwaiter,它在必要时会给任务添加continuation。
    4. MoveNextRunner,它会在正确的执行上下文(execution context)中调用IAsyncStateMachine.MoveNext

    生成的状态机在debug模式下是class,在release模式下是struct。所有其他的类型(除了MoveNextRunner)都在BCL中被定义为struct。

    编译器为状态机生成一个类似<YourMethodNameAsync>d__1的类型名称,其中包含了用户无法定义或引用的非法标示符,从而避免命名冲突。但是为了简洁,在接下来的例子中我会用合法的标示符(用_代替<>)和稍微容易理解一点的名字。

    原始的方法

    原始的“异步”方法创建状态机实例,用捕获到的状态(包括this指针,如果方法不是静态的话)来初始化它,然后通过调用AsyncTaskMethodBuilder.Start方法(注意状态机实例是以ref关键字被传递的),来启动执行。

    [AsyncStateMachine(typeof(_GetStockPriceForAsync_d__1))]
    public Task<decimal> GetStockPriceFor(string companyId)
    {
        _GetStockPriceForAsync_d__1 _GetStockPriceFor_d__;
        _GetStockPriceFor_d__.__this = this;
        _GetStockPriceFor_d__.companyId = companyId;
        _GetStockPriceFor_d__.__builder = AsyncTaskMethodBuilder<decimal>.Create();
        _GetStockPriceFor_d__.__state = -1;
        var __t__builder = _GetStockPriceFor_d__.__builder;
        __t__builder.Start<_GetStockPriceForAsync_d__1>(ref _GetStockPriceFor_d__);
        return _GetStockPriceFor_d__.__builder.Task;
    }
    

    按引用传递是一个重要的优化,因为状态机往往是相当大的struct(>100字节),按引用传递避免了不必要的拷贝。

    状态机
    struct _GetStockPriceForAsync_d__1 : IAsyncStateMachine
    {
        public StockPrices __this;
        public string companyId;
        public AsyncTaskMethodBuilder<decimal> __builder;
        public int __state;
        private TaskAwaiter __task1Awaiter;
     
        public void MoveNext()
        {
            decimal result;
            try
            {
                TaskAwaiter awaiter;
                if (__state != 0)
                {
                    // 生成的状态机的状态1:
                    if (string.IsNullOrEmpty(companyId))
                        throw new ArgumentNullException();
     
                    awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();
     
                    // 热路径优化:如果任务已经完成,那么状态机自动跳到下一步
                    if (!awaiter.IsCompleted)
                    {
                        __state = 0;
                        __task1Awaiter = awaiter;
     
                        // 下面的调用终究会导致状态机的装箱(boxing)
                        __builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
                        return;
                    }
                }
                else
                {
                    awaiter = __task1Awaiter;
                    __task1Awaiter = default(TaskAwaiter);
                    __state = -1;
                }
     
                // GetResult返回void,但是如果被等待的任务失败了,它就会抛出异常
                // 这个异常之后会被捕捉并改变“结果任务”。
                awaiter.GetResult();
                __this._stocks.TryGetValue(companyId, out result);
            }
            catch (Exception exception)
            {
                // 最终状态:失败
                __state = -2;
                __builder.SetException(exception);
                return;
            }
     
            // 最终状态:成功
            __state = -2;
            __builder.SetResult(result);
        }
     
        void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
        {
            __builder.SetStateMachine(stateMachine);
        }
    }
    

    生成的状态机看起来很复杂,但是本质上它和我们手动创建的状态机是很类似的。

    尽管生成的状态机与我们手动创建的类似,但它有一些非常重要的区别:

    1. “热路径”("Hot path")优化

    与我们的天真方法不同,生成的状态机知道:一个等待的任务可能已经完成了。

    awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();
     
    // 热路径优化:如果任务已经完成,那么状态机自动跳到下一步
    if (!awaiter.IsCompleted)
    {
        // 不相关的代码
     
        // 下面的调用终究会导致状态机的装箱(boxing)
        __builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
        return;
    }
    

    如果被等待的任务已经完成(无论是否成功),状态机进入下一步:

    // GetResult返回void,但是如果被等待的任务失败了,它就会抛出异常
    // 这个异常之后会被捕捉并改变“结果任务”。
    awaiter.GetResult();
    __this._stocks.TryGetValue(companyId, out result);
    

    这意味着如果所有被等待的任务都已事先是完成的状态,那么整个状态机都会保持在堆栈上。即使在今天,如果所有被等待的任务已经完成,或者会同步地执行完毕,异步方法也会有一个极其小的内存开销。唯一剩下的内存分配就是任务实例本身。

    2. 错误处理

    生成的状态机并没有对错误或被取消状态的“被等待任务”进行特殊的逻辑上的处理。状态机调用awaiter.GetResult(),如果任务是被取消的状态那么这个方法会抛出TaskCancelledException,如果任务错误那么就是另一个异常类型。这是个优雅的解决办法,在这里可以正常地运作,因为GetResult()相较于task.Wait()task.Result在错误处理上有一些不同。

    即使只有唯一一个导致任务失败的异常,task.Wait()task.Result都会抛出一个AggregateException异常。理由很简单:一个任务不仅可以代表通常只有一个错误的IO密集型(IO-bound)操作,还可以代表并行计算的结果。在后者的情况下,操作可能会有一个以上的错误,而AggregateException就是设计为把所有错误集中在一个地方。

    但是async/await是专门为通常最多只有一个错误的异步操作设计的。所以语言设计者们觉得:让awaiter.GetResult()AggregateException中包含的第一个错误抛出,是更合理的。这个设计决策并不是完美的,在接下来的文章中,我们将看到这种抽象何时会有缺陷。

    异步状态机仅仅是整个迷宫中的一小部分。要想看清整个迷宫,我们需要知道状态机实例如何与 TaskAwaiter<T>和 AsyncTaskMethodBuilder<T>进行交互。

    不同的部分是如何被粘合在一起的?

    这个图表看起来十分复杂,但每一部分都是精心设计的,都扮演泽重要的角色。其中最有趣的协作发生在当一个被等待的任务尚未完成时(在图中以棕色矩形标记):

    执行上下文(Execution Context)

    你可能会问:执行上下文是什么?为什么我们需要搞得这么复杂?

    在同步的世界里,每个线程都将上下文信息保存在线程本地(thread-local)的存储中。可以是安全相关的信息,特定文化的数据,或其他东西。当在一个线程中按顺序调用三个方法时,这些信息会自然地在这些方法中传递。但对于异步方法来说,这已经不再适用了。异步方法的每个“部分”都可以在不同的线程中执行,这使得线程本地的信息无法使用。

    执行上下文保存了逻辑上的控制流的信息,即使它跨越多个线程。

    Task.RunThreadPool.QueueUserWorkItem这样的方法会自动捕获上下文。Task.Run方法从调用线程中捕获ExecutionContext,并将其存储在Task实例中。当与此Task实例相关联的TaskScheduler执行一个给定的委托时,它会在存储的上下文中执行ExecutionContext.Run

    我们可以用AsyncLocal来实际演示一下这个概念:

    static Task ExecutionContextInAction()
    {
        var li = new AsyncLocal<int>();
        li.Value = 42;
     
        return Task.Run(() =>
        {
            // Task.Run会恢复执行上下文
            Console.WriteLine("In Task.Run: " + li.Value);
        }).ContinueWith(_ =>
        {
            // 任务的continuation也会恢复执行上下文
            Console.WriteLine("In Task.ContinueWith: " + li.Value);
        });
    }
    

    在这些情况下,执行上下文被传递到Task.Run,然后又被传递到Task.ContinueWith. 所以如果你运行此方法你会看到:

    In Task.Run: 42
    In Task.ContinueWith: 42
    

    但并不是所有BCL中的方法都会自动捕获和恢复执行上下文。有两个例外分别是TaskAwaiter<T>.UnsafeOnCompleteAsyncMethodBuilder<T>.AwaitUnsafeOnComplete。语言的设计者们决定添加一些“不安全的”方法,使用AsyncMethodBuilder<T>MoveNextRunner而不是依靠如AwaitTaskContinuation的内置设施,来手动地传递执行上下文。我怀疑在现有的实现中有一些性能上的原因或是其他限制。

    这里有一个例子说明了区别:

    static async Task ExecutionContextInAsyncMethod()
    {
        var li = new AsyncLocal<int>();
        li.Value = 42;
        await Task.Delay(42);
     
        // 上下文被隐式地捕获。li.Value为42
        Console.WriteLine("After first await: " + li.Value);
     
        var tsk2 = Task.Yield();
        tsk2.GetAwaiter().UnsafeOnCompleted(() =>
        {
            // 上下文没有被捕获:li.Value为0
            Console.WriteLine("Inside UnsafeOnCompleted: " + li.Value);
        });
     
        await tsk2;
     
        // 上下文被捕获。li.Value为42
        Console.WriteLine("After second await: " + li.Value);
    }
    

    输出为:

    After first await: 42
    Inside UnsafeOnCompleted: 0
    After second await: 42
    

    结论

    • 异步方法与同步方法有很大的不同。
    • 编译器为每个异步方法都生成一个状态机,并将原来方法中所有的逻辑移到状态机中。
    • 生成的代码对同步场景进行了高度优化:如果所有被等待的任务都完成了,那么异步方法的额外开销是很小的。
    • 如果被等待的任务还没有完成,则依赖于许多帮助类来完成工作,以保持原方法的逻辑不变。

    参考文献

    如果你想学习更多与执行上下文相关的内容,我强烈推荐以下两篇博文:

    接下来:我们将探索一个C#异步方法的可扩展模型。

  • 相关阅读:
    移动函数的封装示例
    如何从不均衡类中进行机器学习
    DPM(Deformable Parts Model)--原理(一)
    K-means聚类算法
    机器学习中对核函数的理解
    总结:Bias(偏差),Error(误差),Variance(方差)及CV(交叉验证)
    技术干货
    神经网络入门
    目标函数、损失函数、代价函数
    地铁客流检测训练问题记录
  • 原文地址:https://www.cnblogs.com/raytheweak/p/8735141.html
Copyright © 2011-2022 走看看