zoukankan      html  css  js  c++  java
  • .NET异步编程:IO完成端口与BeginRead【转载】

    原文地址:http://tech.it168.com/a2011/0316/1166/000001166827_all.shtml

    【IT168 专稿】写这个系列原本的想法是讨论一下.NET中异步编程风格的变化,特别是F#中的异步工作流以及未来的.NET 5.0中的基于任务的异步编程模型。但经过前几篇文章(为什么需要异步传统的异步编程使用CPS及yield实现异步)的发表后,很多人对IO异步背后实现的原理以及为什么这样能提高性能很感兴趣。其实我本不想花更多的文字在这些底层实现的细节上,一来我并不擅长这些方面,二来我们使用.NET的异步IO就不需要关心这些底层东西,因为已经为你封装完备了。不过为了避免大家一再在这上面商讨,我还是在这个系列中间插入了一篇来解释一下。

      本文我将从内核对象IO完成端口开始介绍,然后来瞧瞧.NET BCL中的FileStream.BeginRead是如何利用IO完成端口来实现的。

      IO完成端口(IO Completion Port)

      大多数人应该或多或少地听说过IO完成端口这么个东西,而且也知道它是实现高性能IO,高伸缩性应用的尚方宝剑。IO完成端口是一个非常复杂的内核对象,其实现的也非常巧妙,细细琢磨还是非常有意思的。

      创建高伸缩性的应用的一个基本原则就是:创建更少的线程。线程数更少首先消耗的资源就少,每个线程的创建除了要浪费CPU时间外,还要创建一系列的数据结构用来保存线程相关的一些信息:用户栈,线程上下文,内核栈等。这个总共加起来大概1.5M左右,那么你算算你的32位机器总共能使用多少内存?那么对应地能创建多少线程?可能有人讲那对于64位的就无所谓了。嗯,在资源占用这方面64位确实不用担心。但是系统中可运行的线程数越多,你的CPU数又是有限的(8个?80个?)。Windows的任务调度机制是每个线程会运行一个时间片,然后Windows抢占式的调度另一个线程运行。那么线程数越多,Windows势必要进行更频繁的线程上下文切换。线程上下文切换对系统性能的影响在这里我就不多说了,你可以搜搜资料。

      那么如何做到创建更少的线程,而又干更多的事儿呢?答案就是“不等待”。相对CPU来说,IO设备的速度简直低的要命。就好像飞机和拖拉机的差别一样,我们可不能让拖拉机拖了飞机的后退儿。而IO完成端口就是为了这个而生的:创建更少的线程,干更多的事儿。

      IO完成端口首先不是一个我们看得见摸得着的什么插口,也和我们常说的80这样的端口不同。你可以将其理解为一个数据结构或一个对象(下面我会用C#的代码来辅助讲解IO完成端口,仅仅是讲解,这些代码并不是真实的实现):

      Windows提供了一个CreateIoCompletionPort API来创建IO完成端口,实际上这个API有两个作用:创建IO完成端口和将一个IO设备与该端口绑定。创建IO完成端口时有一个很重要的参数:指定同时最多能有多少个线程并行运行,这就是为了保证更少的线程,如果你将这个数值指定为0,那么默认值就会是你机器的CPU数。IO端口里还有一个IO设备句柄列表,你可以将很多设备句柄与这个端口绑定(文件、Socket等):

    //函数原型
    HANDLE CreateIoCompletionPort(
        //设备句柄
        HANDLE     hFile, 
        //已有的IO完成端口句柄,如果这里已经指定,则是将前面指定的设备与该端口绑定
        HANDLE     hExistingCompletionPort,
        //因为一个IO完成端口可以绑定很多设备,可以用这个来区分
        ULONG_PTR  CompletionKey, 
        //允许同时运行的线程数
        DWORD      dwNumberOfConcurrentThreads 
    );
    //创建一个IO完成端口
    HANDLE hIoPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,2);
    //创建文件,如果要异步访问文件则需要指定FILE_FLAG_OVERLAPPED
    HANDLE hFile = CreateFile(..);
    //将上面创建的文件句柄与刚才创建的IO完成端口绑定,不仅仅是文件可以
    CreateIoCompletionPort(hFile,hIoPort,1,2);

      除此之外,我们还要为该端口创建一些供使用的线程。然后让这些线程调用Windows提供的GetQueuedCompletionStatus方法。这些线程调用了该方法后会被放到IO完成端口另外一个数据结构中:一个后进先出的队列(我们将其称为等待队列吧)。然后该线程会休眠起来,不占用CPU。然后我们可以调用像ReadFile这样的方法发起一个IO请求:

    BOOL ReadFile(
        HANDLE hFile,
        PVOID  pvBuffer,
        DWORD  nNumBytesToRead,
        PDWORD pdwNumBytes,
        OVERLAPPED* pOverlapped);

    ReadFile(..&overlapped);

      上面代码中的OVERLAPPED是一个非常重要的数据结构,后面会提到。

      现在假设你的某个IO设备收到了一个数据包,Windows就会检查这个IO设备是否跟一个IO完成端口关联了,如果关联了Windows就会把这个数据包投递到这个IO完成端口。IO完成端口里还有另外一个先进先出的队列,用来保存这些IO完成的数据。IO完成端口一看,唔,有个IO完成包投递到我这儿来了,那我看看我的那个等待队列里有没有线程还在休息,如果有就叫它起来干活儿。嘿,还真有一个家伙还在睡觉,如是IO完成端口就唤醒该线程,实际上就是上面的那个GetQueuedCompletionStatus方法返回了。该方法返回时还会得到一些别的信息:接收了多少个字节啊,是哪个设备啊,最重要的是上面提到的OVERLAPPED这个结构等等。起来后的线程就会拿着这些信息干一些后续的事儿:

    BOOL GetQueuedCompletionStatus(
        HANDLE     hCompletionPort,
        PDWORD     pdwNumberOfBytesTransferred,
        PULONG_PTR pCompletionKey,
        OVERLAPPED **ppOverlapped,
        DWORD      dwMilliseconds); 
    //类似于下面的过程
    //创建一个线程
    Thread thread = new Thread(()=>{
        while(true){
            //如果没有IO完成通知到达,该线程就在这里休眠了
            if(GetQueuedCompletionStatus(hIoPort,..ppOverlapped..)){
                //从ppOverlapped里取出所需的信息,比如可能设置了一个回调函数的指针等
            }else{
                //
            }
        }
    });
    thread.Start();

      干完这个事儿后,这个线程又会回到刚才那个队列继续躺起来(其实是再次调用一下那个方法)。我们要注意的是,这个等待队列是后进先出的,也就是说如果下次有消息来了很有可能还是上会那个线程来处理。这样做的目的还是为了提高性能:不需要进行线程上下文切换。因为CPU的速度比IO设备的高出很多,大部分时候我们只需要一两个线程就可以处理很多IO请求。

      现在假设我们的机器有2个CPU,创建IO完成端口时我们指定了同时可以有2个线程运行。我们创建了4个线程放到等待队列里。现在有4个IO完成包投递过来了,放在那个队列里。实际上IO完成端口只会唤醒两个线程去执行,因为你指定了同时只能有两个线程运行,那两个线程运行完就会立马回来继续运行别的。但是现在出了一个状况,其中有一个线程执行过程中因为等待某个资源被阻塞了。那现在只有一个线程执行了,那这个线程就有点吃力了。其实IO完成端口非常聪明,它内部还有一个暂停运行的线程列表和一个正在运行的线程列表。如果某个线程正在运行,它就把这个线程ID放到这个队列里,当这个线程因为某个事儿暂停运行了它就会将其移动到另外一个列表中。IO完成端口会保证正在运行的线程列表里的数目不会超过你指定的最大并发数。一旦这个列表里的数目少于这个数,而IO完成包队列里又有未处理的包,IO完成端口就会看看还有没有在睡觉的线程,如果有就将其唤醒干活儿。

      IO完成端口尽量的控制同时运行的线程数,减少上下文切换浪费的时间和资源,并且让线程尽量的忙起来。

      这里还有一个有意思的地方,假设现在正在运行的两个线程其中一个调用Thread.Sleep休眠了,然后IO完成端口唤醒另外一个线程,让同时运行的线程数保持为2个,不过过了一会儿刚才调用Sleep休眠的线程醒过来了,有意思的事情发生了:现在有三个线程同时运行,超过了我们设置的最大并行数。这个时候IO完成端口是不会杀掉一个线程的,它会让它们继续执行,然后等到执行完了再让这个并行数降下去。

      实际上,IO完成端口不仅仅可以用来处理这种异步的IO,它完全可以作为一种线程间的通讯机制来使用(与IO一点关系都没有),我们可以调用Win API PostQueuedCompletionStatus来模拟一次IO完成,这样我们的IO完成端口就会接到通知,然后调用线程执行。熟悉并发里的Actor模型的同学可能觉得这有点Actor的影子了。

      BeginRead&EndRead

      那么,既然有IO完成端口这么个好东西,如是有很多人想在.NET里也利用利用。其实大可不必,在.NET里异步的IO内部就是使用了IO完成端口。每个CLR初始化后都会创建一个IO完成端口,用来处理IO请求。很多人应该知道ThreadPool里的线程分为两类:worker thread和io completion thread,这里的io completion thread就是上一节说的跟IO完成端口相关联的那些thread。要说它跟其他的thread有什么不同?没什么不同,只是受IO完成端口控制而已。

      为了看看在.NET中是如何利用IO完成端口的,我们将FileStream.BeginRead作为我们的入口点。在FileStream的Init方法里我们会看到这么一段代码:

    if (this._isAsync)
    {
        //...
        try
        {
            flag4 = ThreadPool.BindHandle(this._handle);
        }
        finally
        {
            CodeAccessPermission.RevertAssert();
        }
        //...
    }

      我们感兴趣的就是ThreadPool.BindHandle。还记得上面对IO完成端口的描述么?其实这里做的事儿就是将该文件句柄与每个CLR都初始化了的那个IO完成端口绑定。也就是说如果我们创建一个FileStream时指定了异步,那么IO完成端口就会“监视”这个文件。

      我们再来看看BeginRead这个方法。该方法是用来发起异步IO请求的方法,该方法执行后会立即返回,不阻塞线程。

      首先,看这么段代码:

    if (!this._isAsync)
    {
        return base.BeginRead(array, offset, numBytes, userCallback, stateObject);
    }

      也就是说如果我们创建FileStream时,没有指定为异步,就会调用基类的BeginRead方法,那基类的这个方法又是如何实现的呢?

    [HostProtection(SecurityAction.LinkDemand, ExternalThreading=true)]
    public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
    {
        //...
        ReadDelegate delegate2 = new ReadDelegate(this.Read);
        //...
        return delegate2.BeginInvoke(buffer, offset, count, callback, state);
    }

      其实是创建一个调用同步的Read方法的委托,然后调用一下BeginInvoke方法(在第二篇文章已经说过,这样的调用实际上还是让线程池里的一个线程来调用,我们可以称之为一种伪异步IO)。这里可以得出一个结论:如果你想用BeginRead,那么初始化FileStream的时候就指定异步,否则就不用直接用Read。

      那么如果创建FileStream的时候指定了异步会是什么结果呢(这里的实现在BeginReadCore方法里)?

    [SecuritySafeCritical]
    private unsafe FileStreamAsyncResult BeginReadCore(byte[] bytes, int offset, int numBytes, AsyncCallback userCallback, object stateObject, int numBufferedBytesRead)
    {
        NativeOverlapped* overlappedPtr;
        FileStreamAsyncResult ar = new FileStreamAsyncResult {
            _handle = this._handle,
            _userCallback = userCallback,
            _userStateObject = stateObject,
            _isWrite = false,
            _numBufferedBytes = numBufferedBytesRead
        };
        ManualResetEvent event2 = new ManualResetEvent(false);
        ar._waitHandle = event2;
        Overlapped overlapped = new Overlapped(0, 0, IntPtr.Zero, ar);
        //...
        overlappedPtr = overlapped.Pack(IOCallback, bytes);
        //...
        ar._overlapped = overlappedPtr;

        //...
        ReadFileNative(this._handle, bytes, offset, numBytes, overlappedPtr, out hr)

      上面代码中的NativeOverlapped就是在上一节我们提到的保存有回调等信息的OVERLAPPED结构,在这里也是一样,它保存有我们的userCallback回调。然后通过调用ReadNative发起IO请求,并将这个数据结构传递进去,这里的ReadNative就是对Win32 的ReadFile的封装。发起异步IO请求完毕,BeginRead返回,过了一会儿磁盘驱动程序将数据读回来了,对应的IO完成端口收到通知,IO完成端口把刚才传递进去的NativeOverlapped结构传递给IO线程,IO线程从中取出IOCallback回调,IOCallback回调里有对我们的userCallback回调的调用:

    IOCallback = new IOCompletionCallback(FileStream.AsyncFSCallback);
    private static unsafe void AsyncFSCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
    {
        FileStreamAsyncResult asyncResult = (FileStreamAsyncResult) Overlapped.Unpack(pOverlapped).AsyncResult;
        //...
        AsyncCallback callback = asyncResult._userCallback;
        if (callback != null)
        {
            callback(asyncResult);
        }
    }

      在这个回调里我们会对EndRead进行调用,我们看看EndRead的代码会发现其他一些东西:

    public override unsafe int EndRead(IAsyncResult asyncResult)
    {
        //...
        WaitHandle handle = result._waitHandle;
        if (handle != null)
        {
            try
            {
                handle.WaitOne();
            }
            finally
            {
                handle.Close();
            }
        }
        NativeOverlapped* nativeOverlappedPtr = result._overlapped;
        if (nativeOverlappedPtr != null)
        {
            Overlapped.Free(nativeOverlappedPtr);
        }
        //...
        return (result._numBytes + result._numBufferedBytes);
    }

      首先是销毁我们在BeginRead里初始化的WaitHandle内核对象,然后将NativeOverlapped结构也销毁。所以EndRead除了取回读了多少个字节的作用外,还起了销毁资源的作用。所以有的时候我们想进行这么一个操作:异步的发起请求,但是我们并不关心该请求是否成功。如是我们就假想能不能只调用BeginXXX方法就可以了?从这里看我们不能简单的调用一下BeginXXX就了事了,因为在BeginXXX里分配的一些句柄和内核资源需要在EndXXX里销毁,不然会造成资源泄露。

      总结

      本文先介绍了一下IO完成端口的原理,然后打开FileStream的源代码,看看.NET是如何利用IO完成端口进行异步IO请求的。IO完成端口是一种非常高效的编程方式,所以如果我们想构建界面响应灵敏或高可伸缩性的服务应用,如果你的应用又是IO密集型的,那么你应该仔细的设计你的应用,利用异步IO的优势。

      但是我们切忌拿着锤子就是钉子,发现IO完成端口这把利器就到处使用,甚至都不去思考是否值得。从前面几篇文章以及本文的分析来看,如果我们使用同步的方式那么一切都是在同一个方法内部完成,分配的一些对象的引用也都是在栈上完成,所以本方法退出后这些分配的资源都可以高效的回收。但是如果使用异步IO的话我们无法在一个方法内完成,所以很多东西的生命周期无形的延长了,本方法退出后还是不能销毁;这是其一,其二是要利用IO完成端口就必须进行额外的平台调用(从.NET调入到CLR甚至到Windows内核),这些调用都是非常昂贵的。所以如果我们在构建服务器应用时,如果应用的规模并不是非常大,我们还是应该首选同步的方式,这样编程更容易,消耗的资源也更少。当然,这都是基于你实际的应用经过不断的尝试和调整得出的。

      我希望本文能对某些同学心中怀疑BeginRead到底占不占用线程,IO线程又是个什么东西起到释疑的作用。

  • 相关阅读:
    面试官没想到一个Volatile,我都能跟他扯半小时
    面试官:你经历过数据库迁移么?有哪些注意点和难点?
    【逼你学习】让自制力提升300%的时间管理方法、学习方法分享
    面试官:说一下内存溢出排查过程和工具?我...
    大厂需求研发流程,进去前了解一波?
    面试必问:分布式锁实现之zk(Zookeeper)
    Redis之分布式锁实现
    大学四年自学走来,这些私藏的实用工具/学习网站我贡献出来了
    【面经分享】互联网寒冬,7面阿里,终获Offer,定级P6+
    我找到了Dubbo源码的BUG,同事纷纷说我有点东西
  • 原文地址:https://www.cnblogs.com/h1nson/p/9238332.html
Copyright © 2011-2022 走看看