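The previous section covered how bufferevent monitors read events. So how does bufferevent monitor write events?
For an fd, the write event fires whenever its write buffer is not full. In general, unless you send a large amount of data to the fd, its write buffer will not fill up. So if you monitor the write event from the start, it fires continuously. libevent's approach is to monitor an fd's write event only when there is actually data to write to that fd.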
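To see why an always-on write subscription would fire continuously, here is a minimal sketch that polls a freshly created socket for writability before anything has been sent. It uses only POSIX calls (`socketpair`, `poll`). The helper names `is_writable_now` and `fresh_socket_is_writable` are made up for this illustration and are not part of libevent.

```c
#include <assert.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Returns 1 if fd is writable right now, 0 if not, -1 on error.
 * Hypothetical helper for illustration only. */
static int is_writable_now(int fd)
{
    struct pollfd pfd;

    assert(fd >= 0);
    pfd.fd = fd;
    pfd.events = POLLOUT;
    pfd.revents = 0;

    /* Timeout 0: report the current state without blocking. */
    int n = poll(&pfd, 1, 0);
    if (n < 0)
        return -1;
    return (n == 1 && (pfd.revents & POLLOUT)) ? 1 : 0;
}

/* Creates a connected socket pair and checks whether one end is
 * writable before any data has been sent on it. */
static int fresh_socket_is_writable(void)
{
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
        return -1;

    int r = is_writable_now(sv[0]);

    close(sv[0]);
    close(sv[1]);
    return r;
}
```

On a typical POSIX system `fresh_socket_is_writable()` should report the socket as writable, because its send buffer starts out empty. An event loop that always watched for writability would therefore wake up on every iteration.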
Monitoring the write event
In a user callback, you can call bufferevent_write to append data to the output buffer (output).
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
    if (evbuffer_add(bufev->output, data, size) == -1)
        return (-1);

    return 0;
}
/* Adds data to an event buffer */
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
{
    struct evbuffer_chain *chain, *tmp;
    const unsigned char *data = data_in;
    size_t remain, to_alloc;
    int result = -1;

    EVBUFFER_LOCK(buf);

    if (buf->freeze_end) {
        goto done;
    }
    /* Prevent buf->total_len overflow */
    if (datlen > EV_SIZE_MAX - buf->total_len) {
        goto done;
    }

    chain = buf->last;

    /* If there are no chains allocated for this buffer, allocate one
     * big enough to hold all the data. */
    if (chain == NULL) {
        chain = evbuffer_chain_new(datlen);
        if (!chain)
            goto done;
        evbuffer_chain_insert(buf, chain);
    }

    if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {
        /* Always true for mutable buffers */
        EVUTIL_ASSERT(chain->misalign >= 0 &&
            (ev_uint64_t)chain->misalign <= EVBUFFER_CHAIN_MAX);
        remain = chain->buffer_len - (size_t)chain->misalign - chain->off;
        if (remain >= datlen) {
            /* there's enough space to hold all the data in the
             * current last chain */
            memcpy(chain->buffer + chain->misalign + chain->off,
                data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        } else if (!CHAIN_PINNED(chain) &&
            evbuffer_chain_should_realign(chain, datlen)) {
            /* we can fit the data into the misalignment */
            evbuffer_chain_align(chain);

            memcpy(chain->buffer + chain->off, data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        }
    } else {
        /* we cannot write any data to the last chain */
        remain = 0;
    }

    /* we need to add another chain */
    to_alloc = chain->buffer_len;
    if (to_alloc <= EVBUFFER_CHAIN_MAX_AUTO_SIZE/2)
        to_alloc <<= 1;
    if (datlen > to_alloc)
        to_alloc = datlen;
    tmp = evbuffer_chain_new(to_alloc);
    if (tmp == NULL)
        goto done;

    if (remain) {
        memcpy(chain->buffer + chain->misalign + chain->off,
            data, remain);
        chain->off += remain;
        buf->total_len += remain;
        buf->n_add_for_cb += remain;
    }

    data += remain;
    datlen -= remain;

    memcpy(tmp->buffer, data, datlen);
    tmp->off = datlen;
    evbuffer_chain_insert(buf, tmp);
    buf->n_add_for_cb += datlen;

out:
    evbuffer_invoke_callbacks(buf);
    result = 0;
done:
    EVBUFFER_UNLOCK(buf);
    return result;
}
Now look back at bufferevent_socket_new. In that function we registered a callback on the output buffer:
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

    if (cbinfo->n_added &&
        (bufev->enabled & EV_WRITE) &&
        !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
        !bufev_p->write_suspended) {
        /* Somebody added data to the buffer, and we would like to
         * write, and we were not writing.  So, start writing. */
        if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
            /* Should we log this? */
        }
    }
}
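As you can see, the output buffer's callback adds the fd's write event to epoll, but only when data was actually added, writing is enabled and not suspended, and the write event is not already pending.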
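The arming condition in that callback can be modelled in isolation. The sketch below is a simplified stand-in, not libevent code: `struct toy_bev`, `toy_outbuf_cb`, and `toy_write` are hypothetical names, the flat array replaces the evbuffer chain, and setting `write_pending` stands in for the real call to `be_socket_add`.

```c
#include <assert.h>
#include <string.h>

#define TOY_CAP 1024

/* Hypothetical, heavily simplified model of a bufferevent. */
struct toy_bev {
    char   out[TOY_CAP];     /* stands in for bufev->output */
    size_t out_len;
    int    write_enabled;    /* like (bufev->enabled & EV_WRITE) */
    int    write_suspended;  /* like bufev_p->write_suspended */
    int    write_pending;    /* like event_pending(&bufev->ev_write, ...) */
    int    arm_count;        /* how many times the write event was armed */
};

/* Mirrors the condition in bufferevent_socket_outbuf_cb: arm the write
 * event only if data was added, writing is enabled, the event is not
 * already pending, and writing is not suspended. */
static void toy_outbuf_cb(struct toy_bev *b, size_t n_added)
{
    if (n_added && b->write_enabled &&
        !b->write_pending && !b->write_suspended) {
        b->write_pending = 1;   /* assumption: replaces be_socket_add() */
        b->arm_count++;
    }
}

/* Appends data to the output buffer, then runs the buffer callback,
 * roughly as evbuffer_add ends with evbuffer_invoke_callbacks. */
static int toy_write(struct toy_bev *b, const void *data, size_t len)
{
    assert(b != NULL);
    if (len > TOY_CAP - b->out_len)
        return -1;
    memcpy(b->out + b->out_len, data, len);
    b->out_len += len;
    toy_outbuf_cb(b, len);
    return 0;
}
```

In this model the first `toy_write` arms the write event, and later writes made while it is still pending only append to the buffer. This is the same reason the real callback checks `event_pending` before adding the event again.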
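Event flow
We are now monitoring the fd's write event, and the fd's write buffer is not full, so the write event fires. That in turn invokes bufferevent_writecb, the write-event callback we set up in the previous section.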
static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    int res = 0;
    short what = BEV_EVENT_WRITING;
    int connected = 0;
    ev_ssize_t atmost = -1;

    _bufferevent_incref_and_lock(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
         * timeout, since a read has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }
    if (bufev_p->connecting) {
        int c = evutil_socket_finished_connecting(fd);
        /* we need to fake the error if the connection was refused
         * immediately - usually connection to localhost on BSD */
        if (bufev_p->connection_refused) {
            bufev_p->connection_refused = 0;
            c = -1;
        }

        if (c == 0)
            goto done;

        bufev_p->connecting = 0;
        if (c < 0) {
            event_del(&bufev->ev_write);
            event_del(&bufev->ev_read);
            _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
            goto done;
        } else {
            connected = 1;
#ifdef WIN32
            if (BEV_IS_ASYNC(bufev)) {
                event_del(&bufev->ev_write);
                bufferevent_async_set_connected(bufev);
                _bufferevent_run_eventcb(bufev,
                    BEV_EVENT_CONNECTED);
                goto done;
            }
#endif
            _bufferevent_run_eventcb(bufev,
                BEV_EVENT_CONNECTED);
            if (!(bufev->enabled & EV_WRITE) ||
                bufev_p->write_suspended) {
                event_del(&bufev->ev_write);
                goto done;
            }
        }
    }

    atmost = _bufferevent_get_write_max(bufev_p);

    if (bufev_p->write_suspended)
        goto done;

    if (evbuffer_get_length(bufev->output)) {
        evbuffer_unfreeze(bufev->output, 1);
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn't indicate
               an EOF. An ECONNRESET might be more typical.
             */
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;

        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

    goto done;

reschedule:
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }
    goto done;

error:
    bufferevent_disable(bufev, EV_WRITE);
    _bufferevent_run_eventcb(bufev, what);

done:
    _bufferevent_decref_and_unlock(bufev);
}
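It does three main things:
- Writes the data in the output buffer to the fd via evbuffer_write_atmost.
- Deletes the write event once all the data in output has been written.
- Invokes the user-defined write callback when the amount of data left in output is at or below the write low watermark.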
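The three steps above can be sketched with plain POSIX I/O. This is a simplified model under stated assumptions, not libevent's implementation: `struct mini_bev` and `mini_on_writable` are hypothetical names, a flat array replaces the evbuffer, `write` replaces `evbuffer_write_atmost`, clearing `write_pending` stands in for `event_del`, and a counter stands in for the user's write callback.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

#define MINI_CAP 4096

/* Hypothetical, simplified model of a socket bufferevent's write side. */
struct mini_bev {
    int    fd;
    char   out[MINI_CAP];   /* stands in for bufev->output */
    size_t out_len;
    size_t low_watermark;   /* like bufev->wm_write.low */
    int    write_pending;   /* 1 while the write event is registered */
    int    user_cb_calls;   /* counts simulated user write callbacks */
};

/* Called when the fd is reported writable.
 * Returns bytes written, 0 if nothing was written (empty buffer or a
 * retriable error), or -1 on a hard error. */
static ssize_t mini_on_writable(struct mini_bev *b)
{
    ssize_t n = 0;

    assert(b != NULL);

    /* Step 1: flush as much buffered output as the fd accepts. */
    if (b->out_len > 0) {
        n = write(b->fd, b->out, b->out_len);
        if (n < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                return 0;         /* stay registered and retry later */
            b->write_pending = 0; /* like bufferevent_disable on error */
            return -1;
        }
        memmove(b->out, b->out + n, b->out_len - (size_t)n);
        b->out_len -= (size_t)n;
    }

    /* Step 2: nothing left to send, so stop watching for writability. */
    if (b->out_len == 0)
        b->write_pending = 0;

    /* Step 3: notify the user once the buffer is at or below the
     * low watermark. */
    if (n > 0 && b->out_len <= b->low_watermark)
        b->user_cb_calls++;

    return n;
}
```

With the default low watermark of 0, the simulated user callback runs only after the buffer has been fully drained, which is also the point where the write event is removed.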
References: