zoukankan      html  css  js  c++  java
  • libevent(九)bufferevent

    bufferevent,带buffer的event

    /*
     * A buffered event: a pair of read/write events plus an input and an
     * output evbuffer, with user callbacks, watermarks and timeouts.
     */
    struct bufferevent {
        struct event_base *ev_base;             /* event base the read/write events are assigned to */
        const struct bufferevent_ops *be_ops;   /* backend operations table (e.g. be_ops->enable) */
        
        struct event ev_read;               /* read event */
        struct event ev_write;              /* write event */
        
        struct evbuffer *input;             /* read (input) buffer */
        struct evbuffer *output;            /* write (output) buffer */
        
        bufferevent_data_cb readcb;         /* user read callback */
        bufferevent_data_cb writecb;        /* user write callback */
        bufferevent_event_cb errorcb;       /* user event/error callback */
        void *cbarg;                        /* argument handed to the user callbacks */
        
        struct event_watermark wm_read;     /* read watermarks (.low / .high are consulted by the read path) */
        struct event_watermark wm_write;    /* write watermarks -- presumably the output-side counterpart */
        
        struct timeval timeout_read;        /* read timeout; all-zero means "no timeout" when the event is added */
        struct timeval timeout_write;       /* write timeout; all-zero means "no timeout" when the event is added */
        
        short enabled;                      /* EV_READ / EV_WRITE bits requested through bufferevent_enable() */
    };

    下面简单分析bufferevent相关函数(示例DEMO)

    bufferevent_socket_new

    /*
     * Create a socket-backed bufferevent for fd on the given event base.
     *
     * base:    event base to attach to
     * fd:      socket the read/write events are bound to
     * options: flags forwarded to bufferevent_init_common()
     *
     * Returns the new bufferevent, or NULL if allocation or common
     * initialisation fails.  On WIN32 builds, a base that has an IOCP port
     * is handed to bufferevent_async_new() instead.
     */
    struct bufferevent *
    bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
        int options)
    {
        struct bufferevent_private *priv;
        struct bufferevent *bev;

    #ifdef WIN32
        if (base && event_base_get_iocp(base))
            return bufferevent_async_new(base, fd, options);
    #endif

        priv = mm_calloc(1, sizeof(*priv));
        if (priv == NULL)
            return NULL;

        /* Step 1: common setup, selecting the socket backend. */
        if (bufferevent_init_common(priv, base, &bufferevent_ops_socket,
                options) < 0) {
            mm_free(priv);
            return NULL;
        }

        bev = &priv->bev;
        evbuffer_set_flags(bev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

        /* Steps 2 and 3: persistent read and write events on fd. */
        event_assign(&bev->ev_read, bev->ev_base, fd,
            EV_READ | EV_PERSIST, bufferevent_readcb, bev);
        event_assign(&bev->ev_write, bev->ev_base, fd,
            EV_WRITE | EV_PERSIST, bufferevent_writecb, bev);

        /* Step 4: get notified when the output buffer changes. */
        evbuffer_add_cb(bev->output, bufferevent_socket_outbuf_cb, bev);

        /* Freeze one end of each buffer (flag 0 for input, 1 for output). */
        evbuffer_freeze(bev->input, 0);
        evbuffer_freeze(bev->output, 1);

        return bev;
    }

    函数做了4件事:

    1. 设置backend: bufferevent_ops_socket
    2. 设置fd的读事件回调: bufferevent_readcb
    3. 设置fd的写事件回调: bufferevent_writecb
    4. 设置输出缓冲区的回调: bufferevent_socket_outbuf_cb

    backend结构如下:

    /*
     * Backend operations table for socket-based bufferevents.
     * Positional initializer: the order must match the member order of
     * struct bufferevent_ops (declaration not visible here).
     */
    const struct bufferevent_ops bufferevent_ops_socket = {
        "socket",                                           /* backend name */
        evutil_offsetof(struct bufferevent_private, bev),   /* offset of the public bev inside the private struct */
        be_socket_enable,                                   /* enable hook (invoked via be_ops->enable) */
        be_socket_disable,                                  /* presumably the disable hook */
        be_socket_destruct,                                 /* presumably the destructor hook */
        be_socket_adj_timeouts,                             /* presumably the timeout-adjust hook */
        be_socket_flush,                                    /* presumably the flush hook */
        be_socket_ctrl,                                     /* presumably the control hook */
    };

    bufferevent_setcb

    /*
     * Install the user callbacks of a bufferevent.
     *
     * readcb / writecb: data callbacks for the read and write side
     * eventcb:          event callback, stored in the errorcb slot
     * cbarg:            opaque pointer stored alongside the callbacks
     *
     * All four fields are replaced while holding the bufferevent lock.
     */
    void
    bufferevent_setcb(struct bufferevent *bufev,
        bufferevent_data_cb readcb, bufferevent_data_cb writecb,
        bufferevent_event_cb eventcb, void *cbarg)
    {
        BEV_LOCK(bufev);

        bufev->cbarg = cbarg;
        bufev->errorcb = eventcb;
        bufev->writecb = writecb;
        bufev->readcb = readcb;

        BEV_UNLOCK(bufev);
    }

    该函数主要设置用户回调函数。

    bufferevent_enable

    /*
     * Enable reading and/or writing on a bufferevent.
     *
     * event: combination of EV_READ / EV_WRITE to turn on.
     *
     * The requested bits are always recorded in bufev->enabled, but a
     * direction that is currently suspended is not passed to the backend.
     * Returns 0 on success, -1 if the backend enable hook fails.
     */
    int
    bufferevent_enable(struct bufferevent *bufev, short event)
    {
        struct bufferevent_private *priv =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        short to_backend = event;
        int rc = 0;

        _bufferevent_incref_and_lock(bufev);

        /* Mask out directions that are suspended right now. */
        if (priv->read_suspended)
            to_backend &= ~EV_READ;
        if (priv->write_suspended)
            to_backend &= ~EV_WRITE;

        bufev->enabled |= event;

        if (to_backend) {
            if (bufev->be_ops->enable(bufev, to_backend) < 0)
                rc = -1;
        }

        _bufferevent_decref_and_unlock(bufev);
        return rc;
    }
    
    /*
     * Socket backend "enable" hook: add the read and/or write event
     * (with its configured timeout) for the directions set in `event`.
     *
     * Returns 0 on success, -1 as soon as adding one of the events fails.
     */
    static int
    be_socket_enable(struct bufferevent *bufev, short event)
    {
        if ((event & EV_READ) &&
            be_socket_add(&bufev->ev_read, &bufev->timeout_read) == -1)
            return -1;

        if ((event & EV_WRITE) &&
            be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1)
            return -1;

        return 0;
    }
    
    /*
     * Add event `ev` with timeout `t`; thin alias for _bufferevent_add_event().
     * The trailing backslash is required: without it the macro expands to
     * nothing and the next line becomes a stray file-scope expression.
     */
    #define be_socket_add(ev, t)            \
        _bufferevent_add_event((ev), (t))
    
    /*
     * Add `ev`, treating an all-zero timeval as "no timeout".
     *
     * ev: event to add
     * tv: timeout; if both tv_sec and tv_usec are 0, event_add() is given
     *     NULL instead of tv
     *
     * Returns whatever event_add() returns.
     */
    int
    _bufferevent_add_event(struct event *ev, const struct timeval *tv)
    {
        const int no_timeout = (tv->tv_sec == 0 && tv->tv_usec == 0);

        return event_add(ev, no_timeout ? NULL : tv);
    }

    该函数最终通过event_add将fd对应的读/写事件(连同超时)注册到event_base中;在Linux上后端通常是epoll。

    事件流程

    当fd上有读事件发生时,首先调用bufferevent_readcb。

    /*
     * Read-event callback for a socket bufferevent.
     *
     * Pulls data from fd into bufev->input, bounded by the read high
     * watermark and by _bufferevent_get_read_max(), then runs the user's
     * read callback once the input length reaches the read low watermark.
     * A pure timeout, EOF, or a non-retriable read error disables EV_READ
     * and is reported through _bufferevent_run_eventcb().
     *
     * fd:    descriptor to read from
     * event: flags that triggered this call (compared against EV_TIMEOUT)
     * arg:   the struct bufferevent * this callback serves
     */
    static void
    bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
    {
        struct bufferevent *bufev = arg;
        struct bufferevent_private *bufev_p =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        struct evbuffer *input;
        int res = 0;
        short what = BEV_EVENT_READING;     /* event flags reported on the error path */
        ev_ssize_t howmuch = -1, readmax=-1; /* -1 means "no limit computed yet" */
    
        /* Hold a reference and the lock for the whole callback. */
        _bufferevent_incref_and_lock(bufev);
    
        if (event == EV_TIMEOUT) {
            /* Note that we only check for event==EV_TIMEOUT. If
             * event==EV_TIMEOUT|EV_READ, we can safely ignore the
             * timeout, since a read has occurred */
            what |= BEV_EVENT_TIMEOUT;
            goto error;
        }
    
        input = bufev->input;
    
        /*
         * If we have a high watermark configured then we don't want to
         * read more data than would make us reach the watermark.
         */
        if (bufev->wm_read.high != 0) {
            howmuch = bufev->wm_read.high - evbuffer_get_length(input);
            /* we somehow lowered the watermark, stop reading */
            if (howmuch <= 0) {
                bufferevent_wm_suspend_read(bufev);
                goto done;
            }
        }
        /* Clamp to the read maximum; it also replaces the -1 "unlimited" value. */
        readmax = _bufferevent_get_read_max(bufev_p);
        if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
                               * uglifies this code. XXXX */
            howmuch = readmax;
        /* Reading is suspended: leave without touching the buffer. */
        if (bufev_p->read_suspended)
            goto done;
    
        /* The input buffer is kept frozen; unfreeze it only around the read. */
        evbuffer_unfreeze(input, 0);
        res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
        evbuffer_freeze(input, 0);
    
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            /* Retriable error: no callback is run, just leave. */
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            /* error case */
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case */
            what |= BEV_EVENT_EOF;
        }
    
        if (res <= 0)
            goto error;
    
        /* res bytes were read: account for them in the read buckets. */
        _bufferevent_decrement_read_buckets(bufev_p, res);
    
        /* Invoke the user callback - must always be called last */
        if (evbuffer_get_length(input) >= bufev->wm_read.low)
            _bufferevent_run_readcb(bufev);
    
        goto done;
    
     reschedule:
        goto done;
    
     error:
        /* Stop reading, then report `what` to the user's event callback. */
        bufferevent_disable(bufev, EV_READ);
        _bufferevent_run_eventcb(bufev, what);
    
     done:
        _bufferevent_decref_and_unlock(bufev);
    }

    通过evbuffer_read(内部调用readv/read,Windows下为WSARecv/recv),将fd上的数据读入输入缓冲input中。

    当输入缓冲中的数据量不低于读低水位(wm_read.low)时,通过_bufferevent_run_readcb调用用户的读回调函数

    (如果读出错、对端关闭(EOF)或读超时,则先禁用读事件,再通过_bufferevent_run_eventcb调用errorcb)

    /*
     * Run (or schedule) the user's read callback.
     *
     * The caller must hold the bufferevent lock and a reference.
     * Does nothing when no read callback is set.  With
     * BEV_OPT_DEFER_CALLBACKS the callback is only marked pending and the
     * deferred callback is scheduled if it is not queued already;
     * otherwise the callback is invoked directly.
     */
    void
    _bufferevent_run_readcb(struct bufferevent *bufev)
    {
        struct bufferevent_private *priv =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

        if (bufev->readcb == NULL)
            return;

        if (!(priv->options & BEV_OPT_DEFER_CALLBACKS)) {
            /* Immediate mode: call straight into user code. */
            bufev->readcb(bufev, bufev->cbarg);
            return;
        }

        /* Deferred mode: flag the callback and make sure it is queued. */
        priv->readcb_pending = 1;
        if (!priv->deferred.queued)
            SCHEDULE_DEFERRED(priv);
    }

    在用户回调函数中,可以通过 bufferevent_read 取出输入缓冲input中的数据。

    /*
     * Remove up to `size` bytes from the bufferevent's input buffer into
     * `data`.
     *
     * Returns the number of bytes actually removed.  evbuffer_remove()
     * signals failure with a negative int; returning that directly would
     * convert it to a huge size_t, so a failure is reported as 0 bytes.
     */
    size_t
    bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
    {
        int removed = evbuffer_remove(bufev->input, data, size);

        if (removed < 0)
            return 0;
        return (size_t)removed;
    }
    
    /*
     * Copy up to datlen bytes out of buf into data_out and drain the
     * bytes that were copied, all under the buffer lock.
     *
     * Returns the number of bytes copied (as int), or -1 if the copy
     * reports an error or the subsequent drain fails.
     */
    int
    evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen)
    {
        ev_ssize_t copied;

        EVBUFFER_LOCK(buf);

        copied = evbuffer_copyout(buf, data_out, datlen);
        if (copied > 0 && evbuffer_drain(buf, copied) < 0)
            copied = -1;

        EVBUFFER_UNLOCK(buf);

        return (int)copied;
    }

    参考资料:

    Libevent源码分析-----evbuffer结构与基本操作

    Libevent源码分析-----更多evbuffer操作函数

  • 相关阅读:
    使用 requests 维持会话
    使用 requests 发送 POST 请求
    使用 requests 发送 GET 请求
    requests 安装
    使用 urllib 分析 Robots 协议
    使用 urllib 解析 URL 链接
    使用 urllib 处理 HTTP 异常
    使用 urllib 处理 Cookies 信息
    使用 urllib 设置代理服务
    按单生产程序发布
  • 原文地址:https://www.cnblogs.com/gattaca/p/7827479.html
Copyright © 2011-2022 走看看