官方示例
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>
#include <event2/event.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#define MAX_LINE 16384
void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);
char
rot13_char(char c)
{
/* We don't want to use isalpha here; setting the locale would change
* which characters are considered alphabetical. */
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}
struct fd_state {
char buffer[MAX_LINE];
size_t buffer_used;
size_t n_written;
size_t write_upto;
struct event *read_event;
struct event *write_event;
};
struct fd_state *
alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{
struct fd_state *state = malloc(sizeof(struct fd_state));
if (!state)
return NULL;
state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
if (!state->read_event) {
free(state);
return NULL;
}
state->write_event =
event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);
if (!state->write_event) {
event_free(state->read_event);
free(state);
return NULL;
}
state->buffer_used = state->n_written = state->write_upto = 0;
assert(state->write_event);
return state;
}
void
free_fd_state(struct fd_state *state)
{
event_free(state->read_event);
event_free(state->write_event);
free(state);
}
void
do_read(evutil_socket_t fd, short events, void *arg)
{
struct fd_state *state = arg;
char buf[1024];
int i;
ssize_t result;
while (1) {
assert(state->write_event);
result = recv(fd, buf, sizeof(buf), 0);
if (result <= 0)
break;
for (i=0; i < result; ++i) {
if (state->buffer_used < sizeof(state->buffer))
state->buffer[state->buffer_used++] = rot13_char(buf[i]);
if (buf[i] == '
') {
assert(state->write_event);
event_add(state->write_event, NULL);
state->write_upto = state->buffer_used;
}
}
}
if (result == 0) {
free_fd_state(state);
} else if (result < 0) {
if (errno == EAGAIN) // XXXX use evutil macro
return;
perror("recv");
free_fd_state(state);
}
}
void
do_write(evutil_socket_t fd, short events, void *arg)
{
struct fd_state *state = arg;
while (state->n_written < state->write_upto) {
ssize_t result = send(fd, state->buffer + state->n_written,
state->write_upto - state->n_written, 0);
if (result < 0) {
if (errno == EAGAIN) // XXX use evutil macro
return;
free_fd_state(state);
return;
}
assert(result != 0);
state->n_written += result;
}
if (state->n_written == state->buffer_used)
state->n_written = state->write_upto = state->buffer_used = 1;
event_del(state->write_event);
}
void
do_accept(evutil_socket_t listener, short event, void *arg)
{
struct event_base *base = arg;
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listener, (struct sockaddr*)&ss, &slen);
if (fd < 0) { // XXXX eagain??
perror("accept");
} else if (fd > FD_SETSIZE) {
close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */
} else {
struct fd_state *state;
evutil_make_socket_nonblocking(fd);
state = alloc_fd_state(base, fd);
assert(state); /*XXX err*/
assert(state->write_event);
event_add(state->read_event, NULL);
}
}
void
run(void)
{
evutil_socket_t listener;
struct sockaddr_in sin;
struct event_base *base;
struct event *listener_event;
base = event_base_new();
if (!base)
return; /*XXXerr*/
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(40713);
listener = socket(AF_INET, SOCK_STREAM, 0);
evutil_make_socket_nonblocking(listener);
#ifndef WIN32
{
int one = 1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
}
#endif
if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
perror("bind");
return;
}
if (listen(listener, 16)<0) {
perror("listen");
return;
}
listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
/*XXX check it */
event_add(listener_event, NULL);
event_base_dispatch(base);
}
int
main(int c, char **v)
{
setvbuf(stdout, NULL, _IONBF, 0);
run();
return 0;
}
开始第一步
event_base
在libevent中,我们知道,首先需要创建一个event_base,这个是libevent的基础,我们看一下event_base的定义。
struct event_base {
/** Function pointers and other data to describe this event_base's
* backend. */
const struct eventop *evsel;
/** Pointer to backend-specific data. */
void *evbase;
/** List of changes to tell backend about at next dispatch. Only used
* by the O(1) backends. */
struct event_changelist changelist;
/** Function pointers used to describe the backend that this event_base
* uses for signals */
const struct eventop *evsigsel;
/** Data to implement the common signal handler code. */
struct evsig_info sig;
/** Number of virtual events */
int virtual_event_count;
/** Maximum number of virtual events active */
int virtual_event_count_max;
/** Number of total events added to this event_base */
int event_count;
/** Maximum number of total events added to this event_base */
int event_count_max;
/** Number of total events active in this event_base */
int event_count_active;
/** Maximum number of total events active in this event_base */
int event_count_active_max;
/** Set if we should terminate the loop once we're done processing
* events. */
int event_gotterm;
/** Set if we should terminate the loop immediately */
int event_break;
/** Set if we should start a new instance of the loop immediately. */
int event_continue;
/** The currently running priority of events */
int event_running_priority;
/** Set if we're running the event_base_loop function, to prevent
* reentrant invocation. */
int running_loop;
/** Set to the number of deferred_cbs we've made 'active' in the
* loop. This is a hack to prevent starvation; it would be smarter
* to just use event_config_set_max_dispatch_interval's max_callbacks
* feature */
int n_deferreds_queued;
/* Active event management. */
/** An array of nactivequeues queues for active event_callbacks (ones
* that have triggered, and whose callbacks need to be called). Low
* priority numbers are more important, and stall higher ones.
*/
struct evcallback_list *activequeues;
/** The length of the activequeues array */
int nactivequeues;
/** A list of event_callbacks that should become active the next time
* we process events, but not this time. */
struct evcallback_list active_later_queue;
/* common timeout logic */
/** An array of common_timeout_list* for all of the common timeout
* values we know. */
struct common_timeout_list **common_timeout_queues;
/** The number of entries used in common_timeout_queues */
int n_common_timeouts;
/** The total size of common_timeout_queues. */
int n_common_timeouts_allocated;
/** Mapping from file descriptors to enabled (added) events */
struct event_io_map io;
/** Mapping from signal numbers to enabled (added) events. */
struct event_signal_map sigmap;
/** Priority queue of events with timeouts. */
struct min_heap timeheap;
/** Stored timeval: used to avoid calling gettimeofday/clock_gettime
* too often. */
struct timeval tv_cache;
struct evutil_monotonic_timer monotonic_timer;
/** Difference between internal time (maybe from clock_gettime) and
* gettimeofday. */
struct timeval tv_clock_diff;
/** Second in which we last updated tv_clock_diff, in monotonic time. */
time_t last_updated_clock_diff;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
/* threading support */
/** The thread currently running the event_loop for this base */
unsigned long th_owner_id;
/** A lock to prevent conflicting accesses to this event_base */
void *th_base_lock;
/** A condition that gets signalled when we're done processing an
* event with waiters on it. */
void *current_event_cond;
/** Number of threads blocking on current_event_cond. */
int current_event_waiters;
#endif
/** The event whose callback is executing right now */
struct event_callback *current_event;
#ifdef _WIN32
/** IOCP support structure, if IOCP is enabled. */
struct event_iocp_port *iocp;
#endif
/** Flags that this base was configured with */
enum event_base_config_flag flags;
struct timeval max_dispatch_time;
int max_dispatch_callbacks;
int limit_callbacks_after_prio;
/* Notify main thread to wake up break, etc. */
/** True if the base already has a pending notify, and we don't need
* to add any more. */
int is_notify_pending;
/** A socketpair used by some th_notify functions to wake up the main
* thread. */
evutil_socket_t th_notify_fd[2];
/** An event used by some th_notify functions to wake up the main
* thread. */
struct event th_notify;
/** A function used to wake up the main thread from another thread. */
int (*th_notify_fn)(struct event_base *base);
/** Saved seed for weak random number generator. Some backends use
* this to produce fairness among sockets. Protected by th_base_lock. */
struct evutil_weakrand_state weakrand_seed;
/** List of event_onces that have not yet fired. */
LIST_HEAD(once_event_list, event_once) once_events;
/** "Prepare" and "check" watchers. */
struct evwatch_list watchers[EVWATCH_MAX];
};
event_base结构体中定义了很多内容,通知列表,回调地址等。
event_base_new
event_base通过event_base_new创建,我们看一下event_base_new的源码
/**
* Create and return a new event_base to use with the rest of Libevent.
*
* @return a new event_base on success, or NULL on failure.
*
* @see event_base_free(), event_base_new_with_config()
*/
EVENT2_EXPORT_SYMBOL
struct event_base *event_base_new(void);
struct event_base *
event_base_new(void)
{
struct event_base *base = NULL;
struct event_config *cfg = event_config_new();
if (cfg) {
base = event_base_new_with_config(cfg);
event_config_free(cfg);
}
return base;
}
event_base_new的函数很简单,先是定义一个event_base结构体的指针,然后创建对应的配置信息,配置信息创建完之后,传入event_base_new_with_config,这才是真的创建,我们看一下event_base_new_with_config
event_base_new_with_config
/**
Initialize the event API.
Use event_base_new_with_config() to initialize a new event base, taking
the specified configuration under consideration. The configuration object
can currently be used to avoid certain event notification mechanisms.
@param cfg the event configuration object
@return an initialized event_base that can be used to registering events,
or NULL if no event base can be created with the requested event_config.
@see event_base_new(), event_base_free(), event_init(), event_assign()
*/
EVENT2_EXPORT_SYMBOL
struct event_base *event_base_new_with_config(const struct event_config *cfg);
struct event_base *
event_base_new_with_config(const struct event_config *cfg)
{
int i;
struct event_base *base;
int should_check_environment;
#ifndef EVENT__DISABLE_DEBUG_MODE
event_debug_mode_too_late = 1;
#endif
if ((base = mm_calloc(1, sizeof(struct event_base))) == NULL) {
event_warn("%s: calloc", __func__);
return NULL;
}
if (cfg)
base->flags = cfg->flags;
should_check_environment =
!(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));
{
struct timeval tmp;
int precise_time =
cfg && (cfg->flags & EVENT_BASE_FLAG_PRECISE_TIMER);
int flags;
if (should_check_environment && !precise_time) {
precise_time = evutil_getenv_("EVENT_PRECISE_TIMER") != NULL;
if (precise_time) {
base->flags |= EVENT_BASE_FLAG_PRECISE_TIMER;
}
}
flags = precise_time ? EV_MONOT_PRECISE : 0;
evutil_configure_monotonic_time_(&base->monotonic_timer, flags);
gettime(base, &tmp);
}
min_heap_ctor_(&base->timeheap);
base->sig.ev_signal_pair[0] = -1;
base->sig.ev_signal_pair[1] = -1;
base->th_notify_fd[0] = -1;
base->th_notify_fd[1] = -1;
TAILQ_INIT(&base->active_later_queue);
evmap_io_initmap_(&base->io);
evmap_signal_initmap_(&base->sigmap);
event_changelist_init_(&base->changelist);
base->evbase = NULL;
if (cfg) {
memcpy(&base->max_dispatch_time,
&cfg->max_dispatch_interval, sizeof(struct timeval));
base->limit_callbacks_after_prio =
cfg->limit_callbacks_after_prio;
} else {
base->max_dispatch_time.tv_sec = -1;
base->limit_callbacks_after_prio = 1;
}
if (cfg && cfg->max_dispatch_callbacks >= 0) {
base->max_dispatch_callbacks = cfg->max_dispatch_callbacks;
} else {
base->max_dispatch_callbacks = INT_MAX;
}
if (base->max_dispatch_callbacks == INT_MAX &&
base->max_dispatch_time.tv_sec == -1)
base->limit_callbacks_after_prio = INT_MAX;
for (i = 0; eventops[i] && !base->evbase; i++) {
if (cfg != NULL) {
/* determine if this backend should be avoided */
if (event_config_is_avoided_method(cfg,
eventops[i]->name))
continue;
if ((eventops[i]->features & cfg->require_features)
!= cfg->require_features)
continue;
}
/* also obey the environment variables */
if (should_check_environment &&
event_is_method_disabled(eventops[i]->name))
continue;
base->evsel = eventops[i];
base->evbase = base->evsel->init(base);
}
if (base->evbase == NULL) {
event_warnx("%s: no event mechanism available",
__func__);
base->evsel = NULL;
event_base_free(base);
return NULL;
}
if (evutil_getenv_("EVENT_SHOW_METHOD"))
event_msgx("libevent using: %s", base->evsel->name);
/* allocate a single active event queue */
if (event_base_priority_init(base, 1) < 0) {
event_base_free(base);
return NULL;
}
/* prepare for threading */
#if !defined(EVENT__DISABLE_THREAD_SUPPORT) && !defined(EVENT__DISABLE_DEBUG_MODE)
event_debug_created_threadable_ctx_ = 1;
#endif
#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (EVTHREAD_LOCKING_ENABLED() &&
(!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
int r;
EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0);
EVTHREAD_ALLOC_COND(base->current_event_cond);
r = evthread_make_base_notifiable(base);
if (r<0) {
event_warnx("%s: Unable to make base notifiable.", __func__);
event_base_free(base);
return NULL;
}
}
#endif
#ifdef _WIN32
if (cfg && (cfg->flags & EVENT_BASE_FLAG_STARTUP_IOCP))
event_base_start_iocp_(base, cfg->n_cpus_hint);
#endif
/* initialize watcher lists */
for (i = 0; i < EVWATCH_MAX; ++i)
TAILQ_INIT(&base->watchers[i]);
return (base);
}
event_base_new_with_config的逻辑也比较简单,就是创建一个event_base的结构体,然后一步步的初始化。
绑定监听socket
struct event
event_base创建完之后呢,就是常规流程,创建一个监听socket,然后创建一个event与这个监听socket绑定,我们看一下event的结构体
struct event {
struct event_callback ev_evcallback;
/* for managing timeouts */
union {
TAILQ_ENTRY(event) ev_next_with_common_timeout;
size_t min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;
short ev_events;
short ev_res; /* result passed to event callback */
struct event_base *ev_base;
union {
/* used for io events */
struct {
LIST_ENTRY (event) ev_io_next;
struct timeval ev_timeout;
} ev_io;
/* used by signal events */
struct {
LIST_ENTRY (event) ev_signal_next;
short ev_ncalls;
/* Allows deletes in callback */
short *ev_pncalls;
} ev_signal;
} ev_;
struct timeval ev_timeout;
};
event的内容就少了很多,因为它只需要与对应的事件相关,不像event_base一样,是整个libevent的核心。event中保存了回调函数、对应的socket、对应的事件、对应的event_base和超时时间。
event_new
这个event在创建的时候,就需要填写对应的socket进行绑定,创建函数是event_new
/**
Allocate and assign a new event structure, ready to be added.
The function event_new() returns a new event that can be used in
future calls to event_add() and event_del(). The fd and events
arguments determine which conditions will trigger the event; the
callback and callback_arg arguments tell Libevent what to do when the
event becomes active.
If events contains one of EV_READ, EV_WRITE, or EV_READ|EV_WRITE, then
fd is a file descriptor or socket that should get monitored for
readiness to read, readiness to write, or readiness for either operation
(respectively). If events contains EV_SIGNAL, then fd is a signal
number to wait for. If events contains none of those flags, then the
event can be triggered only by a timeout or by manual activation with
event_active(): In this case, fd must be -1.
The EV_PERSIST flag can also be passed in the events argument: it makes
event_add() persistent until event_del() is called.
The EV_ET flag is compatible with EV_READ and EV_WRITE, and supported
only by certain backends. It tells Libevent to use edge-triggered
events.
The EV_TIMEOUT flag has no effect here.
It is okay to have multiple events all listening on the same fds; but
they must either all be edge-triggered, or all not be edge triggered.
When the event becomes active, the event loop will run the provided
callback function, with three arguments. The first will be the provided
fd value. The second will be a bitfield of the events that triggered:
EV_READ, EV_WRITE, or EV_SIGNAL. Here the EV_TIMEOUT flag indicates
that a timeout occurred, and EV_ET indicates that an edge-triggered
event occurred. The third event will be the callback_arg pointer that
you provide.
@param base the event base to which the event should be attached.
@param fd the file descriptor or signal to be monitored, or -1.
@param events desired events to monitor: bitfield of EV_READ, EV_WRITE,
EV_SIGNAL, EV_PERSIST, EV_ET.
@param callback callback function to be invoked when the event occurs
@param callback_arg an argument to be passed to the callback function
@return a newly allocated struct event that must later be freed with
event_free() or NULL if an error occurred.
@see event_free(), event_add(), event_del(), event_assign()
*/
EVENT2_EXPORT_SYMBOL
struct event *event_new(struct event_base *base, evutil_socket_t fd, short events, event_callback_fn callback, void *callback_arg);
struct event *
event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
{
struct event *ev;
ev = mm_malloc(sizeof(struct event));
if (ev == NULL)
return (NULL);
if (event_assign(ev, base, fd, events, cb, arg) < 0) {
mm_free(ev);
return (NULL);
}
return (ev);
}
event_new的函数也很简单,就是创建一个event结构体,然后调用event_assign。event_new传入的参数,base是把这个event绑定到哪个event_base上;fd就是对应的需要绑定的文件描述符,libevent只是把我们需要操作的事情做了封装,增加了功能和跨平台,所以底层收发消息或是读写文件,还是使用系统提供的接口,这里的fd就是系统提供的用来操作文件或是socket的文件描述符,这个fd上面发生的读写或是错误,libevent会先处理,然后再把处理或是整理的结果给我们,屏蔽了处理的细节;events表示需要关心哪些事件;callback就是事件发生后,如何通知我们,这个就是我们注册的用来通知我们的回调函数;arg就是这个回调函数回调的时候,我们通过创建的时候传入的参数,因为有些自定义的数据需要传递,在socket数据到达或是发送完成后,我们需要处理,如果这里不传递参数,我们在回调的时候就不知道是哪个socket了。
event_assign
event创建出来后,通过event_assign初始化,我们看看event_assign中做了什么
/**
Prepare a new, already-allocated event structure to be added.
The function event_assign() prepares the event structure ev to be used
in future calls to event_add() and event_del(). Unlike event_new(), it
doesn't allocate memory itself: it requires that you have already
allocated a struct event, probably on the heap. Doing this will
typically make your code depend on the size of the event structure, and
thereby create incompatibility with future versions of Libevent.
The easiest way to avoid this problem is just to use event_new() and
event_free() instead.
A slightly harder way to future-proof your code is to use
event_get_struct_event_size() to determine the required size of an event
at runtime.
Note that it is NOT safe to call this function on an event that is
active or pending. Doing so WILL corrupt internal data structures in
Libevent, and lead to strange, hard-to-diagnose bugs. You _can_ use
event_assign to change an existing event, but only if it is not active
or pending!
The arguments for this function, and the behavior of the events that it
makes, are as for event_new().
@param ev an event struct to be modified
@param base the event base to which ev should be attached.
@param fd the file descriptor to be monitored
@param events desired events to monitor; can be EV_READ and/or EV_WRITE
@param callback callback function to be invoked when the event occurs
@param callback_arg an argument to be passed to the callback function
@return 0 if success, or -1 on invalid arguments.
@see event_new(), event_add(), event_del(), event_base_once(),
event_get_struct_event_size()
*/
EVENT2_EXPORT_SYMBOL
int event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, event_callback_fn callback, void *callback_arg);
int
event_assign(struct event *ev, struct event_base *base, evutil_socket_t fd, short events, void (*callback)(evutil_socket_t, short, void *), void *arg)
{
if (!base)
base = current_base;
if (arg == &event_self_cbarg_ptr_)
arg = ev;
if (!(events & EV_SIGNAL))
event_debug_assert_socket_nonblocking_(fd);
event_debug_assert_not_added_(ev);
ev->ev_base = base;
ev->ev_callback = callback;
ev->ev_arg = arg;
ev->ev_fd = fd;
ev->ev_events = events;
ev->ev_res = 0;
ev->ev_flags = EVLIST_INIT;
ev->ev_ncalls = 0;
ev->ev_pncalls = NULL;
if (events & EV_SIGNAL) {
if ((events & (EV_READ|EV_WRITE|EV_CLOSED)) != 0) {
event_warnx("%s: EV_SIGNAL is not compatible with "
"EV_READ, EV_WRITE or EV_CLOSED", __func__);
return -1;
}
ev->ev_closure = EV_CLOSURE_EVENT_SIGNAL;
} else {
if (events & EV_PERSIST) {
evutil_timerclear(&ev->ev_io_timeout);
ev->ev_closure = EV_CLOSURE_EVENT_PERSIST;
} else {
ev->ev_closure = EV_CLOSURE_EVENT;
}
}
min_heap_elem_init_(ev);
if (base != NULL) {
/* by default, we put new events into the middle priority */
ev->ev_pri = base->nactivequeues / 2;
}
event_debug_note_setup_(ev);
return 0;
}
这个函数中说明了,不能在event已经活动的时候使用,因为是初始化,可能会把当前的数据删除或是修改,导致未知的错误。event_assign中的做法也比较简单,就是与event_base创建的一样,初始化信息,把一些需要的内容保存到结构体中。
添加事件到主循环
一切都准备好之后,就需要把我们创建的event添加到主循环开始监听,添加的接口是event_add
event_add
/**
Add an event to the set of pending events.
The function event_add() schedules the execution of the event 'ev' when the
condition specified by event_assign() or event_new() occurs, or when the time
specified in timeout has elapsed. If a timeout is NULL, no timeout
occurs and the function will only be
called if a matching event occurs. The event in the
ev argument must be already initialized by event_assign() or event_new()
and may not be used
in calls to event_assign() until it is no longer pending.
If the event in the ev argument already has a scheduled timeout, calling
event_add() replaces the old timeout with the new one if tv is non-NULL.
@param ev an event struct initialized via event_assign() or event_new()
@param timeout the maximum amount of time to wait for the event, or NULL
to wait forever
@return 0 if successful, or -1 if an error occurred
@see event_del(), event_assign(), event_new()
*/
EVENT2_EXPORT_SYMBOL
int event_add(struct event *ev, const struct timeval *timeout);
int
event_add(struct event *ev, const struct timeval *tv)
{
int res;
if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
event_warnx("%s: event has no event_base set.", __func__);
return -1;
}
EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
res = event_add_nolock_(ev, tv, 0);
EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
return (res);
}
event_add中,先判断一下对应的event_base是否合法,然后加锁(如果有的话),再调用event_add_nolock_执行添加。event_add声明的地方介绍了,不能添加使用的event,如果tv不是NULL,当超时触发后,需要重新添加。event_add_nolock_的源码如下
event_add_nolock_
/* Implementation function to add an event. Works just like event_add,
* except: 1) it requires that we have the lock. 2) if tv_is_absolute is set,
* we treat tv as an absolute time, not as an interval to add to the current
* time */
int
event_add_nolock_(struct event *ev, const struct timeval *tv,
int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;
EVENT_BASE_ASSERT_LOCKED(base);
event_debug_assert_is_setup_(ev);
event_debug((
"event_add: event: %p (fd "EV_SOCK_FMT"), %s%s%s%scall %p",
ev,
EV_SOCK_ARG(ev->ev_fd),
ev->ev_events & EV_READ ? "EV_READ " : " ",
ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
ev->ev_events & EV_CLOSED ? "EV_CLOSED " : " ",
tv ? "EV_TIMEOUT " : " ",
ev->ev_callback));
EVUTIL_ASSERT(!(ev->ev_flags & ~EVLIST_ALL));
if (ev->ev_flags & EVLIST_FINALIZING) {
/* XXXX debug */
return (-1);
}
/*
* prepare for timeout insertion further below, if we get a
* failure on any step, we should not change any state.
*/
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve_(&base->timeheap,
1 + min_heap_size_(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}
/* If the main thread is currently executing a signal event's
* callback, and we are not the main thread, then we want to wait
* until the callback is done before we mess with the event, or else
* we can race on ev_ncalls and ev_pncalls below. */
#ifndef EVENT__DISABLE_THREAD_SUPPORT
if (base->current_event == event_to_event_callback(ev) &&
(ev->ev_events & EV_SIGNAL)
&& !EVBASE_IN_THREAD(base)) {
++base->current_event_waiters;
EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
}
#endif
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) {
if (ev->ev_events & (EV_READ|EV_WRITE|EV_CLOSED))
res = evmap_io_add_(base, ev->ev_fd, ev);
else if (ev->ev_events & EV_SIGNAL)
res = evmap_signal_add_(base, (int)ev->ev_fd, ev);
if (res != -1)
event_queue_insert_inserted(base, ev);
if (res == 1) {
/* evmap says we need to notify the main thread. */
notify = 1;
res = 0;
}
}
/*
* we should change the timeout state only if the previous event
* addition succeeded.
*/
if (res != -1 && tv != NULL) {
struct timeval now;
int common_timeout;
#ifdef USE_REINSERT_TIMEOUT
int was_common;
int old_timeout_idx;
#endif
/*
* for persistent timeout events, we remember the
* timeout value and re-add the event.
*
* If tv_is_absolute, this was already set.
*/
if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
#ifndef USE_REINSERT_TIMEOUT
if (ev->ev_flags & EVLIST_TIMEOUT) {
event_queue_remove_timeout(base, ev);
}
#endif
/* Check if it is active due to a timeout. Rescheduling
* this timeout before the callback can be executed
* removes it from the active list. */
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {
if (ev->ev_events & EV_SIGNAL) {
/* See if we are just active executing
* this event in a loop
*/
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
}
event_queue_remove_active(base, event_to_event_callback(ev));
}
gettime(base, &now);
common_timeout = is_common_timeout(tv, base);
#ifdef USE_REINSERT_TIMEOUT
was_common = is_common_timeout(&ev->ev_timeout, base);
old_timeout_idx = COMMON_TIMEOUT_IDX(&ev->ev_timeout);
#endif
if (tv_is_absolute) {
ev->ev_timeout = *tv;
} else if (common_timeout) {
struct timeval tmp = *tv;
tmp.tv_usec &= MICROSECONDS_MASK;
evutil_timeradd(&now, &tmp, &ev->ev_timeout);
ev->ev_timeout.tv_usec |=
(tv->tv_usec & ~MICROSECONDS_MASK);
} else {
evutil_timeradd(&now, tv, &ev->ev_timeout);
}
event_debug((
"event_add: event %p, timeout in %d seconds %d useconds, call %p",
ev, (int)tv->tv_sec, (int)tv->tv_usec, ev->ev_callback));
#ifdef USE_REINSERT_TIMEOUT
event_queue_reinsert_timeout(base, ev, was_common, common_timeout, old_timeout_idx);
#else
event_queue_insert_timeout(base, ev);
#endif
if (common_timeout) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
if (ev == TAILQ_FIRST(&ctl->events)) {
common_timeout_schedule(ctl, &now, ev);
}
} else {
struct event* top = NULL;
/* See if the earliest timeout is now earlier than it
* was before: if so, we will need to tell the main
* thread to wake up earlier than it would otherwise.
* We double check the timeout of the top element to
* handle time distortions due to system suspension.
*/
if (min_heap_elt_is_top_(ev))
notify = 1;
else if ((top = min_heap_top_(&base->timeheap)) != NULL &&
evutil_timercmp(&top->ev_timeout, &now, <))
notify = 1;
}
}
/* if we are not in the right thread, we need to wake up the loop */
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
event_debug_note_add_(ev);
return (res);
}
这个函数就是做了一些处理和判断,比较多的逻辑是用来处理超时的事件,避免加入的时候已经超时,导致永远无法被唤醒等,真正添加的函数是这三个: evmap_io_add_,evmap_signal_add_,event_queue_insert_inserted(base, ev),我们主要看一下evmap_io_add_
evmap_io_add_
/** Add an IO event (some combination of EV_READ or EV_WRITE) to an
event_base's list of events on a given file descriptor, and tell the
underlying eventops about the fd if its state has changed.
Requires that ev is not already added.
@param base the event_base to operate on.
@param fd the file descriptor corresponding to ev.
@param ev the event to add.
*/
int evmap_io_add_(struct event_base *base, evutil_socket_t fd, struct event *ev);
/* return -1 on error, 0 on success if nothing changed in the event backend,
* and 1 on success if something did. */
int
evmap_io_add_(struct event_base *base, evutil_socket_t fd, struct event *ev)
{
const struct eventop *evsel = base->evsel;
struct event_io_map *io = &base->io;
struct evmap_io *ctx = NULL;
int nread, nwrite, nclose, retval = 0;
short res = 0, old = 0;
struct event *old_ev;
EVUTIL_ASSERT(fd == ev->ev_fd);
if (fd < 0)
return 0;
#ifndef EVMAP_USE_HT
if (fd >= io->nentries) {
if (evmap_make_space(io, fd, sizeof(struct evmap_io *)) == -1)
return (-1);
}
#endif
GET_IO_SLOT_AND_CTOR(ctx, io, fd, evmap_io, evmap_io_init,
evsel->fdinfo_len);
nread = ctx->nread;
nwrite = ctx->nwrite;
nclose = ctx->nclose;
if (nread)
old |= EV_READ;
if (nwrite)
old |= EV_WRITE;
if (nclose)
old |= EV_CLOSED;
if (ev->ev_events & EV_READ) {
if (++nread == 1)
res |= EV_READ;
}
if (ev->ev_events & EV_WRITE) {
if (++nwrite == 1)
res |= EV_WRITE;
}
if (ev->ev_events & EV_CLOSED) {
if (++nclose == 1)
res |= EV_CLOSED;
}
if (EVUTIL_UNLIKELY(nread > 0xffff || nwrite > 0xffff || nclose > 0xffff)) {
event_warnx("Too many events reading or writing on fd %d",
(int)fd);
return -1;
}
if (EVENT_DEBUG_MODE_IS_ON() &&
(old_ev = LIST_FIRST(&ctx->events)) &&
(old_ev->ev_events&EV_ET) != (ev->ev_events&EV_ET)) {
event_warnx("Tried to mix edge-triggered and non-edge-triggered"
" events on fd %d", (int)fd);
return -1;
}
if (res) {
void *extra = ((char*)ctx) + sizeof(struct evmap_io);
/* XXX(niels): we cannot mix edge-triggered and
* level-triggered, we should probably assert on
* this. */
if (evsel->add(base, ev->ev_fd,
old, (ev->ev_events & EV_ET) | res, extra) == -1)
return (-1);
retval = 1;
}
ctx->nread = (ev_uint16_t) nread;
ctx->nwrite = (ev_uint16_t) nwrite;
ctx->nclose = (ev_uint16_t) nclose;
LIST_INSERT_HEAD(&ctx->events, ev, ev_io_next);
return (retval);
}
这个函数的声明的地方已经介绍了,把一个fd添加到loop中,当有消息触发时,通知。这里面比较重要的就是
if (evsel->add(base, ev->ev_fd,
old, (ev->ev_events & EV_ET) | res, extra) == -1)
return (-1);
和
LIST_INSERT_HEAD(&ctx->events, ev, ev_io_next);
这里的evsel就是event_base中的第一个成员
/** Function pointers and other data to describe this event_base's
* backend. */
const struct eventop *evsel;
这是个结构体指针,结构体中包含了一些函数指针和描述event_base底层信息的数据。那么这个eventop是什么呢?看下面的源码:
eventop
/** Structure to define the backend of a given event_base. */
struct eventop {
/** The name of this backend. */
const char *name;
/** Function to set up an event_base to use this backend. It should
* create a new structure holding whatever information is needed to
* run the backend, and return it. The returned pointer will get
* stored by event_init into the event_base.evbase field. On failure,
* this function should return NULL. */
void *(*init)(struct event_base *);
/** Enable reading/writing on a given fd or signal. 'events' will be
* the events that we're trying to enable: one or more of EV_READ,
* EV_WRITE, EV_SIGNAL, and EV_ET. 'old' will be those events that
* were enabled on this fd previously. 'fdinfo' will be a structure
* associated with the fd by the evmap; its size is defined by the
* fdinfo field below. It will be set to 0 the first time the fd is
* added. The function should return 0 on success and -1 on error.
*/
int (*add)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** As "add", except 'events' contains the events we mean to disable. */
int (*del)(struct event_base *, evutil_socket_t fd, short old, short events, void *fdinfo);
/** Function to implement the core of an event loop. It must see which
added events are ready, and cause event_active to be called for each
active event (usually via event_io_active or such). It should
return 0 on success and -1 on error.
*/
int (*dispatch)(struct event_base *, struct timeval *);
/** Function to clean up and free our data from the event_base. */
void (*dealloc)(struct event_base *);
/** Flag: set if we need to reinitialize the event base after we fork.
*/
int need_reinit;
/** Bit-array of supported event_method_features that this backend can
* provide. */
enum event_method_feature features;
/** Length of the extra information we should record for each fd that
has one or more active events. This information is recorded
as part of the evmap entry for each fd, and passed as an argument
to the add and del functions above.
*/
size_t fdinfo_len;
};
里面包含模型的名字,初始化函数指针,增加事件函数指针,删除事件函数指针,分发事件函数指针(这个是event loop的核心),释放资源函数指针,是否重新初始化标志,支持的方法和扩展信息的长度。这个成员是在哪里赋值的呢?我们可以看到上面的event_base_new_with_config函数中,有一个循环用来判断赋值
for (i = 0; eventops[i] && !base->evbase; i++) {
if (cfg != NULL) {
/* determine if this backend should be avoided */
if (event_config_is_avoided_method(cfg,
eventops[i]->name))
continue;
if ((eventops[i]->features & cfg->require_features)
!= cfg->require_features)
continue;
}
/* also obey the environment variables */
if (should_check_environment &&
event_is_method_disabled(eventops[i]->name))
continue;
base->evsel = eventops[i];
base->evbase = base->evsel->init(base);
}
那么eventops这个又是什么呢?
eventops
/* Array of backends in order of preference. */
static const struct eventop *eventops[] = {
#ifdef EVENT__HAVE_EVENT_PORTS
&evportops,
#endif
#ifdef EVENT__HAVE_WORKING_KQUEUE
&kqops,
#endif
#ifdef EVENT__HAVE_EPOLL
&epollops,
#endif
#ifdef EVENT__HAVE_DEVPOLL
&devpollops,
#endif
#ifdef EVENT__HAVE_POLL
&pollops,
#endif
#ifdef EVENT__HAVE_SELECT
&selectops,
#endif
#ifdef _WIN32
&win32ops,
#endif
#ifdef EVENT__HAVE_WEPOLL
&wepollops,
#endif
NULL
};
这里可以看到,这是一个常量数组,里面定义了每个平台支持的IO模型,这些模型通过宏定义区分
我们这次分析libevent源码是为了学习epoll的,所以我们看一下epollops
struct eventop epollops
#ifdef EVENT__HAVE_EPOLL
extern const struct eventop epollops;
#endif
#ifdef EVENT__HAVE_WEPOLL
const struct eventop wepollops = {
"wepoll",
epoll_init,
epoll_nochangelist_add,
epoll_nochangelist_del,
epoll_dispatch,
epoll_dealloc,
1, /* need reinit */
EV_FEATURE_O1|EV_FEATURE_EARLY_CLOSE,
0
};
#else
const struct eventop epollops = {
"epoll",
epoll_init,
epoll_nochangelist_add,
epoll_nochangelist_del,
epoll_dispatch,
epoll_dealloc,
1, /* need reinit */
EV_FEATURE_ET|EV_FEATURE_O1|EV_FEATURE_EARLY_CLOSE,
0
};
#endif
定义是在epoll.c中,可以看到eventop的每个成员都有对应的赋值。名称是epoll,初始化函数是epoll_init,添加事件是epoll_nochangelist_add,删除事件是epoll_nochangelist_del,分发消息是epoll_dispatch,释放资源是epoll_dealloc,需要重新初始化,ET模式等。
epoll_init
static void *
epoll_init(struct event_base *base)
{
epoll_handle epfd = INVALID_EPOLL_HANDLE;
struct epollop *epollop;
#ifdef EVENT__HAVE_EPOLL_CREATE1
/* First, try the shiny new epoll_create1 interface, if we have it. */
epfd = epoll_create1(EPOLL_CLOEXEC);
#endif
if (epfd == INVALID_EPOLL_HANDLE) {
/* Initialize the kernel queue using the old interface. (The
size field is ignored since 2.6.8.) */
if ((epfd = epoll_create(32000)) == INVALID_EPOLL_HANDLE) {
if (errno != ENOSYS)
event_warn("epoll_create");
return (NULL);
}
#ifndef EVENT__HAVE_WEPOLL
evutil_make_socket_closeonexec(epfd);
#endif
}
if (!(epollop = mm_calloc(1, sizeof(struct epollop)))) {
close_epoll_handle(epfd);
return (NULL);
}
epollop->epfd = epfd;
/* Initialize fields */
epollop->events = mm_calloc(INITIAL_NEVENT, sizeof(struct epoll_event));
if (epollop->events == NULL) {
mm_free(epollop);
close_epoll_handle(epfd);
return (NULL);
}
epollop->nevents = INITIAL_NEVENT;
#ifndef EVENT__HAVE_WEPOLL
if ((base->flags & EVENT_BASE_FLAG_EPOLL_USE_CHANGELIST) != 0 ||
((base->flags & EVENT_BASE_FLAG_IGNORE_ENV) == 0 &&
evutil_getenv_("EVENT_EPOLL_USE_CHANGELIST") != NULL)) {
base->evsel = &epollops_changelist;
}
#endif
#ifdef USING_TIMERFD
/*
The epoll interface ordinarily gives us one-millisecond precision,
so on Linux it makes perfect sense to use the CLOCK_MONOTONIC_COARSE
timer. But when the user has set the new PRECISE_TIMER flag for an
event_base, we can try to use timerfd to give them finer granularity.
*/
if ((base->flags & EVENT_BASE_FLAG_PRECISE_TIMER) &&
!(base->flags & EVENT_BASE_FLAG_EPOLL_DISALLOW_TIMERFD) &&
base->monotonic_timer.monotonic_clock == CLOCK_MONOTONIC) {
int fd;
fd = epollop->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
if (epollop->timerfd >= 0) {
struct epoll_event epev;
memset(&epev, 0, sizeof(epev));
epev.data.fd = epollop->timerfd;
epev.events = EPOLLIN;
if (epoll_ctl(epollop->epfd, EPOLL_CTL_ADD, fd, &epev) < 0) {
event_warn("epoll_ctl(timerfd)");
close(fd);
epollop->timerfd = -1;
}
} else {
if (errno != EINVAL && errno != ENOSYS) {
/* These errors probably mean that we were
* compiled with timerfd/TFD_* support, but
* we're running on a kernel that lacks those.
*/
event_warn("timerfd_create");
}
epollop->timerfd = -1;
}
} else {
epollop->timerfd = -1;
}
#endif
evsig_init_(base);
return (epollop);
}
这个函数就是调用系统的api,创建一个epoll文件描述符
epoll_nochangelist_add
创建完事件后,需要把事件添加到主循环中
static int
epoll_nochangelist_add(struct event_base *base, evutil_socket_t fd,
short old, short events, void *p)
{
struct event_change ch;
ch.fd = fd;
ch.old_events = old;
ch.read_change = ch.write_change = ch.close_change = 0;
if (events & EV_WRITE)
ch.write_change = EV_CHANGE_ADD |
(events & EV_ET);
if (events & EV_READ)
ch.read_change = EV_CHANGE_ADD |
(events & EV_ET);
if (events & EV_CLOSED)
ch.close_change = EV_CHANGE_ADD |
(events & EV_ET);
return epoll_apply_one_change(base, base->evbase, &ch);
}
记录下原来有哪些事件,需要更改哪些事件,最后调用epoll_apply_one_change
epoll_apply_one_change
static int
epoll_apply_one_change(struct event_base *base,
struct epollop *epollop,
const struct event_change *ch)
{
struct epoll_event epev;
int op, events = 0;
int idx;
//获取设置对应的数组索引,并赋值对应的设置和事件
idx = EPOLL_OP_TABLE_INDEX(ch);
op = epoll_op_table[idx].op;
events = epoll_op_table[idx].events;
if (!events) {
EVUTIL_ASSERT(op == 0);
return 0;
}
//判断ET模式
if ((ch->read_change|ch->write_change|ch->close_change) & EV_CHANGE_ET)
events |= EPOLLET;
memset(&epev, 0, sizeof(epev));
#ifdef EVENT__HAVE_WEPOLL
epev.data.sock = ch->fd;
#else
epev.data.fd = ch->fd;
#endif
epev.events = events;
//把ch->fd这个socket,添加到epollop->epfd这个epoll文件描述符上,参数是op,传入的参数是epev
if (epoll_ctl(epollop->epfd, op, ch->fd, &epev) == 0) {
event_debug((PRINT_CHANGES(op, epev.events, ch, "okay")));
return 0;
}
switch (op) {
case EPOLL_CTL_MOD:
if (errno == ENOENT) {
/* If a MOD operation fails with ENOENT, the
* fd was probably closed and re-opened. We
* should retry the operation as an ADD.
*/
if (epoll_ctl(epollop->epfd, EPOLL_CTL_ADD, ch->fd, &epev) == -1) {
event_warn("Epoll MOD(%d) on %d retried as ADD; that failed too",
(int)epev.events, ch->fd);
return -1;
} else {
event_debug(("Epoll MOD(%d) on %d retried as ADD; succeeded.",
(int)epev.events,
ch->fd));
return 0;
}
}
break;
case EPOLL_CTL_ADD:
if (errno == EEXIST) {
/* If an ADD operation fails with EEXIST,
* either the operation was redundant (as with a
* precautionary add), or we ran into a fun
* kernel bug where using dup*() to duplicate the
* same file into the same fd gives you the same epitem
* rather than a fresh one. For the second case,
* we must retry with MOD. */
if (epoll_ctl(epollop->epfd, EPOLL_CTL_MOD, ch->fd, &epev) == -1) {
event_warn("Epoll ADD(%d) on %d retried as MOD; that failed too",
(int)epev.events, ch->fd);
return -1;
} else {
event_debug(("Epoll ADD(%d) on %d retried as MOD; succeeded.",
(int)epev.events,
ch->fd));
return 0;
}
}
break;
case EPOLL_CTL_DEL:
if (errno == ENOENT || errno == EBADF || errno == EPERM) {
/* If a delete fails with one of these errors,
* that's fine too: we closed the fd before we
* got around to calling epoll_dispatch. */
event_debug(("Epoll DEL(%d) on fd %d gave %s: DEL was unnecessary.",
(int)epev.events,
ch->fd,
strerror(errno)));
return 0;
}
break;
default:
break;
}
event_warn(PRINT_CHANGES(op, epev.events, ch, "failed"));
return -1;
}
上面的函数就是调用了epoll_ctl对事件进行注册,然后对出错的情况做了处理,这里值得我们借鉴一下。
epoll_nochangelist_del
事件可以注册,也可以删除
static int
epoll_nochangelist_del(struct event_base *base, evutil_socket_t fd,
short old, short events, void *p)
{
struct event_change ch;
ch.fd = fd;
ch.old_events = old;
ch.read_change = ch.write_change = ch.close_change = 0;
if (events & EV_WRITE)
ch.write_change = EV_CHANGE_DEL |
(events & EV_ET);
if (events & EV_READ)
ch.read_change = EV_CHANGE_DEL |
(events & EV_ET);
if (events & EV_CLOSED)
ch.close_change = EV_CHANGE_DEL |
(events & EV_ET);
return epoll_apply_one_change(base, base->evbase, &ch);
}
也是先获得原来的事件,然后增加删除的标志,调用epoll_apply_one_change
epoll_apply_one_change
static int
epoll_apply_one_change(struct event_base *base,
struct epollop *epollop,
const struct event_change *ch)
{
struct epoll_event epev;
int op, events = 0;
int idx;
idx = EPOLL_OP_TABLE_INDEX(ch);
op = epoll_op_table[idx].op;
events = epoll_op_table[idx].events;
if (!events) {
EVUTIL_ASSERT(op == 0);
return 0;
}
if ((ch->read_change|ch->write_change|ch->close_change) & EV_CHANGE_ET)
events |= EPOLLET;
memset(&epev, 0, sizeof(epev));
#ifdef EVENT__HAVE_WEPOLL
epev.data.sock = ch->fd;
#else
epev.data.fd = ch->fd;
#endif
epev.events = events;
//同样是调用epoll_ctl操作删除事件
if (epoll_ctl(epollop->epfd, op, ch->fd, &epev) == 0) {
event_debug((PRINT_CHANGES(op, epev.events, ch, "okay")));
return 0;
}
switch (op) {
case EPOLL_CTL_MOD:
if (errno == ENOENT) {
/* If a MOD operation fails with ENOENT, the
* fd was probably closed and re-opened. We
* should retry the operation as an ADD.
*/
if (epoll_ctl(epollop->epfd, EPOLL_CTL_ADD, ch->fd, &epev) == -1) {
event_warn("Epoll MOD(%d) on %d retried as ADD; that failed too",
(int)epev.events, ch->fd);
return -1;
} else {
event_debug(("Epoll MOD(%d) on %d retried as ADD; succeeded.",
(int)epev.events,
ch->fd));
return 0;
}
}
break;
case EPOLL_CTL_ADD:
if (errno == EEXIST) {
/* If an ADD operation fails with EEXIST,
* either the operation was redundant (as with a
* precautionary add), or we ran into a fun
* kernel bug where using dup*() to duplicate the
* same file into the same fd gives you the same epitem
* rather than a fresh one. For the second case,
* we must retry with MOD. */
if (epoll_ctl(epollop->epfd, EPOLL_CTL_MOD, ch->fd, &epev) == -1) {
event_warn("Epoll ADD(%d) on %d retried as MOD; that failed too",
(int)epev.events, ch->fd);
return -1;
} else {
event_debug(("Epoll ADD(%d) on %d retried as MOD; succeeded.",
(int)epev.events,
ch->fd));
return 0;
}
}
break;
case EPOLL_CTL_DEL:
if (errno == ENOENT || errno == EBADF || errno == EPERM) {
/* If a delete fails with one of these errors,
* that's fine too: we closed the fd before we
* got around to calling epoll_dispatch. */
event_debug(("Epoll DEL(%d) on fd %d gave %s: DEL was unnecessary.",
(int)epev.events,
ch->fd,
strerror(errno)));
return 0;
}
break;
default:
break;
}
event_warn(PRINT_CHANGES(op, epev.events, ch, "failed"));
return -1;
}
这个函数与上面增加差别不大,都是调用epoll_ctl进行操作
epoll_dispatch
增加删除事件的功能都有了,还需要分发使事件循环运行起来
static int
epoll_dispatch(struct event_base *base, struct timeval *tv)
{
struct epollop *epollop = base->evbase;
struct epoll_event *events = epollop->events;
int i, res;
long timeout = -1;
#ifdef USING_TIMERFD
if (epollop->timerfd >= 0) {
struct itimerspec is;
is.it_interval.tv_sec = 0;
is.it_interval.tv_nsec = 0;
if (tv == NULL) {
/* No timeout; disarm the timer. */
is.it_value.tv_sec = 0;
is.it_value.tv_nsec = 0;
} else {
if (tv->tv_sec == 0 && tv->tv_usec == 0) {
/* we need to exit immediately; timerfd can't
* do that. */
timeout = 0;
}
is.it_value.tv_sec = tv->tv_sec;
is.it_value.tv_nsec = tv->tv_usec * 1000;
}
/* TODO: we could avoid unnecessary syscalls here by only
calling timerfd_settime when the top timeout changes, or
when we're called with a different timeval.
*/
if (timerfd_settime(epollop->timerfd, 0, &is, NULL) < 0) {
event_warn("timerfd_settime");
}
} else
#endif
if (tv != NULL) {
timeout = evutil_tv_to_msec_(tv);
if (timeout < 0 || timeout > MAX_EPOLL_TIMEOUT_MSEC) {
/* Linux kernels can wait forever if the timeout is
* too big; see comment on MAX_EPOLL_TIMEOUT_MSEC. */
timeout = MAX_EPOLL_TIMEOUT_MSEC;
}
}
epoll_apply_changes(base);
event_changelist_remove_all_(&base->changelist, base);
EVBASE_RELEASE_LOCK(base, th_base_lock);
res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
if (res == -1) {
if (errno != EINTR) {
event_warn("epoll_wait");
return (-1);
}
return (0);
}
event_debug(("%s: epoll_wait reports %d", __func__, res));
EVUTIL_ASSERT(res <= epollop->nevents);
for (i = 0; i < res; i++) {
int what = events[i].events;
short ev = 0;
#ifdef USING_TIMERFD
if (events[i].data.fd == epollop->timerfd)
continue;
#endif
if (what & EPOLLERR) {
ev = EV_READ | EV_WRITE;
} else if ((what & EPOLLHUP) && !(what & EPOLLRDHUP)) {
ev = EV_READ | EV_WRITE;
} else {
if (what & EPOLLIN)
ev |= EV_READ;
if (what & EPOLLOUT)
ev |= EV_WRITE;
if (what & EPOLLRDHUP)
ev |= EV_CLOSED;
}
if (!ev)
continue;
#ifdef EVENT__HAVE_WEPOLL
evmap_io_active_(base, events[i].data.sock, ev);
#else
evmap_io_active_(base, events[i].data.fd, ev | EV_ET);
#endif
}
if (res == epollop->nevents && epollop->nevents < MAX_NEVENT) {
/* We used all of the event space this time. We should
be ready for more events next time. */
int new_nevents = epollop->nevents * 2;
struct epoll_event *new_events;
new_events = mm_realloc(epollop->events,
new_nevents * sizeof(struct epoll_event));
if (new_events) {
epollop->events = new_events;
epollop->nevents = new_nevents;
}
}
return (0);
}
这里面处理了一些超时的事件,我们不关心其他的,只看epoll,实际上就是调用了epoll_wait,这个就是epoll等待事件触发的api,libevent获得事件后,会按照封装的逻辑,添加对应的event
epoll_dealloc
static void
epoll_dealloc(struct event_base *base)
{
struct epollop *epollop = base->evbase;
evsig_dealloc_(base);
if (epollop->events)
mm_free(epollop->events);
if (epollop->epfd != INVALID_EPOLL_HANDLE)
close_epoll_handle(epollop->epfd);
#ifdef USING_TIMERFD
if (epollop->timerfd >= 0)
close(epollop->timerfd);
#endif
memset(epollop, 0, sizeof(struct epollop));
mm_free(epollop);
}
删除的流程比较简单,删除之前,处理一些事情,然后释放掉内存。在处理删除事件的时候,比较复杂,需要考虑各种事情,以免内存泄露或是导致野指针
event_base_dispatch
事件处理完成后,调用loop主循环,开始监听链接或是消息的到来
/**
Event dispatching loop
This loop will run the event base until either there are no more pending or
active, or until something calls event_base_loopbreak() or
event_base_loopexit().
@param base the event_base structure returned by event_base_new() or
event_base_new_with_config()
@return 0 if successful, -1 if an error occurred, or 1 if we exited because
no events were pending or active.
@see event_base_loop()
*/
EVENT2_EXPORT_SYMBOL
int event_base_dispatch(struct event_base *base);
int
event_base_dispatch(struct event_base *event_base)
{
return (event_base_loop(event_base, 0));
}
这个函数很简单,应该只是暴露给上层的一个结构,声明的地方说明了,开启一个event base的循环,event base从event_base_new或是event_base_new_with_config获得;这两个函数我们上面介绍了,创建event_base使用的是event_base_new,event_base_new只是初始化了一个配置信息,最终是调用event_base_new_with_config。
event_base_loop
/**
Wait for events to become active, and run their callbacks.
This is a more flexible version of event_base_dispatch().
By default, this loop will run the event base until either there are no more
pending or active events, or until something calls event_base_loopbreak() or
event_base_loopexit(). You can override this behavior with the 'flags'
argument.
@param eb the event_base structure returned by event_base_new() or
event_base_new_with_config()
@param flags any combination of EVLOOP_ONCE | EVLOOP_NONBLOCK
@return 0 if successful, -1 if an error occurred, or 1 if we exited because
no events were pending or active.
@see event_base_loopexit(), event_base_dispatch(), EVLOOP_ONCE,
EVLOOP_NONBLOCK
*/
EVENT2_EXPORT_SYMBOL
int event_base_loop(struct event_base *eb, int flags);
int
event_base_loop(struct event_base *base, int flags)
{
//获取evsel,当前系统底层最终操作文件描述符的函数,这是一层封装
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;
struct evwatch_prepare_cb_info prepare_info;
struct evwatch_check_cb_info check_info;
struct evwatch *watcher;
//尝试获取锁,会在evsel.dispatch中释放掉,在调用回调函数的时候再加锁
/* Grab the lock. We will release it inside evsel.dispatch, and again
* as we invoke watchers and user callbacks. */
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
//如果是已经在循环中,就退出当前循环,因为同时只能有一个主循环
if (base->running_loop) {
event_warnx("%s: reentrant invocation. Only one event_base_loop"
" can run on each event_base at once.", __func__);
EVBASE_RELEASE_LOCK(base, th_base_lock);
return -1;
}
base->running_loop = 1;
clear_time_cache(base);
if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)
evsig_set_base_(base);
done = 0;
#ifndef EVENT__DISABLE_THREAD_SUPPORT
base->th_owner_id = EVTHREAD_GET_ID();
#endif
base->event_gotterm = base->event_break = 0;
while (!done) {
base->event_continue = 0;
base->n_deferreds_queued = 0;
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
break;
}
if (base->event_break) {
break;
}
tv_p = &tv;
//超时处理
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
/*
* if we have active events, we just poll new events
* without waiting.
*/
evutil_timerclear(&tv);
}
//如果没有事件处理就退出
/* If we have no events, we just exit */
if (0==(flags&EVLOOP_NO_EXIT_ON_EMPTY) &&
!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
event_debug(("%s: no events registered.", __func__));
retval = 1;
goto done;
}
//激活最后一个事件
event_queue_make_later_events_active(base);
/* Invoke prepare watchers before polling for events */
prepare_info.timeout = tv_p;
TAILQ_FOREACH(watcher, &base->watchers[EVWATCH_PREPARE], next) {
EVBASE_RELEASE_LOCK(base, th_base_lock);
(*watcher->callback.prepare)(watcher, &prepare_info, watcher->arg);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
}
clear_time_cache(base);
//这里就是上面的分发消息,实际上如果是epoll,处理一些超时和标志后,调用的是epoll_wait,等待事件到来
res = evsel->dispatch(base, tv_p);
if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",
__func__));
retval = -1;
goto done;
}
update_time_cache(base);
/* Invoke check watchers after polling for events, and before
* processing them */
TAILQ_FOREACH(watcher, &base->watchers[EVWATCH_CHECK], next) {
EVBASE_RELEASE_LOCK(base, th_base_lock);
(*watcher->callback.check)(watcher, &check_info, watcher->arg);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
}
timeout_process(base);
if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base);
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
event_debug(("%s: asked to terminate loop.", __func__));
done:
clear_time_cache(base);
base->running_loop = 0;
EVBASE_RELEASE_LOCK(base, th_base_lock);
return (retval);
}
终于看到我们的主体循环了。它的作用在声明的时候写了,等待激活的事件,然后调用他们的回调函数。默认情况下,这个循环会一直执行,直到event base没有激活或是挂起的事件,或是调用了event_base_loopbreak()或event_base_loopexit()。你可以通过flags修改这个行为。