zoukankan      html  css  js  c++  java
  • 自己动手写个异步IO函数 --(基于 c# Task)

    前言    对于服务端,达到高性能、高扩展离不开异步。对于客户端,函数执行时间是1毫秒还是100毫秒差别不大,没必要为这一点点时间煞费苦心。对于异步,好多人还有误解,如: 异步就是多线程;异步就是如何利用好线程池。异步不是这么简单,否则微软没必要在异步上花费这么多心思。本文就介绍异步最新的实现方式:Task,并自己动手写一个异步IO函数。只有了解了异步函数内部实现方式,才能更好的利用它。

      对于c#,异步处理经过了多个阶段,但是对于现阶段异步就是Task,微软用Task来抽象异步操作。以后的异步函数,处理的都是Task。你会看到处处都是task的身影。为了处理Task,c#引入了两个关键词async,await。这两个关键词也可以说是一个关键词,因为async的存在是为了表明await是关键词。总而言之:两个关键词干了一件事,async关键词并不改变函数的声明。

      有人说await就是语法糖,不值得大书特书,我只能说你错了。软件开发坚持的原则为:代码要省,代码要清晰易懂!如果没有语法糖,代码的维护性大大降低。await这个语法糖做的事很多;如果不用await,处理同样的逻辑,需要多写很多代码,并导致逻辑不清晰。

    Task的分类

       异步分为两类 compute-base 和 IO-base。compute-base就是计算密集型,函数所有的操作都是在内存中,不涉及IO;如果运行这个函数,则单个线程利用率达100%;IO-base就是涉及到IO,IO包括文件读写,socket读写;这类异步操作底层涉及到IOCP(完成端口)。相应的,Task也分为两类。

      对于这两个区别可以举个例子来区分:一台电脑为4个线程。如果同时有4个compute-base线程运行,cpu的利用率为100%。如果同时有4个 IO-base的异步操作,cpu利用率可能远远低于100%。

      对于.net 库,有些函数会有两个版本:一个是同步操作,一个是异步操作(函数名以Async结尾,返回值为Task)。举个例子:

         

        这是WebClient类获取网址内容函数。你会问DownloadStringTaskAsync是compute-base  Task,还是 IO-base Task?我可以肯定的告诉你:只要是.net基本类库提供的异步函数基本都是IO-base Task(微软官方文档是这样要求)。其实这样要求是有道理的:对于compute-base异步,比较容易封装;再者,这样的异步是不能大规模的并发的。如果16个线程cpu,同时并发16个这样的异步操作就是上限了;如果再多,反而会有害!

      有人说,如果基本类库不提供 IO-base Task函数,我也可以封装一下,这个也不难啊!代码如下:

    //把一个同步操作,改造成异步
    public static async Task<byte[]> DownloadDataAsync(string url)
    {
                WebRequest request = WebRequest.Create(url);
    
                return await Task.Run(() =>
                {
                    using (var response = request.GetResponse())
                    using (var responseStream = response.GetResponseStream())
                    using (var result = new MemoryStream())
                    {
                        responseStream.CopyTo(result);
                        return result.ToArray();
                    }
                });
     }

      上面函数如果说是异步操作,也不错。但是,这不是“好”的异步操作!这是异步操作中夹杂着同步IO。会导致线程等待。如果有100个这样的异步操作,就需要100个线程,这些线程大部分并没在干活,而是在等待! 对于“好”的异步IO,如果同时有100个操作,甚至几万个操作,使用的线程都是有限的,一般不超过cpu线程数。这是怎么实现的?这涉及到IOCP,说起来有些复杂,可以参考IOCP相关资料。类库提供异步IO操作,都是涉及到IOCP的。所以得到如下结论: 如果类库不提供IO异步函数,无论怎么改造,不可能改造成“好”的异步函数!

    Task实现的基本原理

      Task变量状态如下

      状态简要分为生成、执行、执行完毕这三个阶段。如果执行完毕前获取执行后的值Task.Result,函数就会阻塞。那我怎么知道什么时候完成,而又不阻塞?有两种办法,轮询和回调通知。Task.IsCompleted属性会指示函数是否执行完毕。轮询不是一个好的办法,采用回调通知是上策!

      回调通知有个缺点:处理逻辑不直观,回调函数与异步调用函数不在一块,还有可能隔着很多行代码或不在同一个文件。如果这样的回调函数太多,对理解代码逻辑造成困难,代码不易维护。微软也考虑到了这个问题,那就用await关键词来解决。await帮你处理了回调函数的弊端,其实await后面的代码与await前面的代码不属于同一个函数!await后面的代码就是回调函数!微软确实给我们解决了这个问题,但是又带来另一个问题。好多人不明白,明明是同一个函数,怎么实现了等待而又不阻塞当前线程!归根到底,还是要理解await背后帮你干了啥,否则就会一直困惑。

      要生成Task变量,只要理解几个关键的处理步骤就行了。TaskCompletionSource类会帮助我们生成Task。如果IO完成,设置Task的状态为完成就行了。后面,就会执行回调函数(await关键词帮我干了,你看不到回调)!

    如何写一个IO-base Task函数?

      大部分情况下不需要自己写这样的函数。但是,人是有好奇心的,如果不明白函数实现的原理,总是感觉不能释怀!再者,明白函数实现原理,就能更好的利用这类函数。下面讲解一下如何利用IOCP来实现异步函数。我没有参考.net的源码,只是根据逻辑推理应该这实现。肯定和.net源码实现有出入,我写这些代码主要为了阐明Task实现原理。

    IOCP处理逻辑

      对于IOCP,这里不展开来讲了,否则就跑题了。以socket读取为例子,简单总结一下:如果你要接收100个字节的数据,你告诉IOCP你要接收100个字节数据,并提供100个字节的buffer,函数立即返回;数据到达后,IOCP通知你,数据到了,数据就存在你提供的buffer里。

       实现异步IO伪代码如下:

     class AyncInside
        {
            //完成端口句柄
            IntPtr iocpHandle = IntPtr.Zero;
    
            Task<byte[]> ReadFromSocket(int count)
            {
                //生成此次操作需要相关数据 
                TaskCompletionSourceRead readInfo = new TaskCompletionSourceRead();
                readInfo.Buffer = new byte[count];
    
                //如果没生成iocp则生成。
                if (iocpHandle == IntPtr.Zero)
                {
                    iocpHandle = CreateIocp();
                }
    
                // 告诉iocp,要读取count字节数据。函数不会阻塞,会立即返回
                //从完成端口收到数据后,会调用ReadScoketCallback
                //我们把readInfo也传给函数。当回调时,该变量会传给回调函数。
                ReadFromIocp(iocpHandle, readInfo.Buffer, readInfo, ReadScoketCallback);
                
                return readInfo.Tcs.Task;
            }
    
    
            void ReadScoketCallback(byte[] buffer, int readCount,object tag)
            {
                //tag就是调用ReadFromIocp时,传的readInfo
                //便于我们知道异步调用时的上下文数据。
                TaskCompletionSourceRead readInfo = tag as TaskCompletionSourceRead;
               
                if(buffer.Length == readCount )
                {
                    //调用完SetResult后,await后面的代码就会被执行!
                    readInfo.Tcs.SetResult(buffer);
                }
                else if (buffer.Length > 0)
                {
                    Array.Resize(ref buffer, readCount);
                    readInfo.Tcs.SetResult(buffer);
                }
                else
                {
                    readInfo.Tcs.TrySetException(new Exception("读取数据异常!socket可能已断开!"));
                }
            }
    
            private void ReadFromIocp(IntPtr iocpHandle, byte[] buffer, object tag,
                Action<byte[] , int,object> readScoketCallback)
            {
                throw new NotImplementedException();
            }
    
            private IntPtr CreateIocp()
            {
                throw new NotImplementedException();
            }
    
        }
    
        //封装异步读取需要的数据
        class TaskCompletionSourceRead
        {
            public TaskCompletionSource<byte[]> Tcs { get; set; }
            public byte[] Buffer { get; set; }
        }

      上述代码与实际可使用代码差距还很大,我在这里主要为了阐明原理。通过上面的代码,我们可以看到,这个异步函数并没生成新的线程;网卡驱动和IOCP配合,帮我们接收了数据。所以这种方式才是真正可扩展的异步IO。

    后记 异步IO和可扩展服务紧密关联。对于.net core平台,你会看到很多函数都是异步的。理解和用好异步IO函数非常重要。本文通过自己对异步IO的理解,试图通过代码阐明异步IO实现原理。希望你看过此文后,能对此有更深的理解!如果此文对你有所裨益,希望您给点个赞!

  • 相关阅读:
    积水路面Wet Road Materials 2.3
    门控时钟问题
    饮料机问题
    Codeforces Round #340 (Div. 2) E. XOR and Favorite Number (莫队)
    Educational Codeforces Round 82 (Rated for Div. 2)部分题解
    Educational Codeforces Round 86 (Rated for Div. 2)部分题解
    Grakn Forces 2020部分题解
    2020 年百度之星·程序设计大赛
    POJ Nearest Common Ancestors (RMQ+树上dfs序求LCA)
    算法竞赛进阶指南 聚会 (LCA)
  • 原文地址:https://www.cnblogs.com/yuanchenhui/p/async-io-example.html
Copyright © 2011-2022 走看看