zoukankan      html  css  js  c++  java
  • libevent简单分析

    一看名字就知道是围绕eventloop转的。
    那首先肯定是eventloop是个什么?一般都是IO事件,timer事件的管理器。
    那首先看如何new出来一个eventloop:
    1、因为libevent是跨平台的,在不同平台上会有不同的配置,首先读配置:
    struct event_config {
    TAILQ_HEAD(event_configq, event_config_entry) entries;


    int n_cpus_hint;
    enum event_method_feature require_features;
    enum event_base_config_flag flags;
    };
    有一个event_config 的结构,目的就是为了描述将要创建的event_base的配置。
    #define TAILQ_HEAD(name, type)
    struct name {
    struct type *tqh_first;
    struct type **tqh_last;
    }
    TAILQ_HEAD是一个结构体的宏。
    好吧,为了方便阅读我把宏给干掉了:


    struct event_config_entry {
    struct {
    event_config_entry* tqe_next;
    event_config_entry** tqe_prev;
    } next;
    const char *avoid_method;
    };






    struct event_config {
    struct event_configq{
    struct event_config_entry *tqh_first;
    struct event_config_entry **tqh_last;
    } entries;


    int n_cpus_hint;
    enum event_method_feature require_features;
    enum event_base_config_flag flags;
    };
    现在清楚了event_config_new就是分配了这个结构体(链表)的内存,并对链表做了初始化。
    下一步要开始初始化配置了:
    event_base_new_with_config在做这个工作:
    哇塞,这个函数里就开始new eventBase了:
    而且在对base做初始化
    1,、初始化base的时间:
    gettime(base, &base->event_tv);
    看下实现就清楚了:
    static int gettime(struct event_base *base, struct timeval *tp)
    {
    EVENT_BASE_ASSERT_LOCKED(base);


    if (base->tv_cache.tv_sec) {
    *tp = base->tv_cache;
    return (0);
    }


    return (evutil_gettimeofday(tp, NULL));
    }
    如果base的tv_cache保存的有时间就直接用这个时间,以避免调用函数获取时间太频繁;
    否则调用evutil_gettimeofday 是会调用各个系统获取当前时间的函数。
    之后设置这个几个参数:
    base->sig.ev_signal_pair[0] = -1;
    base->sig.ev_signal_pair[1] = -1;
    base->th_notify_fd[0] = -1;
    base->th_notify_fd[1] = -1;
    sig是与信号相关的,linux下的,windows下的貌似没什么用;重点看th_notify_fd的作用:
    evutil_socket_t th_notify_fd[2];
    意思是用来由一些线程通知函数唤醒主线程的。


    event_deferred_cb_queue_init(&base->defer_queue);
    初始化defer_queue队列,这个队列的作用是:
    这个队列内部结构如下:
    struct deferred_cb_queue {
    void *lock;


    int active_count;


    void (*notify_fn)(struct deferred_cb_queue *, void *);
    void *notify_arg;


    TAILQ_HEAD (deferred_cb_list, deferred_cb) deferred_cb_list;
    };
    里面还有个链表:
    struct deferred_cb {
    TAILQ_ENTRY (deferred_cb) cb_next;
    unsigned queued : 1;
    deferred_cb_fn cb;
    void *arg;
    };
    之后设置根据配置backend;
    在windows下可以设置为select或者iocp;如果是select代码会在win32select.c里:
    struct eventop win32ops = {
    "win32",
    win32_init,
    win32_add,
    win32_del,
    win32_dispatch,
    win32_dealloc,
    0,
    0,
    sizeof(struct idx_info),
    }
    初始化时:
    void *
    win32_init(struct event_base *_base)
    {
    struct win32op *winop;
    size_t size;
    if (!(winop = mm_calloc(1, sizeof(struct win32op))))
    return NULL;
    winop->num_fds_in_fd_sets = NEVENT;
    size = FD_SET_ALLOC_SIZE(NEVENT);
    if (!(winop->readset_in = mm_malloc(size)))
    goto err;
    if (!(winop->writeset_in = mm_malloc(size)))
    goto err;
    if (!(winop->readset_out = mm_malloc(size)))
    goto err;
    if (!(winop->writeset_out = mm_malloc(size)))
    goto err;
    if (!(winop->exset_out = mm_malloc(size)))
    goto err;
    winop->readset_in->fd_count = winop->writeset_in->fd_count = 0;
    winop->readset_out->fd_count = winop->writeset_out->fd_count
    = winop->exset_out->fd_count = 0;


    if (evsig_init(_base) < 0)
    winop->signals_are_broken = 1;


    return (winop);
     err:
    XFREE(winop->readset_in);
    XFREE(winop->writeset_in);
    XFREE(winop->readset_out);
    XFREE(winop->writeset_out);
    XFREE(winop->exset_out);
    XFREE(winop);
    return (NULL);
    }
    里面分配了32个fd;
    好吧Eventbase的事情终于搞完了。好辛苦!!!!
    关键是看event_base_dispatch如何进行mainloop的:
    int
    event_base_loop(struct event_base *base, int flags)
    {
    const struct eventop *evsel = base->evsel;
    struct timeval tv;
    struct timeval *tv_p;
    int res, done, retval = 0;


    EVBASE_ACQUIRE_LOCK(base, th_base_lock);


    if (base->running_loop) {
    event_warnx("%s: reentrant invocation.  Only one event_base_loop"
       " can run on each event_base at once.", __func__);
    EVBASE_RELEASE_LOCK(base, th_base_lock);
    return -1;
    }


    base->running_loop = 1;


    clear_time_cache(base);


    if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
    evsig_set_base(base);


    done = 0;


    #ifndef _EVENT_DISABLE_THREAD_SUPPORT
    base->th_owner_id = EVTHREAD_GET_ID();
    #endif


    base->event_gotterm = base->event_break = 0;


    while (!done) {
    base->event_continue = 0;


    if (base->event_gotterm) {
    break;
    }


    if (base->event_break) {
    break;
    }


    timeout_correct(base, &tv);


    tv_p = &tv;
    if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
    timeout_next(base, &tv_p);
    } else {
    evutil_timerclear(&tv);
    }


    if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
    event_debug(("%s: no events registered.", __func__));
    retval = 1;
    goto done;
    }


    gettime(base, &base->event_tv);


    clear_time_cache(base);


    res = evsel->dispatch(base, tv_p);


    if (res == -1) {
    event_debug(("%s: dispatch returned unsuccessfully.",
    __func__));
    retval = -1;
    goto done;
    }


    update_time_cache(base);


    timeout_process(base);


    if (N_ACTIVE_CALLBACKS(base)) {
    int n = event_process_active(base);
    if ((flags & EVLOOP_ONCE)
       && N_ACTIVE_CALLBACKS(base) == 0
       && n != 0)
    done = 1;
    } else if (flags & EVLOOP_NONBLOCK)
    done = 1;
    }
    event_debug(("%s: asked to terminate loop.", __func__));


    done:
    clear_time_cache(base);
    base->running_loop = 0;


    EVBASE_RELEASE_LOCK(base, th_base_lock);


    return (retval);
    }
    跟之前看过的libev是一个模子出来的,都是先检测有木有可读、可写的马上去处理,然后调用backend->dispatch检测事件,处理timer事件。一直转,除非外部要停止或者木有fd了。
    下面看下select事件的检测吧:
    int
    win32_dispatch(struct event_base *base, struct timeval *tv)
    {
    struct win32op *win32op = base->evbase;
    int res = 0;
    unsigned j, i;
    int fd_count;
    SOCKET s;


    if (win32op->resize_out_sets) {
    size_t size = FD_SET_ALLOC_SIZE(win32op->num_fds_in_fd_sets);
    if (!(win32op->readset_out = mm_realloc(win32op->readset_out, size)))
    return (-1);
    if (!(win32op->exset_out = mm_realloc(win32op->exset_out, size)))
    return (-1);
    if (!(win32op->writeset_out = mm_realloc(win32op->writeset_out, size)))
    return (-1);
    win32op->resize_out_sets = 0;
    }


    fd_set_copy(win32op->readset_out, win32op->readset_in);
    fd_set_copy(win32op->exset_out, win32op->writeset_in);
    fd_set_copy(win32op->writeset_out, win32op->writeset_in);


    fd_count =
       (win32op->readset_out->fd_count > win32op->writeset_out->fd_count) ?
       win32op->readset_out->fd_count : win32op->writeset_out->fd_count;


    if (!fd_count) {
    long msec = tv ? evutil_tv_to_msec(tv) : LONG_MAX;
    if (msec < 0)
    msec = LONG_MAX;
    Sleep(msec);
    return (0);
    }


    EVBASE_RELEASE_LOCK(base, th_base_lock);


    res = select(fd_count,
        (struct fd_set*)win32op->readset_out,
        (struct fd_set*)win32op->writeset_out,
        (struct fd_set*)win32op->exset_out, tv);


    EVBASE_ACQUIRE_LOCK(base, th_base_lock);


    event_debug(("%s: select returned %d", __func__, res));


    if (res <= 0) {
    return res;
    }


    if (win32op->readset_out->fd_count) {
    i = rand() % win32op->readset_out->fd_count;
    for (j=0; jreadset_out->fd_count; ++j) {
    if (++i >= win32op->readset_out->fd_count)
    i = 0;
    s = win32op->readset_out->fd_array[i];
    evmap_io_active(base, s, EV_READ);
    }
    }
    if (win32op->exset_out->fd_count) {
    i = rand() % win32op->exset_out->fd_count;
    for (j=0; jexset_out->fd_count; ++j) {
    if (++i >= win32op->exset_out->fd_count)
    i = 0;
    s = win32op->exset_out->fd_array[i];
    evmap_io_active(base, s, EV_WRITE);
    }
    }
    if (win32op->writeset_out->fd_count) {
    SOCKET s;
    i = rand() % win32op->writeset_out->fd_count;
    for (j=0; jwriteset_out->fd_count; ++j) {
    if (++i >= win32op->writeset_out->fd_count)
    i = 0;
    s = win32op->writeset_out->fd_array[i];
    evmap_io_active(base, s, EV_WRITE);
    }
    }
    return (0);
    }
    考虑的很周到,select之前会释放lock,因为select这里是阻塞的,但不希望阻塞主线程,select会继续加锁。把可读,可写fd进行copy,里面还用了一个简单的随机 保证公平。
    之后就更新timecache为当前时间,然后检测timer时间,到点就执行timer事件:
    static void
    timeout_process(struct event_base *base)
    {
    struct timeval now;
    struct event *ev;


    if (min_heap_empty(&base->timeheap)) {
    return;
    }


    gettime(base, &now);


    while ((ev = min_heap_top(&base->timeheap))) {
    if (evutil_timercmp(&ev->ev_timeout, &now, >))
    break;


    event_del_internal(ev);


    event_debug(("timeout_process: call %p",
    ev->ev_callback));
    event_active_nolock(ev, EV_TIMEOUT, 1);
    }
    }
    一看代码就明白了使用最小堆管理时间的,当然时间间隔最小的要先执行,处理流程是这样的:
    检测时间堆里的事件,如果时间已经超过当前时间,那么先把这个事件从堆里删除,然后再active这个event,就是调用注册进来的timer函数。
    好了,暂时写到这里吧。

  • 相关阅读:
    [译]为什么你要学Go?
    类Lisp解释器JavaScript实现
    前端同学Windows中生存指北
    Virtualbox
    数据发布和上线日报&周报系统开发总结
    [Lab1]五分钟了解Makefile
    解决linux下解压来自windows的zip文件产生乱码的问题
    Arch使用过程中出现的问题汇总
    jQuery为什么移除了.toggle()方法
    Linux配置DNS
  • 原文地址:https://www.cnblogs.com/riasky/p/3464834.html
Copyright © 2011-2022 走看看