zoukankan      html  css  js  c++  java
  • libevent(十)bufferevent 2

    接上文libevent(九)bufferevent

    上文主要讲了bufferevent如何监听读事件,那么bufferevent如何监听写事件呢?

    于一个fd,只要它的写缓冲区没有满,就会触发写事件。

    一般情况下,如果不向这个fd发送大量的数据,它的写缓冲区是不会满的。

    所以,如果一开始就监听写事件,写事件会一直被触发。

    libevent的做法是:

        当我们确实要向fd写入数据时,才监听该fd的写事件。

    监听写事件

    在用户回调函数中,可以通过 bufferevent_write 向输出缓冲output中写数据。

    int
    bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
    {
        if (evbuffer_add(bufev->output, data, size) == -1)
            return (-1);
    
        return 0;
    }
    /* Adds data to an event buffer */
    
    int
    evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
    {
        struct evbuffer_chain *chain, *tmp;
        const unsigned char *data = data_in;
        size_t remain, to_alloc;
        int result = -1;
    
        EVBUFFER_LOCK(buf);
    
        if (buf->freeze_end) {
            goto done;
        }
        /* Prevent buf->total_len overflow */
        if (datlen > EV_SIZE_MAX - buf->total_len) {
            goto done;
        }
    
        chain = buf->last;
    
        /* If there are no chains allocated for this buffer, allocate one
         * big enough to hold all the data. */
        if (chain == NULL) {
            chain = evbuffer_chain_new(datlen);
            if (!chain)
                goto done;
            evbuffer_chain_insert(buf, chain);
        }
    
        if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {
            /* Always true for mutable buffers */
            EVUTIL_ASSERT(chain->misalign >= 0 &&
                (ev_uint64_t)chain->misalign <= EVBUFFER_CHAIN_MAX);
            remain = chain->buffer_len - (size_t)chain->misalign - chain->off;
            if (remain >= datlen) {
                /* there's enough space to hold all the data in the
                 * current last chain */
                memcpy(chain->buffer + chain->misalign + chain->off,
                    data, datlen);
                chain->off += datlen;
                buf->total_len += datlen;
                buf->n_add_for_cb += datlen;
                goto out;
            } else if (!CHAIN_PINNED(chain) &&
                evbuffer_chain_should_realign(chain, datlen)) {
                /* we can fit the data into the misalignment */
                evbuffer_chain_align(chain);
    
                memcpy(chain->buffer + chain->off, data, datlen);
                chain->off += datlen;
                buf->total_len += datlen;
                buf->n_add_for_cb += datlen;
                goto out;
            }
        } else {
            /* we cannot write any data to the last chain */
            remain = 0;
        }
    
        /* we need to add another chain */
        to_alloc = chain->buffer_len;
        if (to_alloc <= EVBUFFER_CHAIN_MAX_AUTO_SIZE/2)
            to_alloc <<= 1;
        if (datlen > to_alloc)
            to_alloc = datlen;
        tmp = evbuffer_chain_new(to_alloc);
        if (tmp == NULL)
            goto done;
    
        if (remain) {
            memcpy(chain->buffer + chain->misalign + chain->off,
                data, remain);
            chain->off += remain;
            buf->total_len += remain;
            buf->n_add_for_cb += remain;
        }
    
        data += remain;
        datlen -= remain;
    
        memcpy(tmp->buffer, data, datlen);
        tmp->off = datlen;
        evbuffer_chain_insert(buf, tmp);
        buf->n_add_for_cb += datlen;
    
    out:
        evbuffer_invoke_callbacks(buf);
        result = 0;
    done:
        EVBUFFER_UNLOCK(buf);
        return result;
    }

    现在回顾一下bufferevent_socket_new,我们在这个函数中,设置了输出缓冲区的回调函数

    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
    static void
    bufferevent_socket_outbuf_cb(struct evbuffer *buf,
        const struct evbuffer_cb_info *cbinfo,
        void *arg)
    {
        struct bufferevent *bufev = arg;
        struct bufferevent_private *bufev_p =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    
        if (cbinfo->n_added &&
            (bufev->enabled & EV_WRITE) &&
            !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
            !bufev_p->write_suspended) {
            /* Somebody added data to the buffer, and we would like to
             * write, and we were not writing.  So, start writing. */
            if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
                /* Should we log this? */
            }
        }
    }

    可以看出,我们在输出缓冲区的回调函数中,将该fd的写事件添加到了epoll中。

    事件流程

    上面我们监听了fd的写事件,而此时该fd的写缓冲区没有满,所以写事件被触发,继而调用我们在上文设置的写事件回调函数 bufferevent_writecb。

    static void
    bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
    {
        struct bufferevent *bufev = arg;
        struct bufferevent_private *bufev_p =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        int res = 0;
        short what = BEV_EVENT_WRITING;
        int connected = 0;
        ev_ssize_t atmost = -1;
    
        _bufferevent_incref_and_lock(bufev);
    
        if (event == EV_TIMEOUT) {
            /* Note that we only check for event==EV_TIMEOUT. If
             * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
             * timeout, since a read has occurred */
            what |= BEV_EVENT_TIMEOUT;
            goto error;
        }
        if (bufev_p->connecting) {
            int c = evutil_socket_finished_connecting(fd);
            /* we need to fake the error if the connection was refused
             * immediately - usually connection to localhost on BSD */
            if (bufev_p->connection_refused) {
              bufev_p->connection_refused = 0;
              c = -1;
            }
    
            if (c == 0)
                goto done;
    
            bufev_p->connecting = 0;
            if (c < 0) {
                event_del(&bufev->ev_write);
                event_del(&bufev->ev_read);
                _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
                goto done;
            } else {
                connected = 1;
    #ifdef WIN32
                if (BEV_IS_ASYNC(bufev)) {
                    event_del(&bufev->ev_write);
                    bufferevent_async_set_connected(bufev);
                    _bufferevent_run_eventcb(bufev,
                            BEV_EVENT_CONNECTED);
                    goto done;
                }
    #endif
                _bufferevent_run_eventcb(bufev,
                        BEV_EVENT_CONNECTED);
                if (!(bufev->enabled & EV_WRITE) ||
                    bufev_p->write_suspended) {
                    event_del(&bufev->ev_write);
                    goto done;
                }
            }
        }
    
        atmost = _bufferevent_get_write_max(bufev_p);
    
        if (bufev_p->write_suspended)
            goto done;
    
        if (evbuffer_get_length(bufev->output)) {
            evbuffer_unfreeze(bufev->output, 1);
            res = evbuffer_write_atmost(bufev->output, fd, atmost);
            evbuffer_freeze(bufev->output, 1);
            if (res == -1) {
                int err = evutil_socket_geterror(fd);
                if (EVUTIL_ERR_RW_RETRIABLE(err))
                    goto reschedule;
                what |= BEV_EVENT_ERROR;
            } else if (res == 0) {
                /* eof case
                   XXXX Actually, a 0 on write doesn't indicate
                   an EOF. An ECONNRESET might be more typical.
                 */
                what |= BEV_EVENT_EOF;
            }
            if (res <= 0)
                goto error;
    
            _bufferevent_decrement_write_buckets(bufev_p, res);
        }
    
        if (evbuffer_get_length(bufev->output) == 0) {
            event_del(&bufev->ev_write);
        }
    
        /*
         * Invoke the user callback if our buffer is drained or below the
         * low watermark.
         */
        if ((res || !connected) &&
            evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
            _bufferevent_run_writecb(bufev);
        }
    
        goto done;
    
     reschedule:
        if (evbuffer_get_length(bufev->output) == 0) {
            event_del(&bufev->ev_write);
        }
        goto done;
    
     error:
        bufferevent_disable(bufev, EV_WRITE);
        _bufferevent_run_eventcb(bufev, what);
    
     done:
        _bufferevent_decref_and_unlock(bufev);
    }

    主要做了三件事:

    1. 通过evbuffer_write_atmost将输出缓冲output中的数据写入fd中。
    2. 如果output中的数据全部处理完毕,删除写事件
    3. 调用用户定义的写事件回调函数

    参考资料:

    Libevent源码分析-----bufferevent工作流程探究

  • 相关阅读:
    树状数组求区间最大值
    ABP Zero最新版源码
    abp zero mysql版正式发布
    基于ABP的Easyui admin framework正式开放源代码
    【推荐】ImageProcessor.Web,再也不用自己生成缩略图了
    ABP教程(四)- 开始一个简单的任务管理系统
    Abp Framework中文文档上线
    【开源】基于EF6+MVC5+API2+Easyui1.4.5+Easyui管理模板开发的管理系统
    web+ admin template,spa管理应用后台,easyui后台正式发布
    ABP教程(三)- 开始一个简单的任务管理系统 – 后端编码
  • 原文地址:https://www.cnblogs.com/gattaca/p/7831488.html
Copyright © 2011-2022 走看看