zoukankan      html  css  js  c++  java
  • libevent 网络IO分析

    libevent 网络IO分析

    1 简介

    libevent 是一个基于事件触发的网络库,轻量级,代码精炼易读且跨平台,其底层会根据所运行的平台选择对应的 I/O 复用机制,libevent 是一个典型的 reactor 设计模式,简单的说加入你对某一事件感兴趣,比如想知道某个 socket 是否可读,你可以把这一事件与某个处理该事件的回调函数关联起来形成一个 event,然后注册到 libvent 内核,当该事件发生的时候 libvent 就会通过该 event 调用你给的回调函数。 写这一文章的目的主要是为了总结自己在学习与阅读 libevent 源代码过程中的经验与知识,也可供以后参考,本文主要是基于 libvent1.4.15 进行分析。

    2 简单使用与入门

    首先来看几个简单的使用例子程序,通过这些代码我们可以快速的入门,了解 libevent 的大致用法

    2.1 定时器-timeout 超时回调

    下面的代码实现了一个定时调用回调函数 timeout_cb 的功能,详细请看代码注释:

    int lasttime;
    
    static void
    timeout_cb(int fd /*超时回调,没有用*/, 
    short event /*libevent 调用该回调时告知用户发生的事件,此处应该是 EV_TIMEOUT*/,
    void *arg /*注册的时候设置的参数*/)
    {
      struct timeval tv;
      struct event *timeout = arg;
      int newtime = time(NULL);
    
      printf("%s: called at %d: %d
    ", __func__, newtime,
          newtime - lasttime);
      lasttime = newtime;
    
      evutil_timerclear(&tv);
      tv.tv_sec = 2;
        //调用一次之后再注册该事件,2s 之后通知我
        //如果不添加,libevent 中就没有 event 会自动退出
      event_add(timeout, &tv);
    }
    
    int
    main (int argc, char **argv)
    {
        //libevent 的一个 event,用于关联 handle 与 callback
      struct event timeout; 
      struct timeval tv;
    
      /* Initalize the event library 初始化*/
      event_init();
    
      /* Initalize one event */
      evtimer_set(&timeout/*设置 event*/, 
        timeout_cb /*回调函数,看原型*/, 
        &timeout/*调用回调函数时传给回调函数的参数 arg*/);
    
      evutil_timerclear(&tv); // 初始化事件结构体
      tv.tv_sec = 2;
      event_add(&timeout, &tv); //注册 event,设置超时时间为 2 秒调用一次
    
      lasttime = time(NULL);
    
      event_dispatch();//运行 libevent,进行事件分发,这里会阻塞
    
      return (0);
    }
    

    2.2 信号事件

    当运行一下程序时,中断该程序 3 次就会退出程序

    int called = 0;
    static void
    signal_cb(int fd, short event, void *arg)
    {
    
        struct event *signal = arg;
    
        printf("%s: got signal %d
    ", __func__, EVENT_SIGNAL(signal));
    
        if (called >= 2)//第三次调用就会将该 event del 注销掉
            event_del(signal);
    
        called++;
    }
    int
    main (int argc, char **argv)
    {
    #ifdef WIN32
        {
            //win32 下要初始化 winsock2,否则运行不成功,官方的 sample 代码在 win32 运行不成功
            WORD winsock_ver = MAKEWORD(2, 2);
            WSAData wsa_data;
            bool did_init_ = (WSAStartup(winsock_ver, &wsa_data) == 0);
    
            if (did_init_)
            {
                assert(wsa_data.wVersion == winsock_ver);
                WSAGetLastError();
            }
        }
    #endif
    
        struct event signal_int;
    
        /*
         * 新建一个 libevent 实例,实际上例子 timeout 中的 event_init 仅仅是对 event_base_new 的封装
         * 然后赋值给一个全局变量 event_base *current_base
         */
        struct event_base* base = event_base_new();
    
        /* Initalize one event */
        /*标识 EV_PERSIST 表示永久事件,添加一次即可,回调 callback 后 libevent 会自动重新注册
         *不用用户自动添加,例子 1 的 timeout 非 EV_PERSIST 事件,需要自己添加*/
        event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST,
                  signal_cb,
                  &signal_int);
        event_base_set(base, &signal_int); //将该 event 与新建的 libevent 实例关联
    
        event_add(&signal_int, NULL); //添加到 base 内核中
    
        event_base_dispatch(base);
        event_base_free(base); //最后要释放我们新建的实例
    
        return (0);
    }
    

    2.3 读取 socket

    该例子通过创建一个 socket pair 然后往其中写入数据,然后将另外一个 socket 注册到 libvent 中,当该 socket 可能,我们的回调函数就会被调用,(代码从官方的测试代码中截取,未运行是否通过)

    int pair[2];
    int test_ok;
    static void
    simple_read_cb(int fd, short event, void *arg)
    {
      char buf[256];
      int len;
    
      if (arg == NULL)
        return;
    
      len = read(fd, buf, sizeof(buf));
    
      if (len) {
        if (!called) {
          if (event_add(arg, NULL) == -1)
            exit(1);
        }
      } else if (called == 1)
        test_ok = 1;
    
      called++;
    }
    int main(void)
    {
    #ifdef WIN32
      WORD wVersionRequested;
      WSADATA wsaData;
      int err;
    
      wVersionRequested = MAKEWORD( 2, 2 );
    
      err = WSAStartup( wVersionRequested, &wsaData );
    #endif
      struct event_base *base;
      struct event ev1;
        /*创建 socket pair*/
      if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, pair) == -1) {
        fprintf(stderr, "%s: socketpair
    ", __func__);
        exit(1);
      }
        /*写入数据之后关闭*/
      write(pair[0], TEST1, strlen(TEST1)+1);
      shutdown(pair[0], SHUT_WR);
    
      base = event_base_new();
        /*监视该 socket 是否可读,可读的话回调我们的 callback*/
      event_set(&ev1, pair[1], EV_READ, simple_read_cb, &ev1);
      event_base_set(base, &ev1);
      event_add(&ev1, NULL);
        test_ok = 0;
      event_base_dispatch(base);
    
      event_base_free(base);
    }
    

    3 操作系统 I/O 模型封装

    (简绍一款 windows 下分析源代码神器 source insight,linux 下可用 wine 运行) libevent 将各个平台的 I/O 模型封装抽象化,然后通过函数指针的方式进行调用,上层代码只关注 event 的处理,底层与句柄相关如 socket,file,singnal,timeout 相关的部分交给操作系统进行处理,libevent 上层代码通过结构体 struct eventop 中的几个函数指针 init、add、del、dispatch 和 dealloc 调用与操作系统相关的代码,eventop 相当于一个抽象类,其中的几个函数指针相当于纯虚函数,不同的 i/o 模型则是该 eventop 的子类。struct eventop 原型如下:

    struct eventop {
      const char *name;
      void *(*init)(struct event_base *); 
      int (*add)(void *, struct event *); 
      int (*del)(void *, struct event *);
      int (*dispatch)(struct event_base *, void *, struct timeval *);
      void (*dealloc)(struct event_base *, void *);
      /* set if we need to reinitialize the event base */
      int need_reinit;
    };
    

    与特定平台相关的 I/O 多路复用模块都有一个全局的 struct eventop 变量如:

    • socket 编程中的 select, 源文件 select.c: const struct eventop selectops
    • win32 平台,源文件 win32.c: struct eventop win32ops (实际是使用 select,由于 win32 下的 select 的关系导致 libevent 在 win32 下性能不如意)

    最后在 event.c 中将这几个与操作系统底层相关的模块汇总,根据特定平台或者定义的宏选择合适的 I/O 模型进行编译:

    /* In order of preference */
    static const struct eventop *eventops[] = {
    #ifdef HAVE_EVENT_PORTS
      &evportops,
    #endif
    #ifdef HAVE_WORKING_KQUEUE
      &kqops,
    #endif
    #ifdef HAVE_EPOLL
      &epollops,
    #endif
    #ifdef HAVE_DEVPOLL
      &devpollops,
    #endif
    #ifdef HAVE_POLL
      &pollops,
    #endif
    #ifdef HAVE_SELECT
      &selectops,
    #endif
    #ifdef WIN32
      &win32ops,
    #endif
      NULL
    };
    

    4 源码分析-基本功能

     

    4.1 超时机制-对 timeout 例子进行源码分析

    libvent 提供了超时机制,比如注册某个 event,希望过了某个特定的时间 timeout(超时)后调用我们的 callback(回调函数),实现该功能主要用到的数据结构是二叉堆 binary-heap(以前是红黑树 rb-tree),超时功能可以实现

    • 定时器检测某个进程是否正常运行或者文件是否已被更新
    • 游戏编程里面画面的绘制及帧控制
    • 网络编程中的心跳包发送

    4.1.1 libevent 初始化

    event_init 初始化 libevent,该函数其实是创建一个 libvent 实例将其赋值给一个全局变量参看以下代码,我们可以通过 event_base_new 自己新建一个实例然后丢到某个线程 dispatch

    struct event_base *
    event_init(void)
    {
      struct event_base *base = event_base_new();
    
      if (base != NULL)
        current_base = base; //current_base 实际是一个全局变量
    
      return (base);
    }
    

    struct event_base 是 libvent 的核心结构体,后面的代码基本是与 event_base 打交道,原型如下:

    struct event_base {
      const struct eventop *evsel; // 选择的 i/o 复用模型
      void *evbase; //调用 i/o 模型 evsel->init 返回的变量,之后调用与 evsel 相关的函数都会将该变量传入
      int event_count;    /* counts number of total events 当前注册的 event 总数*/
      int event_count_active; /* counts number of active events 处于活动队列的 event 总数,即即将被回调的 event*/
      int event_gotterm;    /* Set to terminate loop 正常退出 dispatch*/
      int event_break;    /* Set to terminate loop immediately 马上退出 dispatch*/
      /* active event management */
      //1. active list 即将被回调
      // - 比如注册一个 2s timeout event,2s 过后该 event 会被放到该 list 等待被回调
      // - 注册一个 socket read event,当 socket 可读会将与该 socket     关联的 event 放到 list 等待回调
      //2. 指针数组的原因是要实现一个优先级,数组头优先级最高,先被调用
      struct event_list **activequeues; 
      int nactivequeues; //activequeues 数组元素个数
      /* signal handling info */
      struct evsignal_info sig; //信号相关
      struct event_list eventqueue; //插入的所有 event
      struct timeval event_tv;  
      struct min_heap timeheap; //二叉堆
      struct timeval tv_cache;
    };
    

    实际创建于初始化的过程在 event_base_new 中进行,可以看到初始化操作系统相关的 io 模型过程是遍历 eventops 数组调用其元素 eventtops[i]->init 后赋值给 base->evbase

    struct event_base *
    event_base_new(void)
    {
      int i;
      struct event_base *base;
    
      if ((base = calloc(1, sizeof(struct event_base))) == NULL)
        event_err(1, "%s: calloc", __func__);
    
      event_sigcb = NULL;
      event_gotsig = 0;
    
      detect_monotonic();
      gettime(base, &base->event_tv);
    
      min_heap_ctor(&base->timeheap);
      TAILQ_INIT(&base->eventqueue);
      base->sig.ev_signal_pair[0] = -1;
      base->sig.ev_signal_pair[1] = -1;
    
      base->evbase = NULL;
        //调用平台相关的初始化过程,并复制给 base->evbase
      for (i = 0; eventops[i] && !base->evbase; i++) {
        base->evsel = eventops[i];
        base->evbase = base->evsel->init(base);
      }
    
      if (base->evbase == NULL)
        event_errx(1, "%s: no event mechanism available", __func__);
    
      if (evutil_getenv("EVENT_SHOW_METHOD")) 
        event_msgx("libevent using: %s
    ",
             base->evsel->name);
    
      /* allocate a single active event queue */
      event_base_priority_init(base, 1); //优先级队列,后面讲解
    
      return (base);
    }
    

    4.1.2 注册 event

    在 timeout 例子中首先是调用 evtimer_set 初始化 event 后在 event_add 到 libevent 实例,evtimer_set 只是一个宏,其实际调用的是 event_set:

    #define evtimer_set(ev, cb, arg)  event_set(ev, -1, 0, cb, arg)
    

    struct event 能够与我们的 handle 关联然后注册到 libvent 中,其中的原型定义以及注释如下,一些字段开始不理解没关系,可以往下阅读然后回来参考

    //event 中有三个链表节点,用于插入到 event_base 和 singnal_info.list 中
    //该链表实现可以参看 queue.h,实现得非常精巧,对于理解后续代码很有帮助
    struct event {
      TAILQ_ENTRY (event) ev_next; //所有已注册的 event
      TAILQ_ENTRY (event) ev_active_next; //active list
      TAILQ_ENTRY (event) ev_signal_next; //singnal list
      unsigned int min_heap_idx;  /* for managing timeouts 最小堆,标识自己在堆中的位置,主要给对函数操作*/
    
      struct event_base *ev_base; //指向 dispatch 自己的 event_base
    
      int ev_fd;  //关联的文件描述符,timeout event 被忽略
      short ev_events; //监听的事件
      short ev_ncalls; //插入 active 之后要被调用的次数
      short *ev_pncalls;  /* Allows deletes in callback 通过该变量可以再调用过程中删除,不用关心*/
    
      //超时的时间与 min_heap_idx 配合使用,用于二叉堆排序,时间最小最快发生的在堆顶
      struct timeval ev_timeout;
      /* 优先级,将被回调时字段 ev_active_next 插在 ev_base->activequeues[ev_pri]中*/
      int ev_pri;   
    
      //指定的回调函数与参数
      void (*ev_callback)(int, short, void *arg);
      void *ev_arg;
    
      int ev_res;   /* result passed to event callback */
      int ev_flags;////标志位,标志该 event 在哪个链表中,为 EVLIST_*的多种组合
    };
    

    libvent 通过使用链表来管理所有的 event,struct event 中的三个链表节点用于插入到聊表中,event_set 用于将 struct event 中的各个字段初始赋值

    //设置并初始化 event
    //ev  注册的 event
    // fd  文件描述符,如果是 timeout event 则忽略该参数
    //events  关心的事件 EV_TIMEOUT, EV_SIGNAL,  EV_READ, or EV_WRITE 可以通过或运算符同时关心多个事件,发生
    //            对应事件 callback 将会被调用
    //callback 回调函数,被回调时 fd 和 arg 会传给它 callback(fd, arg)
    //arg  传给 callback 的自定义参数
    void
    event_set(struct event *ev, int fd, short events,
        void (*callback)(int, short, void *), void *arg)
    {
      /* Take the current base - caller needs to set the real base later */
      ev->ev_base = current_base; //默认对给全局 reactor 实例进行 io 分发
    
      ev->ev_callback = callback;
      ev->ev_arg = arg;
      ev->ev_fd = fd;
      ev->ev_events = events;
      ev->ev_res = 0;
      ev->ev_flags = EVLIST_INIT;
      ev->ev_ncalls = 0;
      ev->ev_pncalls = NULL;
      min_heap_elem_init(ev);
    
      /* by default, we put new events into the middle priority */
      if(current_base)
        ev->ev_pri = current_base->nactivequeues/2;
    }
    

    然后将 event 真正的添加的内核中,根据 event 中的 ev_flags 中的标志位获知该 event 监听的事件类型为哪几种,插入到对应的数据结构中,下面的代码做了部分裁剪,其中如果监听 EV_READ、EV_WRITE 或者 EV_SIGNAL 则丢给 OS 底层相关,如果为超时则通过 event_queue_insert 插入到 EVLIST_TIMEOUT 中,本节部分我们只要关注 timeout,其他的先暂时不要理会,event_add 参数 tv 指定的是参数 ev 在多少时间后超时回调

    int
    event_add(struct event *ev, const struct timeval *tv)
    {
      struct event_base *base = ev->ev_base;//获得与该事件关联的 event_base
      const struct eventop *evsel = base->evsel; //底层与 OS 相关的 I/O 分发模式
      void *evbase = base->evbase;//底层与 OS 相关的 I/O 分发模式参数
      int res = 0;
        ...
       //如果该事件有超时选项(tv 不为 NULL)
       //预先在二叉堆中预留一个空位给新添加的 event
      if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
        if (min_heap_reserve(&base->timeheap,
          1 + min_heap_size(&base->timeheap)) == -1)
          return (-1);  /* ENOMEM == errno */
      }
    
    
      if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && //如果监听有非超时意外 event
          !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { //且未插入到 libevent
        res = evsel->add(evbase, ev);//添加到 OS 的 IO 分发 reactor
        if (res != -1)
          event_queue_insert(base, ev, EVLIST_INSERTED);
      }
    
      /* 
       * we should change the timout state only if the previous event
       * addition succeeded.
       */
      if (res != -1 && tv != NULL) {
        struct timeval now;
            ....
        gettime(base, &now);
        evutil_timeradd(&now, tv, &ev->ev_timeout);
    
        event_debug((
           "event_add: timeout in %ld seconds, call %p",
           tv->tv_sec, ev->ev_callback));
    
        event_queue_insert(base, ev, EVLIST_TIMEOUT); //添加到超时队列中
      }
    
      return (res);
    }
    

    看一下 event_queue_insert 中插入 EVLIST_TIMEOUT 超时 event 的过程,它将该 event 插入到 base 中的 timeheap 中,如下代码所以,可以看到 event_queue_insert 函数根据标志参数 queue 插入到 base 中不同的字段数据结构中,我们此处关心 EVLIST_TIMEOUT 就可以。现在我们已经成功的将一个 timeout event 添加到了 libevent 当中,之后就是分发阻塞等待被回调

    void
    event_queue_insert(struct event_base *base, struct event *ev, int queue)
    {
      if (ev->ev_flags & queue) {
        /* Double insertion is possible for active events */
        if (queue & EVLIST_ACTIVE)
          return;
    
        event_errx(1, "%s: %p(fd %d) already on queue %x", __func__,
             ev, ev->ev_fd, queue);
      }
      if (~ev->ev_flags & EVLIST_INTERNAL)
        base->event_count++;
    
      ev->ev_flags |= queue;
      switch (queue) {
      case EVLIST_INSERTED:
        TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
        break;
      case EVLIST_ACTIVE:
        base->event_count_active++;
        TAILQ_INSERT_TAIL(base->activequeues[ev->ev_pri],
            ev,ev_active_next);
        break;
      case EVLIST_TIMEOUT: {
        min_heap_push(&base->timeheap, ev);
        break;
      }
      default:
        event_errx(1, "%s: unknown queue %x", __func__, queue);
      }
    }
    

    4.1.3 dispatch 进入循环,分发回调

    将事件注册好后我们就可以进行 dispatch 进入循环,当有事件发生的时候(计时器超时),我们注册的回调函数就会被调用,跟踪 event_dispatch 函数进去,最后的逻辑部分在 event_base_loop, 其重要过程为:

    • 获取堆中堆顶 event(时间最靠前最早超时)的超时时间 tv
    • 调用 OS 的 I/O 分发并传递 tv,超时时间为 tv(先不用在意底层是做什么,可能它直接调用 sleep(tv)也说不定)
    • OS I/O 分发结束,从堆中取出所有比当前时间小(超时)的元素,插入到 libevent 的活动队列 base->active_queues
    • 对 base->active_queues 中的 events 进行调用

    event_base_loop 的具体实现:

    int
    event_base_loop(struct event_base *base, int flags)
    {
      const struct eventop *evsel = base->evsel;
      void *evbase = base->evbase;
      struct timeval tv;
      struct timeval *tv_p;
      int res, done;
        ....
        done = 0;
      while (!done) {
            //省略
            .....
        timeout_correct(base, &tv);
    
        tv_p = &tv;
        if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
           //获取到 base->timeheap 中最先超时的时间
           //如果没有 tv_p 被赋值 NULL,注意参数是 timeval**
          timeout_next(base, &tv_p);
        } else {
          /* 
           * if we have active events, we just poll new events
           * without waiting.
           */
          evutil_timerclear(&tv);
        }
    
        /* If we have no events, we just exit */
        if (!event_haveevents(base)) {//内核中没用要监听的事件退出
          event_debug(("%s: no events registered.", __func__));
          return (1);
        }
    
        /* update last old time */
        gettime(base, &base->event_tv);
    
        /* clear time cache */
        base->tv_cache.tv_sec = 0;
        //调用 OS 的 I/O 分发,tv_p 表示超时的时间(如果不为 NULL)
        //比如如果 OS 的 I/O 分发采用 select,那么 tv_p 相当告诉 select 超时的时间,即正好是我们
        //添加到 base->timeheap 最先超时的 event 的时间(最小堆堆顶时间最靠前)
        res = evsel->dispatch(base, evbase, tv_p);
        if (res == -1)
          return (-1);
        gettime(base, &base->tv_cache);
        //处理超时的 event,通过获取最小堆堆顶与当前时间比较是否超时
        //如果超时则将 event 插入到 base->activequeues 并将 base->event_count_active 加 1
        timeout_process(base);
    
        if (base->event_count_active) { //如果有活动 event
          event_process_active(base); //处理活动 event
          if (!base->event_count_active && (flags & EVLOOP_ONCE))
            done = 1;
        } else if (flags & EVLOOP_NONBLOCK)
          done = 1;
      }
    
      /* clear time cache */
      base->tv_cache.tv_sec = 0;
      return (0);
    }
    

    timeout_process 即是从堆中取出所有超时 event 插入到 base->actice_queue 中

    void
    timeout_process(struct event_base *base)
    {
      struct timeval now;
      struct event *ev;
      if (min_heap_empty(&base->timeheap))
        return;
      gettime(base, &now);
      //取出所有超时 event
      while ((ev = min_heap_top(&base->timeheap))) {
        //与当前时间比较,如果大于当前时间
        //则说明没用超时的 event
        if (evutil_timercmp(&ev->ev_timeout, &now, >)) 
          break;
    
        /* delete this event from the I/O queues */
        event_del(ev);
    
        event_debug(("timeout_process: call %p",
           ev->ev_callback));
        //插入到活动队列中
        event_active(ev, EV_TIMEOUT, 1);
      }
    }
    

    4.1.4 兼备事件优先级,统一处理回调

    可以从 event_base_loop 中看到处理回调的过程的代码片

    if (base->event_count_active) { //如果有活动 event
      event_process_active(base); //处理活动 event
      if (!base->event_count_active && (flags & EVLOOP_ONCE))
        done = 1;
    }
    

    如果我们关心的事件发生了(read、write、timeout or singnal),event_process_active 将会调用处理回调,其中 base->event_count_active 指明事件到来的总数,在这里我们就随带将一下 libevent 中回调优先级的过程,注册到 libevent 的 event 是可以带优先级的,优先级最高最先调用,假如活动队列中总是有一个或者几个 event 优先级高于其他的 event,那么低优先级的 event 的将永远也不会被 callback。具体看代码与注释:

    static void
    event_process_active(struct event_base *base)
    {
      struct event *ev;
      struct event_list *activeq = NULL;
      int i;
      short ncalls;
      //取得数组 nactivequeues 最靠前的一个非 NULL 元素
      //即优先级最大的一个活动队列
      for (i = 0; i < base->nactivequeues; ++i) {
        if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
          activeq = base->activequeues[i];
          break;
        }
      }
      assert(activeq != NULL);
      //一次对该最大优先级队列的 event 进行回调
      //优先级小的等到下次被调用时处理
      for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
        //忽略....
      ncalls = ev->ev_ncalls;
        ev->ev_pncalls = &ncalls;
        while (ncalls) {
          ncalls--;
          ev->ev_ncalls = ncalls;
                //回调
          (*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);
                //...
          }
        ev->ev_pncalls = NULL;
      }
    }
    

    event_process_actice 中每次只处理 base->activequeues 数组中的一个链表,这样保证了优先级大的 event 总是先比优先级小的 event 被调用,举个例子:将 1s timeout 优先级为 0 的 ev1 与 1s timeout 优先级为 1 的 ev2 同时 event_add 到 libevent 中,假如他们的回调 callback 被调用之后都会再次将自己注册到 libevent 中(如 timeout 例子中的 timeout_cb 在回调中注册 event),那么不论过了多长时间 ev2 的回调也不会被执行,即使 ev2 已经超时被插入到 activequeues 中。因为他们的超时时间都为 1s,超时之后 ev1 被插在 base-》activequeues[ 0 ],ev2 插在 base->activequeues[ 1 ]中,只有 base->activequeues[ 0 ]为 NULL,base->activequeues[ 1 ]的 event 才会被回调。如果该部分不太清楚的话可以看源代码中 test 目录下 regress.c 测试代码中 test_priorities 函数,里面主要功能就是对优先级队列的测试与验证。

    4.2 I/O 事件-监控文件描述符(socket、file etc)

     

    4.2.1 I/O event 的注册

    libevent 支持网络 IO,检测某个文件描述符是否有事件触发然后回调用户提供的 callback,基于对网络 I/O 事件的监测与分发,libevent 提供了 DNS,HTTP Server,RPC 等组件(libevent2 中将这些组件独立出来)。对网络 IO 事件的分发与 timeout 的整个多长基本相同,libevent 管理 event 使用了 3 中链表分别是 EVLIST_INSERTED, EVLIST_ACTIVE, EVLIST_TIMEOUT。对网络 IO event 的管理只用到前两种。对 IO event 的初始化与 timeout 基本相同,在添加 I/O event 的时候,通过 event->ev_events 判断监测的事件是否有 I/O,如果有的话将其添加到 OS 层进行处理(比如使用 select 判读可读可写),然后插入到 base->eventqueue 中,可以看到 base->eventqueue 不管理 timeout event,它主要保存 IO event 与 singnal event。

    if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) && //如果监听有非超时意外 event
        !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { //且未插入到 libevent
      res = evsel->add(evbase, ev);//添加到 OS 的 IO 分发 reactor
      if (res != -1)
        event_queue_insert(base, ev, EVLIST_INSERTED);
    }
    

    evsel->add 执行的操作之前我们没用讲到,现在选取大家比较熟悉的 select I/O 多路复用模式进行分析,从前面的 event_base 结构体定义中可以看到,base->evsel 是一个指向与 os 相关 eventop 结构体,看一下 struct eventop 以及源文件 select.c 下的 selectop 定义:

    struct eventop {
      const char *name;
      void *(*init)(struct event_base *);
      int (*add)(void *, struct event *);
      int (*del)(void *, struct event *);
      int (*dispatch)(struct event_base *, void *, struct timeval *);
      void (*dealloc)(struct event_base *, void *);
      /* set if we need to reinitialize the event base */
      int need_reinit;
    };
    struct eventop {
      const char *name;
      void *(*init)(struct event_base *);
      int (*add)(void *, struct event *);
      int (*del)(void *, struct event *);
      int (*dispatch)(struct event_base *, void *, struct timeval *);
      void (*dealloc)(struct event_base *, void *);
      /* set if we need to reinitialize the event base */
      int need_reinit;
    };
    const struct eventop selectops = {
      "select",
      select_init,
      select_add,
      select_del,
      select_dispatch,
      select_dealloc,
      0
    };
    

    在添加 io event 的时候 evsel->add 函数指针实际指向的就是 select_add,在初始化 libevent 时 event_base_new(void)函数中 base->evbase = base->evsel->init(base)调用的的 init 实际调用的就是 select_init,select_init 返回了一个 void*指针,该 void*指针主要与底层 OS select 相关,上层代码不需要关心,select_init 初始化部分数据结构,然后返回一个 selectop 结构体:

    struct selectop {
      //event_fds 是传给 OS API select 函数的第一个参数,也就是传递给 select 的所用 set 里面
      //最大的一个 socket 值+1,因此每次 select_add 的时候都会对 ev->ev_fd 进行判断
      int event_fds;    /* Highest fd in fd set */
      int event_fdsz;
      fd_set *event_readset_in; 
      fd_set *event_writeset_in;
      fd_set *event_readset_out;
      fd_set *event_writeset_out;
      struct event **event_r_by_fd;
      struct event **event_w_by_fd;
    };
    static void *
    select_init(struct event_base *base)
    {
      struct selectop *sop;
      /* Disable select when this environment variable is set */
      if (evutil_getenv("EVENT_NOSELECT"))
        return (NULL);
      if (!(sop = calloc(1, sizeof(struct selectop))))
        return (NULL);
      select_resize(sop, howmany(32 + 1, NFDBITS)*sizeof(fd_mask));
      evsignal_init(base);//信号捕捉相关
      return (sop);
    }
    

    select_init 返回的 void*保存在了 base->evbase 中,调用 struct eventop 函数指针的时候,除 init 外都会将 base->evbase 传递给 struct eventop 的函数指针。evsel->add 调用的是 select_add,select_add 的代码与注释:

    //arg 是 select_init 返回的 void*
    //ev read、write 或者 singal,也可以使他们中的任意组合比如 ev 同时监测某个 socket 的读/写
    static int
    select_add(void *arg, struct event *ev)
    {
      struct selectop *sop = arg;
    
      if (ev->ev_events & EV_SIGNAL) //信号的捕捉集成在每一个 os i/o 中
        return (evsignal_add(ev)); //直接调用 singnal.c 模块
    
      check_selectop(sop);
      /*
       * Keep track of the highest fd, so that we can calculate the size
       * of the fd_sets for select(2)
       * selectop->event_fds 是传给 API select 的第一参数+1,
       * 所有添加进来的 fds 中值最大
       */
      if (sop->event_fds < ev->ev_fd) {
        int fdsz = sop->event_fdsz;
            //begin{{{ 内存操作相关,浪费时间的话略过
            //一个 fd_mask 有 32 位,内保存 32 个文件描述符
        if (fdsz < sizeof(fd_mask))
          fdsz = sizeof(fd_mask);
    
        //howmany(x,y)将 x 向上取整 y 的倍数
        while (fdsz <
            (howmany(ev->ev_fd + 1, NFDBITS) * sizeof(fd_mask)))//加一个 ev_fd 进来会不会放不下
          fdsz *= 2;
    
        if (fdsz != sop->event_fdsz) {
          if (select_resize(sop, fdsz)) {//将 selectop 中的各个 set 的缓冲区扩充
            check_selectop(sop);
            return (-1);
          }
        }
        //}}}end 内存操作相关
    
            //添加进来的 fds 文件描述符大于当前保存的文件描述符
            //更新 sop->event_fds
        sop->event_fds = ev->ev_fd;
      }
    
      //将不同 ev 的文件描述符添加到 select 的不同 set 中
      //读 event 添加到 readset,写 event 添加到 writeset
      //将文件描述符作为索引,保存到 selectop->event_*_by_fd[ev->ev_fd]=ev 中
      if (ev->ev_events & EV_READ) {
        FD_SET(ev->ev_fd, sop->event_readset_in);
        sop->event_r_by_fd[ev->ev_fd] = ev;
      }
      if (ev->ev_events & EV_WRITE) {
        FD_SET(ev->ev_fd, sop->event_writeset_in);
        sop->event_w_by_fd[ev->ev_fd] = ev;
      }
      check_selectop(sop);
    
      return (0);
    }
    

    其中的内存相关操作部分可以不用看,理解起来也不难,知道howmany的作用就可以,类似STL内存池里面向上取整操作,《深入理解操作系统》前面部分有C语言相关位操作的章节(仅仅看了前面部分)

    Author: liangsijian

    Created: 2016-04-23 周六 13:00

    Emacs 25.0.92.1 (Org mode 8.2.10)

    Validate

  • 相关阅读:
    spring 源码解析一(bean定义)
    IOC 容器中添加组件的方式
    spring 后置处理器
    .NetCore3.1 配置Kestrel端口的几种方式及优先级
    CESIUM空间中AB两点A绕B点的地面法向量旋转任意角度后新的A点坐标(A’)
    Cesium坐标转换:根据两个坐标点(坐标点a、坐标点b)的经纬度,计算a点和b点的角度
    任意一个点A(x,y)围绕任意一个点B(a,b)旋转任意角度后的坐标值
    已知地球上的2点坐标,A和B,求A,B线上 任意点位置。
    centos6下mysql5.7.13制作rpm包
    sql 删除狐立帐户
  • 原文地址:https://www.cnblogs.com/UnGeek/p/5424407.html
Copyright © 2011-2022 走看看