zoukankan      html  css  js  c++  java
  • libuv的源码分析(1)

    libuv我在今年四月份的时候开始接触,一开始也遇到了很多坑,但后来理解并遵守了它的设计思想,一切就变得很方便。这几天开始着手精读它的源码,本着记录自己的学习痕迹,也希望能增加别人搜索相关问题结果数的目的,因此就有了这些东西,这个系列至少会有四篇,后续再说吧。

    那么它是什么,一个高效轻量的跨平台异步io库,在linux下,它整合了libevent,在windows下,它用iocp重写了一套。它有那些功能,如下面这幅官网上的图所示:

    它的整体结构基于事件循环,简单的说就是外部的接口其实是对内层的一个个请求,并没有做真正的事,这些请求都存储在内部一个请求队列中,在事件循环中,再从请求队列中取出他们,然后做具体的事情,做完了利用回调函数通知调用者,这样一来,所有的外部接口都可以变成异步的。

    它的世界有三类元素:

    1. uv_loop_t,表示事件循环,为其他两类元素提供环境容器和统筹调度。
    2. uv_handle_t族,持续性请求,生命周期较长,且能多次触发事件。
    3. uv_req_t族,一次性请求.

    它的实现基于一个假设,运行于单线程.

    下面主要来分析它的主体和常用部分,循环,timer,tcp,udp,fs,特殊的uv_handle_t.这次分析是基于其windows部分的,因为我在windows上用它比较多,不过其不同平台上的思路是一致的,只是具体实现上采用的系统api不同罢了.库版本基于1.7.0版本.


     

    uv_loop_t

    先来看定义,在include/uv.h的1459行:

    struct uv_loop_s {
      /* User data - use this for whatever. */
      void* data;
      /* Loop reference counting. */
      unsigned int active_handles;
      void* handle_queue[2];
      void* active_reqs[2];
      /* Internal flag to signal loop stop. */
      unsigned int stop_flag;
      UV_LOOP_PRIVATE_FIELDS
    };
    
    #define UV_LOOP_PRIVATE_FIELDS                                                \
        /* The loop's I/O completion port */                                      \
      HANDLE iocp;                                                                \
      /* The current time according to the event loop. in msecs. */               \
      uint64_t time;                                                              \
      /* Tail of a single-linked circular queue of pending reqs. If the queue */  \
      /* is empty, tail_ is NULL. If there is only one item, */                   \
      /* tail_->next_req == tail_ */                                              \
      uv_req_t* pending_reqs_tail;                                                \
      /* Head of a single-linked list of closed handles */                        \
      uv_handle_t* endgame_handles;                                               \
      /* The head of the timers tree */                                           \
      struct uv_timer_tree_s timers;                                              \
        /* Lists of active loop (prepare / check / idle) watchers */              \
      uv_prepare_t* prepare_handles;                                              \
      uv_check_t* check_handles;                                                  \
      uv_idle_t* idle_handles;                                                    \
      /* This pointer will refer to the prepare/check/idle handle whose */        \
      /* callback is scheduled to be called next. This is needed to allow */      \
      /* safe removal from one of the lists above while that list being */        \
      /* iterated over. */                                                        \
      uv_prepare_t* next_prepare_handle;                                          \
      uv_check_t* next_check_handle;                                              \
      uv_idle_t* next_idle_handle;                                                \
      /* This handle holds the peer sockets for the fast variant of uv_poll_t */  \
      SOCKET poll_peer_sockets[UV_MSAFD_PROVIDER_COUNT];                          \
      /* Counter to keep track of active tcp streams */                           \
      unsigned int active_tcp_streams;                                            \
      /* Counter to keep track of active udp streams */                           \
      unsigned int active_udp_streams;                                            \
      /* Counter to started timer */                                              \
      uint64_t timer_counter;                                                     \
      /* Threadpool */                                                            \
      void* wq[2];                                                                \
      uv_mutex_t wq_mutex;                                                        \
      uv_async_t wq_async;
    active_handles是uv_loop_t的引用计数,每投递一个uv_handle_t或uv_req_t都会使其递增,结束一个请求都会使其递减。
    handle_queue是uv_handle_t族的队列。
    active_queue是uv_req_t族的队列。
    data是用户数据域。
    uv_prepare_t,uv_check_t,uv_idle_t,uv_async_t是个特殊的uv_handle_t,随后会说明。
    pending_reqs_tail是用来装已经处理过的请求的队列。
    endgame_handles是用来装执行了关闭的uv_handle_t族链表。
    timers是计时器结构,用最小堆实现。
    iocp是完成端口的句柄。

    这个结构外部除了data外,其他都不应该使用。与其直接有关的接口有两个, uv_loop_init和uv_run。
    先来看uv_loop_init,在/src/core.c的126行:
    int uv_loop_init(uv_loop_t* loop) {
      int err;
    
      /* Initialize libuv itself first */
      uv__once_init();
    
      /* Create an I/O completion port */
      loop->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
      if (loop->iocp == NULL)
        return uv_translate_sys_error(GetLastError());
    
      /* To prevent uninitialized memory access, loop->time must be initialized
       * to zero before calling uv_update_time for the first time.
       */
      loop->time = 0;
      uv_update_time(loop);
    
      QUEUE_INIT(&loop->wq);
      QUEUE_INIT(&loop->handle_queue);
      QUEUE_INIT(&loop->active_reqs);
      loop->active_handles = 0;
    
      loop->pending_reqs_tail = NULL;
    
      loop->endgame_handles = NULL;
    
      RB_INIT(&loop->timers);
    
      loop->check_handles = NULL;
      loop->prepare_handles = NULL;
      loop->idle_handles = NULL;
    
      loop->next_prepare_handle = NULL;
      loop->next_check_handle = NULL;
      loop->next_idle_handle = NULL;
    
      memset(&loop->poll_peer_sockets, 0, sizeof loop->poll_peer_sockets);
    
      loop->active_tcp_streams = 0;
      loop->active_udp_streams = 0;
    
      loop->timer_counter = 0;
      loop->stop_flag = 0;
    
      err = uv_mutex_init(&loop->wq_mutex);
      if (err)
        goto fail_mutex_init;
    
      err = uv_async_init(loop, &loop->wq_async, uv__work_done);
      if (err)
        goto fail_async_init;
    
      uv__handle_unref(&loop->wq_async);
      loop->wq_async.flags |= UV__HANDLE_INTERNAL;
    
      return 0;
    
    fail_async_init:
      uv_mutex_destroy(&loop->wq_mutex);
    
    fail_mutex_init:
      CloseHandle(loop->iocp);
      loop->iocp = INVALID_HANDLE_VALUE;
    
      return err;
    }

    这里初始化了自己各个字段,并用uv__once_init模拟了pthread_once_t初始化了库的各个子系统。

    再来看看uv_run,这是核心函数,库的入口与发动机,在src/core.c的372行:

     1 int uv_run(uv_loop_t *loop, uv_run_mode mode) {
     2   DWORD timeout;
     3   int r;
     4   int ran_pending;
     5   void (*poll)(uv_loop_t* loop, DWORD timeout);
     6 
     7   if (pGetQueuedCompletionStatusEx)
     8     poll = &uv_poll_ex;
     9   else
    10     poll = &uv_poll;
    11 
    12   r = uv__loop_alive(loop);
    13   if (!r)
    14     uv_update_time(loop);
    15 
    16   while (r != 0 && loop->stop_flag == 0) {
    17     uv_update_time(loop);
    18     uv_process_timers(loop);
    19 
    20     ran_pending = uv_process_reqs(loop);
    21     uv_idle_invoke(loop);
    22     uv_prepare_invoke(loop);
    23 
    24     timeout = 0;
    25     if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
    26       timeout = uv_backend_timeout(loop);
    27 
    28     (*poll)(loop, timeout);
    29 
    30     uv_check_invoke(loop);
    31     uv_process_endgames(loop);
    32 
    33     if (mode == UV_RUN_ONCE) {
    34       /* UV_RUN_ONCE implies forward progress: at least one callback must have
    35        * been invoked when it returns. uv__io_poll() can return without doing
    36        * I/O (meaning: no callbacks) when its timeout expires - which means we
    37        * have pending timers that satisfy the forward progress constraint.
    38        *
    39        * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
    40        * the check.
    41        */
    42       uv_process_timers(loop);
    43     }
    44 
    45     r = uv__loop_alive(loop);
    46     if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
    47       break;
    48   }
    49 
    50   /* The if statement lets the compiler compile it to a conditional store.
    51    * Avoids dirtying a cache line.
    52    */
    53   if (loop->stop_flag != 0)
    54     loop->stop_flag = 0;
    55 
    56   return r;
    57 }

    循环的流程用官方一张图来表示:

     这些具体的处理函数也基本是各个子系统的入口函数,在后面分析各个子系统时会详细说明,这里只分析它的三种运行模式,也就是mode参数指定的:

    1. UV_ONCE,阻塞模式运行,直到处理了一次请求。关键在于阻塞,这个阻塞点发生在26-28行,由uv_backend_timeout选定一个超时时间,让poll函数进行超时阻塞,那么来看看uv_backend_timeout的选定规则,在src/core.c的234行:
       1 int uv_backend_timeout(const uv_loop_t* loop) {
       2   if (loop->stop_flag != 0)
       3     return 0;
       4 
       5   if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
       6     return 0;
       7 
       8   if (loop->pending_reqs_tail)
       9     return 0;
      10 
      11   if (loop->endgame_handles)
      12     return 0;
      13 
      14   if (loop->idle_handles)
      15     return 0;
      16 
      17   return uv__next_timeout(loop);

      是这样的,A有任何可处理的请求,返回0.B取最小计时器事件的时间与当前时间的差值。C没有计时器事件返回INFINITE,直到有一个完成端口事件。

    2. UV_DEFAULT,阻塞运行到直到手动停止或没有被任何请求引用。
    3. UV_NOWAIT,运行一次循环流程,跳过了uv_backend_timeout这一步。

    最后来说一下uv_loopt_t是如何退出的,A.stop_flag为1.B.没有任何请求(也就是uv_loop_alive的判断)。用uv_stop可以设置stop_flag为1。

    未完待续,下一篇讲四大特殊uv_handle_t。

  • 相关阅读:
    1.27
    1.25
    Representation Learning with Contrastive Predictive Coding
    Learning a Similarity Metric Discriminatively, with Application to Face Verification
    噪声对比估计(负样本采样)
    Certified Adversarial Robustness via Randomized Smoothing
    Certified Robustness to Adversarial Examples with Differential Privacy
    Dynamic Routing Between Capsules
    Defending Adversarial Attacks by Correcting logits
    Visualizing Data using t-SNE
  • 原文地址:https://www.cnblogs.com/watercoldyi/p/5675180.html
Copyright © 2011-2022 走看看