zoukankan      html  css  js  c++  java
  • node源码详解(七) —— 文件异步io、线程池【互斥锁、条件变量、管道、事件对象】

     知识共享许可协议本作品采用知识共享署名 4.0 国际许可协议进行许可。转载保留声明头部与原文链接https://luzeshu.com/blog/nodesource7 

    本博客同步在https://cnodejs.org/topic/571618c7e84805cd5410ea26 
    本博客同步在http://www.cnblogs.com/papertree/p/5405202.html


      在上篇博客讲到,网络io通过封装io观察者(uv__io_t),添加到loop->watcher_queue队列。在2.2节中讲到文件异步io不同于网络io,文件异步io把请求操作交给线程池处理,所有线程池的异步io操作统一由一个io观察者来管理,等线程池处理完毕再通过该io观察者告知事件循环(epoll_wait)有异步io操作完成,需要在事件循环的线程执行回调函数。

      这篇博客分以下几个部分讲解其中的细节:

    1. 从文件异步io操作到封装请求交给线程池的过程

    2. 线程池的原理、相关的系统支持【互斥锁、条件变量】

    3. 线程池完成io操作后,告知主线程/事件循环的方式 —— 线程池统一的io观察者,及相关的系统支持【管道、事件对象】

    4. 主线程epoll_wait收到线程池的通知后,回调到文件异步io操作的callback的过程

    7.1 文件异步io到线程池

      上一篇博客以server.listen(80)为例来讲解网络io,这一篇以fs.writeFile('xxx', function (err, data) {});为例来讲解文件异步io。

      js代码到libuv的函数,经历了几个层次(6.2.1节-6.2.4节,“原生js lib模块 -> node C++模块 -> libuv模块”),这几个层次文件io和网络io是类似的,就忽略了。

      这里只针对libuv的文件异步io如何封装成请求对象交给线程池。

    7.1.1 libuv的文件io请求对象 —— uv_fs_t

      看一下libuv的异步读文件代码,deps/uv/src/unix/fs.c:

    图7-1-1

      可以看到一次异步文件读操作在libuv层被封装到一个uv_fs_t的结构体,req->cb是来自上层的回调函数(node C++层:src/node_file.cc 的After函数)。

      异步io请求最后调用uv__work_submit,把异步io请求提交给线程池。这里有两个函数:

    uv__fs_work:这个是文件io的处理函数,可以看到当cb为NULL的时候,即非异步模式,uv__fs_work在当前线程(事件循环所在线程)直接被调用。如果cb != NULL,即文件io为异步模式,此时把uv__fs_work和uv__fs_done提交给线程池。

    uv__fs_done:这个是异步文件io结束后的回调函数。在uv__fs_done里面会回调上层C++模块的cb函数(即req->cb)。

      这里需要注意的是,异步模式下,把uv__fs_work、uv__fs_done当成参数调用uv__work_submit向线程池提交异步io请求,此时io操作的主体 —— uv__fs_work函数是在线程池里执行的。但是uv__fs_done必须在事件循环的线程里被回调,因为这个函数最终会回调到用户js代码的回调函数,而js代码里的所有代码必须在同个线程里面

    7.1.2 线程池的请求对象 —— struct uv__work

      来看看uv__work_submit做了什么:

    图7-1-2

      uv__work_submit 把传进来的uv__fs_work、uv__fs_done封装到uv__work结构体里面,这个结构体表示一个线程操作的请求。通过post把请求提交给线程池,post的原理7.2节讲。

      看到post函数里面的QUEUE_INSERT_TAIL,把该uv__work对象加进wq链表里面。wq是一个全局静态变量。也就是说,进程空间里的所有线程共用同一个wq链表。wq队列的使用在最下面的7.4.2节会用到。

      至于通过void* [2]类型的成员变量w->wq去维护一个链表的机制,在6.4节里有介绍。


    7.2 线程池的原理 —— 条件变量与互斥锁

    7.2.1 条件变量与互斥锁基础

    1. 互斥锁 —— pthread_mutex_t mutex

      系统通过pthread_mutex_t结构、及相关的pthread_mutex_lock()、pthread_mutex_unlock()来对共享资源的请求进行加锁、解锁。

    2. 条件变量 —— pthread_cond_t condition

      系统通过pthread_cond_t结构、及相关的pthread_cond_wait()、pthread_cond_signal()函数来实现线程间等待、通知的机制。

    【注意:系统提供的条件变量机制必须结合互斥锁使用,也就是pthread_cond_wait(&condition, &mutex)需要传条件变量与一个互斥体结构,而且pthread_cond_wait之前必须获得互斥锁。其中原因简单来说就是条件变量本身也是需要加锁保护的资源。具体解释可以参考:http://stackoverflow.com/questions/6312342/pthread-cond-wait-and-mutex-requirement

    7.2.2 线程池原理

      来看看threadpool.c 文件的几个相关函数:

    图7-2-1

      这里有四个环节:

    1. 创建工作线程:

      这里的init_once函数调用uv_thread_create创建了nthreads数量的工作线程,nthread默认为4。worker为工作线程的执行函数。

      看到图7-1-2,有一行uv_once(&once, init_once); 【uv_once对应的系统调用是pthread_once】。该行代码保证了init_once 有且仅被执行一次。在第一次调用uv__work_submit()时会执行一次init_once()。

    2. 工作线程进入等待:

      看到worker线程最终会陷入uv_cond_wait【对应的系统调用是pthread_cond_wait】进行等待,且idle_threads自增。

      这里的&cond、&mutex分别是一个全局的静态条件变量、互斥体。

    3. 提交任务到线程池:

      看到post函数通过uv_cond_signal【对应的系统调用是pthread_cond_signal】向相应的条件变量——cond发送信号,处在uv_cond_wait挂起等待的工作线程当中的某个被激活。

      worker线程往下执行,从wq取出w(保存的过程见7.1节),执行w->work()(对应7.1节中的uv_fs_work)。

    4.  通知主线程的事件循环:

      工作线程完成任务后,调用uv_async_send通知主线程某个统一的io观察者。这里的机制7.3节讲。


    7.3 线程池统一的io观察者 —— 管道、事件对象

    7.3.1 管道、事件对象

      管道、事件对象都是系统提供的机制,都可以用于线程间发送数据,所以这里可以用于线程间的通知。

    1. 管道

      管道的相关系统调用是pipe()、pipe2()。参考 http://man7.org/linux/man-pages/man2/pipe.2.html

      管道会创建两个fd,往fd[1]写数据,那么fd[0]就会收到数据。那么只需要把fd[0]添加到epoll_wait()所监听的io观察者队列里面,在工作线程需要通知的时候往fd[1]写数据,即能在主线程的epoll里面监听其他工作线程任务完成的通知。

    2. 事件对象

      事件对象的相关系统调用是eventfd()、eventfd2()。参考 http://man7.org/linux/man-pages/man2/eventfd.2.html

      与管道不同的是eventfd()只会创建一个fd,事件对象的读写都通过这个fd。事件对象内部维护一个counter,往fd写一个8字节的整数,会往counter加,而读的时候会返回counter,如果counter为0,那么读操作会阻塞住(fd为阻塞模式)。而这个fd也是可以交由epoll机制进行监听的,那么也可以达到使用管道一样的目的。

     3. 使用哪个?

      这里libuv创建异步io观察者fd时,优先使用eventfd,如果系统不支持事件对象,就使用管道替代。看一下相关实现:

    图7-3-1

      可以看到使用uv__eventfd2返回-1(errno = ENOSYS)时,uv__async_start里面使用管道替代了事件对象。而判断系统是否支持eventfd,是通过__NR_eventfd2宏去判断。

      这里需要注意的是:使用宏进行判断__NR_eventfd是否defined是在编译期,而uv__async_start的执行是在运行期,也就是说,如果你在不支持事件对象的系统编译之后,在支持事件对象的系统上运行,那么uv__eventfd2始终是返回-1的

    7.3.2 异步io观察者

    7.3.2.1 数据结构 —— struct uv__async

      在6.1.3节讲了持有io观察者的结构体 uv_tcp_s,6.2.4节讲了网络io操作如何封装成uv_tcp_t结构体、并构造对应的io_watcher,6.3.1和6.4节讲了如何把io_watcher加进uv_loop_t default_loop_struct的watcher_queue队列里。

      那么类似于网络io操作的io观察者(uv__io_t io_watcher)由uv_tcp_s结构体来持有,这里要讨论的异步io观察者也是由一个数据结构(struct uv__async)持有的io观察者。通过把持有的io观察者(io_watcher)加进loop->watcher_queue队列,来加进到epoll的观察者队列中。

      看到6.1.1节中关于struct uv_loop_s default_loop_struct的截图,发现uv_loop_s里面有个成员 struct uv__async async_watcher。这个就是管理统一异步io观察者的数据结构,一个事件循环结构体(uv_loop_t)有且只有一个。类似于uv_tcp_s。

      看一些uv__async的定义,也持有一个uv__io_t io_watcher,还有封装了一个cb:

    图7-3-2

    7.3.2.2 异步io观察者的保存与回调

      我们知道一个uv_tcp_t的io观察者,是在用户调用了网络io之后,才加进到loop->watcher_queue里面的。那么这个异步io观察者是在node启动时,通过一连串调用node::Start() -> uv_default_loop() -> uv_loop_init() -> uv_async_init() -> uv__async_start(),最终调用uv__io_start(),把loop->async_watcher所持有的io_watcher加进loop->watcher_queue的。uv__async_start()也是创建事件对象/管道的地方,在上图的7-3-1可以看到。

      来看一下loop->async_watcher和loop->async_watcher.io_watcher封装的回调函数。

    图7-3-3

      可以看到loop->async_watcher.io_watcher->cb 是uv__async_io;

      loop->async_watcher.cb 是uv__async_event。

      7.2.2节讲到worker线程完成w->work()之后,通过uv_async_send通知异步io观察者,uv_async_send的操作就是往事件对象/管道写东西,那么当io观察者收到数据,uv_run()里面的epoll_wait()返回该io_watcher的fd时,uv__async_io会先被回调,在uv__async_io里面会进而调用uv__async_event。看下代码:

    图7-3-4

      uv__aysnc_io里面取出的wa就是loop->async_watcher,所以wa->cb就是uv__async_event。


    7.4 线程池异步io之后的回调工作

      讲到uv__async_event这一步,我们回想一下此时应该执行什么处理:worker线程执行完了w->work()(其中w是提交线程池的请求结构体 uv__work),然后通知事件循环需要在主线程执行w->done(),而通知的这个过程就是通过 uv_async_send()往管道/事件对象写数据,激活epoll_wait(),根据返回的fd,由loop->watchers映射表拿到异步io观察者 —— loop->async_watcher.io_watcher,然后层层回调到uv__async_event,那么这个时候,我们是否要调用线程池完成了w->work()之后剩余的w->done()?

    7.4.1 uv__async_event() 到 uv__work_done()

      node里面多次使用void*[2]类型来维护一个链表,loop->async_handles也是。可以看到图6-1-1。那么async_handles保存什么链表呢?

      看到图7-3-4,uv__async_event()就是从loop->async_handles链表里,取出struct uv_async_t结构类型的元素h,并调用回调函数h->async_cb()。

      再看到图7-3-3,uv_async_init()里面,往loop->async_handles里面添加了struct uv_async_t* t。7.3.2.2节讲到的一系列调用流程有:uv_loop_init() -> uv_async_init(),看下uv_loop_init()调用uv_async_init()的代码:

    图7-4-1

      可以看到uv_loop_init()传给uv_async_init()的uv_async_t 是loop->wq_async,而async_cb是uv__work_done。

      所以最终异步io观察者被激活之后,主线程回调到了uv__work_done()。uv__work_done在线程池模块(deps/uv/src/threadpool.c)里面。

     7.4.2 uv__work_done()

      看一下uv__work_done()的代码:

    图7-4-2

      在7.1.2节就讲了post()提交请求时,往全局队列wq添加一个uv__work数据结构,那么最终uv__work_done()被调用的时候,从该wq取出所有w,执行w->done(),完成最终的回调。这里的w->done()就是7.1节中提到的fs__work_done()。

      注意了,这里的uv__work_done()是在主线程执行的,也就是你的js代码由始至终在同一个线程里面执行

  • 相关阅读:
    2.Android之按钮Button和编辑框EditText学习
    《DSP using MATLAB》Problem 3.8
    《DSP using MATLAB》Problem 3.7
    《DSP using MATLAB》Problem 3.6
    《DSP using MATLAB》Problem 3.5
    《DSP using MATLAB》Problem 3.4
    《DSP using MATLAB》Problem 3.3
    《DSP using MATLAB》Problem 3.2
    《DSP using MATLAB》Problem 3.1
    《DSP using MATLAB》Problem 2.20
  • 原文地址:https://www.cnblogs.com/papertree/p/5405202.html
Copyright © 2011-2022 走看看