zoukankan      html  css  js  c++  java
  • .NET 异步详解

    前言#

    博客园中有很多关于 .NET async/await 的介绍,但是很遗憾,很少有正确的,甚至说大多都是“从现象编原理”都不过分。

    最典型的比如通过前后线程 ID 来推断其工作方式、在 async 方法中用 Thread.Sleep 来解释 Task 机制而导出多线程模型的结论、在 Task.Run 中包含 IO bound 任务来推出这是开了一个多线程在执行任务的结论等等。

    看上去似乎可以解释的通,可是很遗憾,无论是从原理还是结论上看都是错误的。

    要了解 .NET 中的 async/await 机制,首先需要有操作系统原理的基础,否则的话是很难理解清楚的,如果没有这些基础而试图向他人解释,大多也只是基于现象得到的错误猜想。

    初看异步#

    说到异步大家应该都很熟悉了,2012 年 C# 5 引入了新的异步机制:Task,并且还有两个新的关键字 await 和 async,这已经不是什么新鲜事了,而且如今这个异步机制已经被各大语言借鉴,如 JavaScript、TypeScript、Rust、C++ 等等。

    下面给出一个简单的对照:

    语言调度单位关键字/方法
    C# Task<>ValueTask<> asyncawait
    C++ std::future<> co_await
    Rust std::future::Future<> .await
    JavaScript、TypeScript Promise<> asyncawait

    当然,这里这并不是本文的重点,只是提一下,方便大家在有其他语言经验的情况下(如果有),可以认识到 C# 中 Task 和 async/await 究竟是一个和什么可以相提并论的东西。

    多线程编程#

    在该异步编程模型诞生之前,多线程编程模型是很多人所熟知的。一般来说,开发者会使用 Threadstd::thread 之类的东西作为线程的调度单位来进行多线程开发,每一个这样的结构表示一个对等线程,线程之间采用互斥或者信号量等方式进行同步。

    多线程对于科学计算速度提升等方面效果显著,但是对于 IO 负荷的任务,例如从读取文件或者 TCP 流,大多数方案只是分配一个线程进行读取,读取过程中阻塞该线程:

    Copy
    void Main()
    {
        while (true)
        {
            var client = socket.Accept();
            new Thread(() => ClientThread(client)).Start();
        }
    }
    
    void ClientThread(Socket client)
    {
        var buffer = new byte[1024];
        while (...)
        {
            // read and block
            client.Read(buffer, 0, 1024); 
        }
    }
    

    上述代码中,Main 函数在接收客户端之后即分配了一个新的用户线程用于处理该客户端,从客户端接收数据。client.Read() 执行后,该线程即被阻塞,即使阻塞期间该线程没有任何的操作,该用户线程也不会被释放,并被操作系统不断轮转调度,这显然浪费了资源。

    另外,如果线程数量多起来,频繁在不同线程之间轮转切换上下文,线程的上下文也不小,会浪费掉大量的性能。

    异步编程#

    因此对于此工作内容(IO),我们在 Linux 上有了 epoll/io_uring 技术,在 Windows 上有了 IOCP 技术用以实现异步 IO 操作。

    (这里插句题外话,吐槽一句,Linux 终于知道从 Windows 抄作业了。先前的 epoll 对比 IOCP 简直不能打,被 IOCP 全面打压,io_uring 出来了才好不容易能追上 IOCP,不过 IOCP 从 Windows Vista 时代开始每一代都有很大的优化,io_uring 能不能追得上还有待商榷)

    这类 API 有一个共同的特性就是,在操作 IO 的时候,调用方控制权被让出,等待 IO 操作完成之后恢复先前的上下文,重新被调度继续运行。

    所以表现就是这样的:

    假设我现在需要从某设备中读取 1024 个字节长度的数据,于是我们将缓冲区的地址和内容长度等信息封装好传递给操作系统之后我们就不管了,读取什么的让操作系统去做就好了。

    操作系统在内核态下利用 DMA 等方式将数据读取了 1024 个字节并写入到我们先前的 buffer 地址下,然后切换到用户态将从我们先前让出控制权的位置,对其进行调度使其继续执行。

    你可以发现这么一来,在读取数据期间就没有任何的线程被阻塞,也不存在被频繁调度和切换上下文的情况,只有当 IO 操作完成之后才会被重新调度并恢复先前让出控制权时的上下文,使得后面的代码继续执行。

    当然,这里说的是操作系统的异步 IO 实现方式,以便于读者对异步这个行为本身进行理解,和 .NET 中的异步还是有区别,Task 本身和操作系统也没什么关系。

    Task (ValueTask)#

    说了这么久还是没有解释 Task 到底是个什么东西,从上面的分析就可以得出,Task 其实就是一个所谓的调度单位,每个异步任务被封装为一个 Task 在 CLR 中被调度,而 Task 本身会运行在 CLR 中的预先分配好的线程池中。

    总有很多人因为 Task 借助线程池执行而把 Task 归结为多线程模型,这是完全错误的。

    这个时候有人跳出来了,说:你看下面这个代码

    Copy
    static async Task Main()
    {
        while (true)
        {
            Console.WriteLine(Environment.CurrentManagedThreadId);
            await Task.Delay(1000);
        }
    }
    

    输出的线程 ID 不一样欸,你骗人,这明明就是多线程!对于这种言论,我也只能说这些人从原理上理解的就是错误的。

    当代码执行到 await 的时候,此时当前的控制权就已经被让出了,当前线程并没有在阻塞地等待延时结束;待 Task.Delay() 完毕后,CLR 从线程池当中挑起了一个先前分配好的已有的但是空闲的线程,将让出控制权前的上下文信息恢复,使得该线程恰好可以从先前让出的位置继续执行下去。这个时候,可能挑到了先前让出前所在的那个线程,导致前后线程 ID 一致;也有可能挑到了另外一个和之前不一样的线程执行下面的代码,使得前后的线程 ID 不一致。在此过程中并没有任何的新线程被分配了出去。

    当然,在 WPF 等地方,因为利用了 SynchronizationContext 对调度行为进行了控制,所以可以得到和上述不同的结论,和这个相关的还有 .ConfigureAwait() 的用法,但是这里不是本文重点,因此就不做展开。

    在 .NET 中由于采用 stackless 的做法,这里需要用到 CPS 变换,大概是这么个流程:

    Copy
    using System;
    using System.Threading.Tasks;
    
    public class C 
    {
        public async Task M()
        {
            var a = 1;
            await Task.Delay(1000);
            Console.WriteLine(a);
        }
    }
    

    编译后:

    Copy
    public class C
    {
        [StructLayout(LayoutKind.Auto)]
        [CompilerGenerated]
        private struct <M>d__0 : IAsyncStateMachine
        {
            public int <>1__state;
    
            public AsyncTaskMethodBuilder <>t__builder;
    
            private int <a>5__2;
    
            private TaskAwaiter <>u__1;
    
            private void MoveNext()
            {
                int num = <>1__state;
                try
                {
                    TaskAwaiter awaiter;
                    if (num != 0)
                    {
                        <a>5__2 = 1;
                        awaiter = Task.Delay(1000).GetAwaiter();
                        if (!awaiter.IsCompleted)
                        {
                            num = (<>1__state = 0);
                            <>u__1 = awaiter;
                            <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
                            return;
                        }
                    }
                    else
                    {
                        awaiter = <>u__1;
                        <>u__1 = default(TaskAwaiter);
                        num = (<>1__state = -1);
                    }
                    awaiter.GetResult();
                    Console.WriteLine(<a>5__2);
                }
                catch (Exception exception)
                {
                    <>1__state = -2;
                    <>t__builder.SetException(exception);
                    return;
                }
                <>1__state = -2;
                <>t__builder.SetResult();
            }
    
            void IAsyncStateMachine.MoveNext()
            {
                //ILSpy generated this explicit interface implementation from .override directive in MoveNext
                this.MoveNext();
            }
    
            [DebuggerHidden]
            private void SetStateMachine(IAsyncStateMachine stateMachine)
            {
                <>t__builder.SetStateMachine(stateMachine);
            }
    
            void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
            {
                //ILSpy generated this explicit interface implementation from .override directive in SetStateMachine
                this.SetStateMachine(stateMachine);
            }
        }
    
        [AsyncStateMachine(typeof(<M>d__0))]
        public Task M()
        {
            <M>d__0 stateMachine = default(<M>d__0);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }
    }
    

    可以看到,原来的变量 a 被塞到了 <a>5__2 里面去(相当于备份上下文),Task 状态的转换后也是靠着调用 MoveNext(相当于状态转换后被重新调度)来接着驱动代码执行的,里面的 num 就表示当前的状态,num 如果为 0 表示 Task 完成了,于是接着执行下面的代码 Console.WriteLine(<a>5__2);

    但是上面和经典的多线程编程的那一套一样吗?不一样。

    至于 ValueTask 是个什么玩意,官方发现,Task 由于本身是一个 class,在运行时如果频繁反复的分配和回收会给 GC 造成不小的压力,因此出了一个 ValueTask,这个东西是 struct,分配在栈上,这样的话就不会给 GC 造成压力了,减轻了开销。不过也正因为 ValueTask 是会在栈上分配的值类型结构,因此提供的功能也不如 Task 全面。

    Task.Run#

    由于 .NET 是允许有多个线程的,因此也提供了 Task.Run 这个方法,允许我们将 CPU bound 的任务放在上述的线程池之中的某个线程上执行,并且允许我们将该负载作为一个 Task 进行管理,仅在这一点才和多线程的采用线程池的编程比较像。

    对于浏览器环境(v8),这个时候是完全没有多线程这一说的,因此你开的新的 Promise 其实是后面利用事件循环机制,将该微任务以异步的方式执行。

    想一想在 JavaScript 中,Promise 是怎么用的:

    Copy
    let p = new Promise((resolve, reject) => {
        // do something
        let success = true;
        let result = 123456;
    
        if (success) {
            resolve(result);
        }
        else {
            reject("failed");
        }
    })
    

    然后调用:

    Copy
    let r = await p;
    console.log(r); // 输出 123456
    

    你只需要把这一套背后的驱动器:事件循环队列,替换成 CLR 的线程池,就差不多是 .NET 的 Task 相对 JavaScript 的 Promise 的工作方式了。

    如果你把 CLR 线程池线程数量设置为 1,那就和 JavaScript 这套几乎差不多了(虽然实现上还是有差异)。

    这时有人要问了:“我在 Task.Run 里面套了好几层 Task.Run,可是为什么层数深了之后里面的不执行了呢?” 这是因为上面所说的线程池被耗尽了,后面的 Task 还在排着队等待被调度。

    自己封装异步逻辑#

    了解了上面的东西之后,相信对 .NET 中的异步机制应该理解得差不多了,可以看出来这一套是名副其实的 coroutine,并且在实现上是 stackless 的。至于有的人说的什么状态机什么的,只是实现过程中利用的手段而已,并不是什么重要的东西。

    那我们要怎么样使用 Task 来编写我们自己的异步代码呢?

    事件驱动其实也可以算是一种异步模型,例如以下情景:

    A 函数调用 B 函数,调用发起后就直接返回不管了(BeginInvoke),B 函数执行完成后触发事件执行 C 函数。

    Copy
    private event Action CompletedEvent;
    
    void A()
    {
        CompletedEvent += C;
        Console.WriteLine("begin");
        ((Action)B).BeginInvoke();
    }
    
    void B()
    {
        Console.WriteLine("running");
        CompletedEvent?.Invoke();
    }
    
    void C()
    {
        Console.WriteLine("end");
    }
    

    那么我们现在想要做一件事,就是把上面的事件驱动改造为利用 async/await 的异步编程模型,改造后的代码就是简单的:

    Copy
    async Task A()
    {
        Console.WriteLine("begin");
        await B();
        Console.WriteLine("end");
    }
    
    Task B()
    {
        Console.WriteLine("running");
        return Task.CompletedTask;
    }
    

    你可以看到,原本 C 函数的内容被放到了 A 调用 B 的下面,为什么呢?其实很简单,因为这里 await B(); 这一行以后的内容,本身就可以理解为 B 函数的回调了,只不过在内部实现上,不是直接从 B 进行调用的回调,而是 A 先让出控制权,B 执行完成后,CLR 切换上下文,将 A 调度回来继续执行剩下的代码。

    如果事件相关的代码已经确定不可改动(即不能改动 B 函数),我们想将其封装为异步调用的模式,那只需要利用 TaskCompletionSource 即可:

    Copy
    private event Action CompletedEvent;
    
    async Task A()
    {
        // 因为 TaskCompletionSource 要求必须有一个泛型参数
        // 因此就随便指定了一个 bool
        // 本例中其实是不需要这样的一个结果的
        // 需要注意的是从 .NET 5 开始
        // TaskCompletionSource 不再强制需要泛型参数
        var tsc = new TaskCompletionSource<bool>();
        // 随便写一个结果作为 Task 的结果
        CompletedEvent += () => tsc.SetResult(false);
    
        Console.WriteLine("begin");
        ((Action)B).BeginInvoke();
        await tsc.Task;
        Console.WriteLine("end");
    }
    
    void B()
    {
        Console.WriteLine("running");
        CompletedEvent?.Invoke();
    }
    

    顺便提一句,这个 TaskCompletionSource<T> 其实和 JavaScript 中的 Promise<T> 更像。SetResult() 方法对应 resolve()SetException() 方法对应 reject()。.NET 比 JavaScript 还多了一个取消状态,因此还可以 SetCancelled() 表示任务被取消了。

    同步方式调用异步代码#

    说句真的,一般能有这个需求,都说明你的代码写的有问题,但是如果你无论如何都想以阻塞的方式去等待一个异步任务完成的话:

    Copy
    Task t = ...
    t.GetAwaiter().GetResult();
    

    祝你好运,这相当于,t 中的异步任务开始执行后,你将当前线程阻塞,然后等到 t 完成之后再唤醒,可以说是:毫无意义,而且很有可能因为代码编写不当而导致死锁的发生。

    void async 是什么?#

    最后有人会问了,函数可以写 async Task Foo(),还可以写 async void Bar(),这有什么区别呢?

    对于上述代码,我们一般调用的时候,分别这么写:

    Copy
    await Foo();
    Bar();
    

    可以发现,诶这个 Bar 函数不需要 await 诶。为什么呢?

    其实这和用以下方式调用 Foo 是一样的:

    Copy
    _ = Foo();
    

    换句话说就是调用后瞬间就直接抛掉不管了,不过这样你也就没法知道这个异步任务的状态和结果了。

    await 必须配合 Task/ValueTask 才能用吗?#

    当然不是。

    在 C# 中只要你的类中包含 GetAwaiter() 方法和 bool IsCompleted 属性,并且 GetAwaiter() 返回的东西包含一个 GetResult() 方法、一个 bool IsCompleted 属性和实现了 INotifyCompletion,那么这个类的对象就是可以 await 的。

    Copy
    public class MyTask<T>
    {
        public MyAwaiter<T> GetAwaiter()
        {
            return new MyAwaiter<T>();
        }
    }
    
    public class MyAwaiter<T> : INotifyCompletion
    {
        public bool IsCompleted { get; private set; }
        public T GetResult()
        {
            throw new NotImplementedException();
        }
        public void OnCompleted(Action continuation)
        {
            throw new NotImplementedException();
        }
    }
    
    public class Program
    {
        static async Task Main(string[] args)
        {
            var obj = new MyTask<int>();
            await obj;
        }
    }
    

    结语#

    本文至此就结束了,感兴趣的小伙伴可以多多学习一下操作系统原理,对 CLR 感兴趣也可以去研究其源代码:https://github.com/dotnet/runtime 。

    .NET 的异步和线程密不可分,但是和多线程编程方式和思想是有本质不同的,也希望大家不要将异步和多线程混淆了,而这有联系也有区别。

    从现象猜测本质是大忌,可能解释的通但是终究只是偶然现象,而且从原理上看也是完全错误的,甚至官方的实现代码稍微变一下可能立马就无法解释的通了。

    总之,通过本文希望大家能对异步和 .NET 中的异步有一个更清晰的理解。

    感谢阅读。

    漫思
  • 相关阅读:
    工作中的体悟和经验
    java中List的toArray方法
    Arthas干货总结
    内部类访问外部类的方法
    PriorityBlockingQueue 源码分析
    ArrayBlockingQueue 源码解析
    Kafka 读书笔记--日志索引
    Mybatis源码解析之--谈谈${}
    Mybatis源码分析之--浅析ResultSetHandler
    linux命令--ll
  • 原文地址:https://www.cnblogs.com/sexintercourse/p/13743805.html
Copyright © 2011-2022 走看看