zoukankan      html  css  js  c++  java
  • libuv源码分析(2)

    这一篇来分析libuv的四个特殊的持续请求(uv_handle_t族),uv_idle_t,uv_check_t,uv_prepare_t,uv_async_t。它们直接以强类型保存在uv_loop_t中,而不像其它的持续请求保存在handle_queue中,也是最简单的持续请求。

    在实现中,libuv利用C的内存布局模拟了面向对象的继承关系,uv_handle_t是基类,其所有子类都必须以其为结构体中的第一个字段。先来看看基类的定义吧,在include/uv.h的402行:

     1 #define UV_HANDLE_FIELDS                                                      \
     2   /* public */                                                                \
     3   void* data;                                                                 \
     4   /* read-only */                                                             \
     5   uv_loop_t* loop;                                                            \
     6   uv_handle_type type;                                                        \
     7   /* private */                                                               \
     8   uv_close_cb close_cb;                                                       \
     9   void* handle_queue[2];                                                      \
    10   union {                                                                     \
    11     int fd;                                                                   \
    12     void* reserved[4];                                                        \
    13   } u;                                                                        \
    14   UV_HANDLE_PRIVATE_FIELDS                                                    
    1 #define UV_HANDLE_PRIVATE_FIELDS                                              \
    2   uv_handle_t* endgame_next;                                                  \
    3   unsigned int flags;

    data:用户数据

    loop:所属的事件循环

    type:类型标识,通过内存布局和这个标识,简明的实现了继承,并不比C++语言级别的继承差,甚至更为灵活。

    close_cb:关闭时的回调,对于某些子类有效。

    handle_queue:是为了存储在uv_loop_t.handle_queue所需的链表节点。

    flags:一些状态标记,后续会讲到。

    endgame_next:持续请求关闭链表所需的节点信息,持续请求在关闭时并不是立即执行关闭收尾工作的,而是将其加入到uv_loop_t.endgame_handle链表里的.

    好了,现在可以直奔主题了。


    uv_idle_t,uv_check_t,uv_prepare_t

    这三个持续请求都是一样的作用,在每次事件循环时触发一次事件,只是优先级不同,有何不同,看uv_run函数里的while那一块就知道了,在src/core.c的387-418行:

     1 while (r != 0 && loop->stop_flag == 0) {
     2     uv_update_time(loop);
     3     uv_process_timers(loop);
     4 
     5     ran_pending = uv_process_reqs(loop);
     6     uv_idle_invoke(loop);
     7     uv_prepare_invoke(loop);
     8 
     9     timeout = 0;
    10     if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
    11       timeout = uv_backend_timeout(loop);
    12 
    13     (*poll)(loop, timeout);
    14     uv_check_invoke(loop);
    15     uv_process_endgames(loop);
    16 
    17     if (mode == UV_RUN_ONCE) {
    18       /* UV_RUN_ONCE implies forward progress: at least one callback must have
    19        * been invoked when it returns. uv__io_poll() can return without doing
    20        * I/O (meaning: no callbacks) when its timeout expires - which means we
    21        * have pending timers that satisfy the forward progress constraint.
    22        *
    23        * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
    24        * the check.
    25        */
    26       uv_process_timers(loop);
    27     }
    28 
    29     r = uv__loop_alive(loop);
    30     if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
    31       break;
    32   }

    uv_idle_invoke()是处理idle的,uv_check_invoke是处理check的,uv_prepare_invoke是处理prepare的,因此可以得出:

    • idle在poll前触发事件(poll是io处理,以后会说)
    • prepare在idle后触发事件
    • check在poll后触发事件

    这三类持续请求的实现在src/loop-watcher.c中,用了宏模拟了模板,其实现都是一样的,这里就只谈uv_idle_t了。

    先看uv_idle_t的定义,在include/uv.h的750行:

    struct uv_idle_s {
      UV_HANDLE_FIELDS
      UV_IDLE_PRIVATE_FIELDS
    };

    HV_IDLE_PRIVATE_FIELDS在uv.h的551行:

    1 #define UV_IDLE_PRIVATE_FIELDS                                                \
    2   uv_idle_t* idle_prev;                                                       \
    3   uv_idle_t* idle_next;                                                       \
    4   uv_idle_cb idle_cb

    先是基类的字段,然后是自己的字段,东西很少,只有一个回调,两个链表指针域,到这里可以知道,其是用链表存储的,头结点在uv_loop_t.idle_handle。check和prepare也是一样的结构。

    先来回忆一下uv_idle_t怎么用,然后从用的流程探究其实现细节,是这样用的:

      uv_idle_t idle;

      uv_idle_init(uv_default_loop(),&idle);

      uv_idle_start(&idle);  //关闭可以用uv_idle_stop()

      uv_run(uv_default_loop(),UV_RUN_DEFAULT);

    libuv有个特色,所有的对象是init的,而不是create的,也就是内存都有外部申请,然后传给库做初始化,这样库本身就不管理内存了,从而使其更加通用。

    init,start,stop三个函数定义在loop-watcher.c中,用宏定义的,来看看C的模板吧:

     1 #define UV_LOOP_WATCHER_DEFINE(name, NAME)                                    \
     2   int uv_##name##_init(uv_loop_t* loop, uv_##name##_t* handle) {              \
     3     uv__handle_init(loop, (uv_handle_t*) handle, UV_##NAME);                  \
     4                                                                               \
     5     return 0;                                                                 \
     6   }                                                                           \
     7                                                                               \
     8                                                                               \
     9   int uv_##name##_start(uv_##name##_t* handle, uv_##name##_cb cb) {           \
    10     uv_loop_t* loop = handle->loop;                                           \
    11     uv_##name##_t* old_head;                                                  \
    12                                                                               \
    13     assert(handle->type == UV_##NAME);                                        \
    14                                                                               \
    15     if (uv__is_active(handle))                                                \
    16       return 0;                                                               \
    17                                                                               \
    18     if (cb == NULL)                                                           \
    19       return UV_EINVAL;                                                       \
    20                                                                               \
    21     old_head = loop->name##_handles;                                          \
    22                                                                               \
    23     handle->name##_next = old_head;                                           \
    24     handle->name##_prev = NULL;                                               \
    25                                                                               \
    26     if (old_head) {                                                           \
    27       old_head->name##_prev = handle;                                         \
    28     }                                                                         \
    29                                                                               \
    30     loop->name##_handles = handle;                                            \
    31                                                                               \
    32     handle->name##_cb = cb;                                                   \
    33     uv__handle_start(handle);                                                 \
    34                                                                               \
    35     return 0;                                                                 \
    36   }                                                                           \
    37                                                                               \
    38                                                                               \
    39   int uv_##name##_stop(uv_##name##_t* handle) {                               \
    40     uv_loop_t* loop = handle->loop;                                           \
    41                                                                               \
    42     assert(handle->type == UV_##NAME);                                        \
    43                                                                               \
    44     if (!uv__is_active(handle))                                               \
    45       return 0;                                                               \
    46                                                                               \
    47     /* Update loop head if needed */                                          \
    48     if (loop->name##_handles == handle) {                                     \
    49       loop->name##_handles = handle->name##_next;                             \
    50     }                                                                         \
    51                                                                               \
    52     /* Update the iterator-next pointer of needed */                          \
    53     if (loop->next_##name##_handle == handle) {                               \
    54       loop->next_##name##_handle = handle->name##_next;                       \
    55     }                                                                         \
    56                                                                               \
    57     if (handle->name##_prev) {                                                \
    58       handle->name##_prev->name##_next = handle->name##_next;                 \
    59     }                                                                         \
    60     if (handle->name##_next) {                                                \
    61       handle->name##_next->name##_prev = handle->name##_prev;                 \
    62     }                                                                         \
    63                                                                               \
    64     uv__handle_stop(handle);                                                  \
    65                                                                               \
    66     return 0;                                                                 \
    67   }                                                                           \
    68                                                                               \
    69                                                                               \
    70   void uv_##name##_invoke(uv_loop_t* loop) {                                  \
    71     uv_##name##_t* handle;                                                    \
    72                                                                               \
    73     (loop)->next_##name##_handle = (loop)->name##_handles;                    \
    74                                                                               \
    75     while ((loop)->next_##name##_handle != NULL) {                            \
    76       handle = (loop)->next_##name##_handle;                                  \
    77       (loop)->next_##name##_handle = handle->name##_next;                     \
    78                                                                               \
    79       handle->name##_cb(handle);                                              \
    80     }                                                                         \
    81   }
    82 
    83 UV_LOOP_WATCHER_DEFINE(prepare, PREPARE)
    84 UV_LOOP_WATCHER_DEFINE(check, CHECK)
    85 UV_LOOP_WATCHER_DEFINE(idle, IDLE)

    这种模板虽不自动化,但是不是更明显的表达了意图,若把uv_##name##_t换做T,就更像了。

    先来看uv_idle_init,这个函数很简单,仅调用了uv_handle_init,可以看做调用父类的构造函数,与C++的子类构造时自动调用父类的构造函数是一致的,很简单的一个理念,谁的成员谁来管。调这个函数,传入loop,自己的指针与一个UV_IDLE的东西,看看这个函数吧,它是宏,在src/uv-common.h的208行:

    #define uv__handle_init(loop_, h, type_)                                      \
      do {                                                                        \
        (h)->loop = (loop_);                                                      \
        (h)->type = (type_);                                                      \
        (h)->flags = UV__HANDLE_REF;  /* Ref the loop when active. */             \
        QUEUE_INSERT_TAIL(&(loop_)->handle_queue, &(h)->handle_queue);            \
        uv__handle_platform_init(h);                                              \
      }                                                                           \
      while (0)

    下面用base来表示uv_handle_t。UV_IDLE就是子类的标识,设置在base.type中,用来做父 is 子的判断,然后将对象加入uv_loop_t.handle_queue队列,UV_HANDLE_REF表示是否初始化过了。到这里,内存布局+type标识的手法模拟继承,已经十分完善灵活了。好了,uv_idle_init仅仅初始化了父类。这里其实是不严谨安全的,因为其他字段都没管,都是野值,应该要memset一下。

    再来看uv_idle_start(),它做了两件事:

    1. 用头插法将自己加入uv_loopt_t.idle_handle
    2. 调用uv__handle_start()设置了一些标志,在base.flags里又加上了UV_HANDLE_ACTIVE,同时增加了uv_loop_t.active_handle计数.

    ps:

         13行就用了base.type做了is的判断.

    到这里,我们见了base.flags的两个标识:UV_HANDLE_REF和UV_HANDLE_ACTIVE.前者大概表示是否初始化了基类,后者大概表示是否初始化了子类.

    那uv_idle_t的事件是如何在事件循环里触发的呢?上面不是说了uv_run里调了uv_idle_invok函数么,就在这里触发的,这个函数也定义在上面的模板里,它的功能很简单,遍历uv_loop_t.idle_handle链表,调用每个节点的回调(在uv_idle_start()里设置的)。

    最后再来看看uv_idle_stop(),它也很简单,将自己从链表里删掉,在调用uv__handle_stop重置了base.flags,递减了uv_loop_t.active_handle计数。在这里并没有删除uv_loop_t.handle_queue里的保存,当初看到这里的时候我还以为是泄露bug,还特意向libuv的邮件列表发了bug,然后官方回复我,这个保存是在uv_close里删除的。

    那么就来看看uv_close吧,这是持续请求的退出流程,在src/handle.c的67行:

    void uv_close(uv_handle_t* handle, uv_close_cb cb) {
      uv_loop_t* loop = handle->loop;
    
      if (handle->flags & UV__HANDLE_CLOSING) {
        assert(0);
        return;
      }
    
      handle->close_cb = cb;
    
      /* Handle-specific close actions */
      switch (handle->type) {
        case UV_TCP:
          uv_tcp_close(loop, (uv_tcp_t*)handle);
          return;
    
        case UV_NAMED_PIPE:
          uv_pipe_close(loop, (uv_pipe_t*) handle);
          return;
    
        case UV_TTY:
          uv_tty_close((uv_tty_t*) handle);
          return;
    
        case UV_UDP:
          uv_udp_close(loop, (uv_udp_t*) handle);
          return;
    
        case UV_POLL:
          uv_poll_close(loop, (uv_poll_t*) handle);
          return;
    
        case UV_TIMER:
          uv_timer_stop((uv_timer_t*)handle);
          uv__handle_closing(handle);
          uv_want_endgame(loop, handle);
          return;
    
        case UV_PREPARE:
          uv_prepare_stop((uv_prepare_t*)handle);
          uv__handle_closing(handle);
          uv_want_endgame(loop, handle);
          return;
    
        case UV_CHECK:
          uv_check_stop((uv_check_t*)handle);
          uv__handle_closing(handle);
          uv_want_endgame(loop, handle);
          return;
    
        case UV_IDLE:
          uv_idle_stop((uv_idle_t*)handle);
          uv__handle_closing(handle);
          uv_want_endgame(loop, handle);
          return;
    
        case UV_ASYNC:
          uv_async_close(loop, (uv_async_t*) handle);
          return;
    
        case UV_SIGNAL:
          uv_signal_close(loop, (uv_signal_t*) handle);
          return;
    
        case UV_PROCESS:
          uv_process_close(loop, (uv_process_t*) handle);
          return;
    
        case UV_FS_EVENT:
          uv_fs_event_close(loop, (uv_fs_event_t*) handle);
          return;
    
        case UV_FS_POLL:
          uv__fs_poll_close((uv_fs_poll_t*) handle);
          uv__handle_closing(handle);
          uv_want_endgame(loop, handle);
          return;
    
        default:
          /* Not supported */
          abort();
      }
    }

    利用base.type做判断,调用了不同的close函数,我们只需要看case UV_IDLE块即可,这块里调用了uv_idle_stop,因此也可以直接调用uv_close而不调stop,stop里利用了base.flags做了重复保护的。来看uv__handle__closing干了什么,在src/handle-inl.c的63行:

     1 #define uv__handle_closing(handle)                                      \
     2   do {                                                                  \
     3     assert(!((handle)->flags & UV__HANDLE_CLOSING));                    \
     4                                                                         \
     5     if (!(((handle)->flags & UV__HANDLE_ACTIVE) &&                      \
     6           ((handle)->flags & UV__HANDLE_REF)))                          \
     7       uv__active_handle_add((uv_handle_t*) (handle));                   \
     8                                                                         \
     9     (handle)->flags |= UV__HANDLE_CLOSING;                              \
    10     (handle)->flags &= ~UV__HANDLE_ACTIVE;                              \
    11   } while (0)

    主要是设置base.flags为UV_HANDLE_CLOSING,第7行又把uv_loop_t.active_handle递增了,uv__handle_stop里做了递减,是一个冗余吧。

    看最后的uv_want_endgame(),在src/handle-inl.c的88行:

    1 INLINE static void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle) {
    2   if (!(handle->flags & UV_HANDLE_ENDGAME_QUEUED)) {
    3     handle->flags |= UV_HANDLE_ENDGAME_QUEUED;
    4 
    5     handle->endgame_next = loop->endgame_handles;
    6     loop->endgame_handles = handle;
    7   }

    很简单,在base.flags里设置了UV_HANDLE_ENDGAME_QUEUED,然后将自己加入了uv_loop_t.endgame_handle链表。

    在来看持续请求生命周期的最后一步,在uv_run()中的uv_process_endgame()里,在handle-inl.c的98行:

     1 INLINE static void uv_process_endgames(uv_loop_t* loop) {
     2   uv_handle_t* handle;
     3 
     4   while (loop->endgame_handles) {
     5     handle = loop->endgame_handles;
     6     loop->endgame_handles = handle->endgame_next;
     7 
     8     handle->flags &= ~UV_HANDLE_ENDGAME_QUEUED;
     9 
    10     switch (handle->type) {
    11       case UV_TCP:
    12         uv_tcp_endgame(loop, (uv_tcp_t*) handle);
    13         break;
    14 
    15       case UV_NAMED_PIPE:
    16         uv_pipe_endgame(loop, (uv_pipe_t*) handle);
    17         break;
    18 
    19       case UV_TTY:
    20         uv_tty_endgame(loop, (uv_tty_t*) handle);
    21         break;
    22 
    23       case UV_UDP:
    24         uv_udp_endgame(loop, (uv_udp_t*) handle);
    25         break;
    26 
    27       case UV_POLL:
    28         uv_poll_endgame(loop, (uv_poll_t*) handle);
    29         break;
    30 
    31       case UV_TIMER:
    32         uv_timer_endgame(loop, (uv_timer_t*) handle);
    33         break;
    34 
    35       case UV_PREPARE:
    36       case UV_CHECK:
    37       case UV_IDLE:
    38         uv_loop_watcher_endgame(loop, handle);
    39         break;
    40 
    41       case UV_ASYNC:
    42         uv_async_endgame(loop, (uv_async_t*) handle);
    43         break;
    44 
    45       case UV_SIGNAL:
    46         uv_signal_endgame(loop, (uv_signal_t*) handle);
    47         break;
    48 
    49       case UV_PROCESS:
    50         uv_process_endgame(loop, (uv_process_t*) handle);
    51         break;
    52 
    53       case UV_FS_EVENT:
    54         uv_fs_event_endgame(loop, (uv_fs_event_t*) handle);
    55         break;
    56 
    57       case UV_FS_POLL:
    58         uv__fs_poll_endgame(loop, (uv_fs_poll_t*) handle);
    59         break;
    60 
    61       default:
    62         assert(0);
    63         break;
    64     }
    65   }
    66 }

    与uv_close一样,做了is判断后,调用子模块的相关函数,只需看case UV_IDLE块,里面调用了uv_loop_watcher_endgame(),这个函数调用了uv__handle_close(),这个函数在handle-ini.c的76行:

     1 #define uv__handle_close(handle)                                        \
     2   do {                                                                  \
     3     QUEUE_REMOVE(&(handle)->handle_queue);                              \
     4     uv__active_handle_rm((uv_handle_t*) (handle));                      \
     5                                                                         \
     6     (handle)->flags |= UV_HANDLE_CLOSED;                                \
     7                                                                         \
     8     if ((handle)->close_cb)                                             \
     9       (handle)->close_cb((uv_handle_t*) (handle));                      \
    10   } while (0

    很简单,在base.flags里加上UV_HANDLE_CLOSED,调用结束回调,将其从uv_loop_t.handle_queue里删除。

    好了,到这里,我们分析了一个持续请求的整个生命周期,这个周期可以是用base.flags来标记的,这里总结一下:

    UV_HANDLE_REF -> UV_HANDLE_ACTIVE ->  UV_HANDLE_CLOSING -> UV_HANDLE_ENDGAME_QUEUED -> UV_HANDLE_CLOSED

    正好对应了handle的几个方法:

    uv__handle_init() -> uv__handle_start() -> uv__handle_stop() - > uv__handle_closing() -> uv_want_endgame()-> uv__handle_close()

    其实uv_close和uv_want_endgame这两个方法还可以用uv_handle_t里加一个函数指针(表示虚表),子类在初始化时改写它,从而模拟一个多态来简化switch判断。


    uv_async_t


    这个持续请求用来做不同事件循环中(线程间通信)通信的,唯一一个有线程安全的函数的模块,其他子模块也使用了它,比较重要。来看看它的定义吧,在include/uv.h的760行:

    struct uv_async_s {
      UV_HANDLE_FIELDS
      UV_ASYNC_PRIVATE_FIELDS
    };

    UV_ASYNC_PRIVATE_FIELDS在src/uv-win.h的535行:

    1 #define UV_ASYNC_PRIVATE_FIELDS                                               \
    2   struct uv_req_s async_req;                                                  \
    3   uv_async_cb async_cb;                                                       \
    4   /* char to avoid alignment issues */                                        \
    5   char volatile async_sent;

    非常简单,两个基类,一个回调,async_send可以看做锁,是uv_async_send的关键。

    这个持续请求既属于uv_handle_t,又属于uv_req_t.那就先来看uv_req_t的定义吧,在include/uv.h的381行:

    1 /* Abstract base class of all requests. */
    2 struct uv_req_s {
    3   UV_REQ_FIELDS
    4 }

    UV_REQ_FIELDS在370行:

    #define UV_REQ_FIELDS                                                         \
      /* public */                                                                \
      void* data;                                                                 \
      /* read-only */                                                             \
      uv_req_type type;                                                           \
      /* private */                                                               \
      void* active_queue[2];                                                      \
      void* reserved[4];                                                          \
      UV_REQ_PRIVATE_FIELDS                                                       

    UV_REQ_PRIVATE_FIELDS在src/uv-win.h的359行:

    #define UV_REQ_PRIVATE_FIELDS                                                 \
      union {                                                                     \
        /* Used by I/O operations */                                              \
        struct {                                                                  \
          OVERLAPPED overlapped;                                                  \
          size_t queued_bytes;                                                    \
        } io;                                                                     \
      } u;                                                                        \
      struct uv_req_s* next_req

    data:用户数据域。

    type:子类标识。

    active_queue:队列指针域,为了保存在uv_loop_t.active_queue所必须的。

    io:投递iocp请求所必须的重叠结构。

    好了,一样,先回忆下它是如何用的:

      uv_async_t asy;

      uv_async_init(uv_default_loop(),&asy,on_async);

      uv_run(uv_default_loop(),UV_RUN_DEFAULT);

    然后,外部用uv_async_send()这个函数就可以触发一次事件,就好像唤醒一个线程一样。这个函数是线程安全的,同时也有一个特性:对同一个请求对象,多次调用时只保证至少触发一次,就好像windows的消息循环将同类消息合并了一样,具体细节后面分析时再说。

    先看uv_async_init干了什么,在src/async.c的40行:

     1 int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
     2   uv_req_t* req;
     3 
     4   uv__handle_init(loop, (uv_handle_t*) handle, UV_ASYNC);
     5   handle->async_sent = 0;
     6   handle->async_cb = async_cb;
     7 
     8   req = &handle->async_req;
     9   uv_req_init(loop, req);
    10   req->type = UV_WAKEUP;
    11   req->data = handle;
    12 
    13   uv__handle_start(handle);
    14 
    15   return 0;

    主要是初始化两个基类,持续请求用UV_ASYNC标识自己,进入UV_HANDLE_ACTIVE的阶段。一次性请求用UV_WAKEUP标识自己,它的一些生命周期后面再说。

    到这里,什么也看不出,看uv_async_send吧,在src/async.c的67行:

     

     1 int uv_async_send(uv_async_t* handle) {
     2   uv_loop_t* loop = handle->loop;
     3 
     4   if (handle->type != UV_ASYNC) {
     5     /* Can't set errno because that's not thread-safe. */
     6     return -1;
     7   }
     8 
     9   /* The user should make sure never to call uv_async_send to a closing */
    10   /* or closed handle. */
    11   assert(!(handle->flags & UV__HANDLE_CLOSING));
    12 
    13   if (!uv__atomic_exchange_set(&handle->async_sent)) {
    14     POST_COMPLETION_FOR_REQ(loop, &handle->async_req);
    15   }
    16 
    17   return 0;

    关键在于13-15行,从名字上看,用cas原语做了一次锁判断,持有锁的发送一个请求。直接来看看uv_atomic_exchange_set到底是啥吧,在src/atomicops-inl.h的37行:

    1 static char __declspec(inline) uv__atomic_exchange_set(char volatile* target) {
    2   return _InterlockedOr8(target, 1);

    果然是的,_InterlockedOr8是一个逻辑或原语,返回旧值。因此现在对uv_async_send可以这样理解了:它是线程安全的,发送请求会置位async_send,再async_send没有被复位之前,后续的请求都会被忽略。

    看看POST_COMPLETION_FOR_REQ干了什么,在src/req-inl.h的73行:

    1 #define POST_COMPLETION_FOR_REQ(loop, req)                              \
    2   if (!PostQueuedCompletionStatus((loop)->iocp,                         \
    3                                   0,                                    \
    4                                   0,                                    \
    5                                   &((req)->u.io.overlapped))) {         \
    6     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");       \
    7   

    哦,用uv_req_t的重叠结构发了一个iocp请求,看来最终事件的触发应该和GetCompletionStatus有关了。这里简单的说一下iocp,可以把它理解为有两个队列的东西,一个请求队列,一个通知队列,外部可以向其投递请求,告诉它我要做某事,这些请求就放在请求队列中,操作系统是马仔,它会用另外的线程去处理请求,每处理一个就往通知队列中放一个处理结果信息。使用者只需要交待事情,得知结果。

    我们直接进屋去看看犹抱琵琶半遮面的uv_async_t,iocp结果的处理在事件循环的poll函数中,去看看吧,在uv_run函数中,src/core.c的372行:

    1   void (*poll)(uv_loop_t* loop, DWORD timeout);
    2 
    3   if (pGetQueuedCompletionStatusEx)
    4     poll = &uv_poll_ex;
    5   else
    6     poll = &uv_poll

    候选两个版本的函数,优先选扩展版的,真是注意性能呀,我们看最好的uv_poll_ex,在305行:

     1 static void uv_poll_ex(uv_loop_t* loop, DWORD timeout) {
     2   BOOL success;
     3   uv_req_t* req;
     4   OVERLAPPED_ENTRY overlappeds[128];
     5   ULONG count;
     6   ULONG i;
     7   int repeat;
     8   uint64_t timeout_time;
     9 
    10   timeout_time = loop->time + timeout;
    11 
    12   for (repeat = 0; ; repeat++) {
    13     success = pGetQueuedCompletionStatusEx(loop->iocp,
    14                                            overlappeds,
    15                                            ARRAY_SIZE(overlappeds),
    16                                            &count,
    17                                            timeout,
    18                                            FALSE);
    19 
    20     if (success) {
    21       for (i = 0; i < count; i++) {
    22         /* Package was dequeued */
    23         req = uv_overlapped_to_req(overlappeds[i].lpOverlapped);
    24         uv_insert_pending_req(loop, req);
    25       }
    26 
    27       /* Some time might have passed waiting for I/O,
    28        * so update the loop time here.
    29        */
    30       uv_update_time(loop);
    31     } else if (GetLastError() != WAIT_TIMEOUT) {
    32       /* Serious error */
    33       uv_fatal_error(GetLastError(), "GetQueuedCompletionStatusEx");
    34     } else if (timeout > 0) {
    35       /* GetQueuedCompletionStatus can occasionally return a little early.
    36        * Make sure that the desired timeout target time is reached.
    37        */
    38       uv_update_time(loop);
    39       if (timeout_time > loop->time) {
    40         timeout = (DWORD)(timeout_time - loop->time);
    41         /* The first call to GetQueuedCompletionStatus should return very
    42          * close to the target time and the second should reach it, but
    43          * this is not stated in the documentation. To make sure a busy
    44          * loop cannot happen, the timeout is increased exponentially
    45          * starting on the third round.
    46          */
    47         timeout += repeat ? (1 << (repeat - 1)) : 0;
    48         continue;
    49       }
    50     }
    51     break;
    52   }
    53 }

    很简单,一部分是利用GetQueueCompletionStatusEx收取iocp的结果信息,一部分是计时器的时间处理。GetQueueCompletionStatusEx和GetQueueCompletionStatus的区别在于,前者可以一次收取多个结果通知,后者一次只能收取一个结果通知。在这只需要看收取通知后处理的部分,其他的在分析timer时再说,看上面的21-25行即可。

    这几行代码做了两件事:将重叠结构指针转为一次性请求对象,将一次性请求对象加入uv_loop_t.pending_reqs_tail。

    uv_overlapped_to_req()不得不说,这是C语言的一个奇淫技巧,根据结构体的某个成员指针计算出结构体指针,非常有用,用于一些数据结构实现中,可以节省一个数据域指针的开销。最初见与linux内核的container_of宏,实现是这样的:

    1 #define CONTAINING_RECORD(address, type, field) ((type *)( \
    2                                                   (PCHAR)(address) - \
    3                                                   (ULONG_PTR)(&((type *)0)->field))

    因为成员地址是大于结构体首地址的,所以只需将成员地址-成员的偏移址即可得,那偏移址如何计算,用0强转为一个同类型结构,然后取对应成员地址。

    uv_loop_t.pending_req_tail是一个循环单链表,uv_req_t.next_req就是指针域。uv_insert_pending_req()用尾插法将获取的结果通知对应的请求插入这个链表中。那么事件的触发应该就在消费这个链表的地方,在那呢,在uv_run()中调用uv_process_reqs消费的,这个方法在src/req-inl.c的144行:

     1 INLINE static int uv_process_reqs(uv_loop_t* loop) {
     2   uv_req_t* req;
     3   uv_req_t* first;
     4   uv_req_t* next;
     5 
     6   if (loop->pending_reqs_tail == NULL)
     7     return 0;
     8 
     9   first = loop->pending_reqs_tail->next_req;
    10   next = first;
    11   loop->pending_reqs_tail = NULL;
    12 
    13   while (next != NULL) {
    14     req = next;
    15     next = req->next_req != first ? req->next_req : NULL;
    16 
    17     switch (req->type) {
    18       case UV_READ:
    19         DELEGATE_STREAM_REQ(loop, req, read, data);
    20         break;
    21 
    22       case UV_WRITE:
    23         DELEGATE_STREAM_REQ(loop, (uv_write_t*) req, write, handle);
    24         break;
    25 
    26       case UV_ACCEPT:
    27         DELEGATE_STREAM_REQ(loop, req, accept, data);
    28         break;
    29 
    30       case UV_CONNECT:
    31         DELEGATE_STREAM_REQ(loop, (uv_connect_t*) req, connect, handle);
    32         break;
    33 
    34       case UV_SHUTDOWN:
    35         /* Tcp shutdown requests don't come here. */
    36         assert(((uv_shutdown_t*) req)->handle->type == UV_NAMED_PIPE);
    37         uv_process_pipe_shutdown_req(
    38             loop,
    39             (uv_pipe_t*) ((uv_shutdown_t*) req)->handle,
    40             (uv_shutdown_t*) req);
    41         break;
    42 
    43       case UV_UDP_RECV:
    44         uv_process_udp_recv_req(loop, (uv_udp_t*) req->data, req);
    45         break;
    46 
    47       case UV_UDP_SEND:
    48         uv_process_udp_send_req(loop,
    49                                 ((uv_udp_send_t*) req)->handle,
    50                                 (uv_udp_send_t*) req);
    51         break;
    52 
    53       case UV_WAKEUP:
    54         uv_process_async_wakeup_req(loop, (uv_async_t*) req->data, req);
    55         break;
    56 
    57       case UV_SIGNAL_REQ:
    58         uv_process_signal_req(loop, (uv_signal_t*) req->data, req);
    59         break;
    60 
    61       case UV_POLL_REQ:
    62         uv_process_poll_req(loop, (uv_poll_t*) req->data, req);
    63         break;
    64 
    65       case UV_PROCESS_EXIT:
    66         uv_process_proc_exit(loop, (uv_process_t*) req->data);
    67         break;
    68 
    69       case UV_FS_EVENT_REQ:
    70         uv_process_fs_event_req(loop, req, (uv_fs_event_t*) req->data);
    71         break;
    72 
    73       default:
    74         assert(0);
    75     }
    76   }
    77 
    78   return 1;
    79 }

    遍历链表,根据每个一次性请求的type调用不同子系统方法,uv_async_t的type为UV_WAKEUP,所以只需要看uv_process_async_wakeup_req(),它在src/async.c的87行:

     1 void uv_process_async_wakeup_req(uv_loop_t* loop, uv_async_t* handle,
     2     uv_req_t* req) {
     3   assert(handle->type == UV_ASYNC);
     4   assert(req->type == UV_WAKEUP);
     5 
     6   handle->async_sent = 0;
     7 
     8   if (handle->flags & UV__HANDLE_CLOSING) {
     9     uv_want_endgame(loop, (uv_handle_t*)handle);
    10   } else if (handle->async_cb != NULL) {
    11     handle->async_cb(handle);
    12   }

    这里是调用回调,复位async_send。到这里,就对uv_async_t的机制了然于胸了。uv_async_send可以下个最终定义了:如果当前对象没有被占用,则占用它并触发一次事件,否则被忽略。这个方法是线程安全的。什么是占有,成功调用uv_async_send直到回调被调用这个时间段。

    最后来看看它是如何销毁的,与持续性请求一样,进入UV_HANDLE_ACTICE之后的流程(closing,然后加入endgame_handles),可以调用uv_async_close或者uv_close.

    好了,到这里,四大特殊持续请求就分析完了,也对一次性请求有了一个初步认识,知道它是先放入uv_loop_t.pending_reqs_tail链表中,然后在uv_process_reqs来处理的。


     

    未完待续,下一篇分析tcp。

     

  • 相关阅读:
    Study Plan The Twelfth Day
    Study Plan The Fifteenth Day
    Study Plan The Seventeenth Day
    Study Plan The Tenth Day
    Study Plan The Eighth Day
    Study Plan The Eleventh Day
    Study Plan The Sixteenth Day
    Study Plan The Thirteenth Day
    Study Plan The Fourteenth Day
    Study Plan The Ninth Day
  • 原文地址:https://www.cnblogs.com/watercoldyi/p/5682344.html
Copyright © 2011-2022 走看看