zoukankan      html  css  js  c++  java
  • libevent(九)bufferevent

    bufferevent,带buffer的event

    /*
     * A bufferevent: an event pair (read/write) on one descriptor combined
     * with an input and an output evbuffer and a set of user callbacks.
     */
    struct bufferevent {
        struct event_base *ev_base;
        const struct bufferevent_ops *be_ops;   // backend operations table
        
        struct event ev_read;               // read event
        struct event ev_write;              // write event
        
        struct evbuffer *input;             // input (read) buffer
        struct evbuffer *output;            // output (write) buffer
        
        bufferevent_data_cb readcb;         // user read callback
        bufferevent_data_cb writecb;        // user write callback
        bufferevent_event_cb errorcb;       // user error/event callback
        void *cbarg;                        // argument handed to the callbacks
        
        // Read/write watermarks; the read path consults wm_read.high and
        // wm_read.low to bound reads and to gate the read callback.
        struct event_watermark wm_read;
        struct event_watermark wm_write;
        
        // Per-direction timeouts; an all-zero timeval is treated as
        // "no timeout" when the event is added.
        struct timeval timeout_read;
        struct timeval timeout_write;
        
        // Bitmask of EV_READ/EV_WRITE accumulated by bufferevent_enable().
        short enabled;
    };

    下面简单分析bufferevent相关函数(示例DEMO)

    bufferevent_socket_new

    /*
     * Allocate and initialise a socket-backed bufferevent for `fd` on `base`.
     *
     * Wires the persistent read/write events to bufferevent_readcb and
     * bufferevent_writecb, and hooks bufferevent_socket_outbuf_cb onto the
     * output buffer.
     *
     * On WIN32 builds, a base with an IOCP port is delegated to
     * bufferevent_async_new() instead.
     *
     * Returns the new bufferevent, or NULL if allocation or common
     * initialisation fails.
     */
    struct bufferevent *
    bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
        int options)
    {
        struct bufferevent_private *priv;
        struct bufferevent *bev;

    #ifdef WIN32
        if (base && event_base_get_iocp(base))
            return bufferevent_async_new(base, fd, options);
    #endif

        priv = mm_calloc(1, sizeof(*priv));
        if (priv == NULL)
            return NULL;

        if (bufferevent_init_common(priv, base, &bufferevent_ops_socket,
            options) < 0) {
            mm_free(priv);
            return NULL;
        }

        bev = &priv->bev;
        evbuffer_set_flags(bev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

        /* Both events are persistent and carry the bufferevent as argument. */
        event_assign(&bev->ev_read, bev->ev_base, fd,
            EV_READ|EV_PERSIST, bufferevent_readcb, bev);
        event_assign(&bev->ev_write, bev->ev_base, fd,
            EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);

        evbuffer_add_cb(bev->output, bufferevent_socket_outbuf_cb, bev);

        /* The read path unfreezes the input buffer only around its read. */
        evbuffer_freeze(bev->input, 0);
        evbuffer_freeze(bev->output, 1);

        return bev;
    }

    函数做了4件事:

    1. 设置backend: bufferevent_ops_socket
    2. 设置fd的读事件回调: bufferevent_readcb
    3. 设置fd的写事件回调: bufferevent_writecb
    4. 设置输出缓冲区的回调: bufferevent_socket_outbuf_cb

    backend结构如下:

    /*
     * Backend operations table for socket-based bufferevents; installed by
     * bufferevent_socket_new() via bufferevent_init_common().
     * NOTE(review): entries are positional; the slot meanings below are
     * inferred from the handler names - confirm against struct bufferevent_ops.
     */
    const struct bufferevent_ops bufferevent_ops_socket = {
        "socket",                                           /* backend name */
        evutil_offsetof(struct bufferevent_private, bev),   /* offset of the public struct inside the private one */
        be_socket_enable,                                   /* invoked through be_ops->enable by bufferevent_enable() */
        be_socket_disable,                                  /* presumably the disable counterpart */
        be_socket_destruct,                                 /* presumably teardown */
        be_socket_adj_timeouts,                             /* presumably timeout adjustment */
        be_socket_flush,                                    /* presumably flush */
        be_socket_ctrl,                                     /* presumably control operations */
    };

    bufferevent_setcb

    /*
     * Install the user callbacks and their shared argument on `bufev`.
     *
     * readcb/writecb are the data callbacks, eventcb is stored as the
     * error/event callback, and cbarg is the opaque pointer stored alongside
     * them. All four fields are updated under the bufferevent lock.
     */
    void
    bufferevent_setcb(struct bufferevent *bufev,
        bufferevent_data_cb readcb, bufferevent_data_cb writecb,
        bufferevent_event_cb eventcb, void *cbarg)
    {
        BEV_LOCK(bufev);

        bufev->cbarg = cbarg;
        bufev->errorcb = eventcb;
        bufev->writecb = writecb;
        bufev->readcb = readcb;

        BEV_UNLOCK(bufev);
    }

    该函数主要设置用户回调函数。

    bufferevent_enable

    /*
     * Enable the directions in `event` (EV_READ and/or EV_WRITE) on `bufev`.
     *
     * The full request is always recorded in bufev->enabled, but a direction
     * that is currently suspended is not forwarded to the backend.
     *
     * Returns 0 on success, -1 if the backend's enable hook fails.
     */
    int
    bufferevent_enable(struct bufferevent *bufev, short event)
    {
        struct bufferevent_private *priv =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        short to_backend = event;
        int rc = 0;

        _bufferevent_incref_and_lock(bufev);

        bufev->enabled |= event;

        /* Mask out suspended directions before calling the backend. */
        if (priv->read_suspended)
            to_backend &= ~EV_READ;
        if (priv->write_suspended)
            to_backend &= ~EV_WRITE;

        if (to_backend != 0) {
            if (bufev->be_ops->enable(bufev, to_backend) < 0)
                rc = -1;
        }

        _bufferevent_decref_and_unlock(bufev);
        return rc;
    }
    
    /*
     * Socket backend enable hook: add the read and/or write event, each with
     * its own per-direction timeout.
     *
     * Returns 0 on success, or -1 as soon as one of the adds fails (a failed
     * read add means the write add is not attempted).
     */
    static int
    be_socket_enable(struct bufferevent *bufev, short event)
    {
        if ((event & EV_READ) &&
            be_socket_add(&bufev->ev_read, &bufev->timeout_read) == -1)
            return -1;

        if ((event & EV_WRITE) &&
            be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1)
            return -1;

        return 0;
    }
    
    /*
     * Add a socket event with its timeout; a thin alias for
     * _bufferevent_add_event(). The trailing backslash is required: without
     * it the macro body is empty and the next line is a stray expression.
     */
    #define be_socket_add(ev, t)            \
        _bufferevent_add_event((ev), (t))
    
    /*
     * Add `ev`, using `tv` as its timeout.
     *
     * A timeval whose seconds and microseconds are both zero means "no
     * timeout": event_add() is then called with NULL instead of `tv`.
     * `tv` itself must not be NULL, since it is dereferenced unconditionally.
     *
     * Returns whatever event_add() returns.
     */
    int
    _bufferevent_add_event(struct event *ev, const struct timeval *tv)
    {
        const struct timeval *timeout = tv;

        if (tv->tv_sec == 0 && tv->tv_usec == 0)
            timeout = NULL;

        return event_add(ev, timeout);
    }

    bufferevent_enable最终通过event_add将fd上的读/写事件注册到event_base中(后端为epoll时,即把fd加入epoll)。

    事件流程

    当fd上有读事件发生时,首先调用bufferevent_readcb。

    /*
     * Read-event callback that bufferevent_socket_new() assigns to ev_read.
     *
     * Reads from `fd` into bufev->input, bounded by the read high watermark
     * (when non-zero) and by _bufferevent_get_read_max(). After a successful
     * read it runs the user read callback if the buffered length has reached
     * the low watermark.
     *
     * On a pure timeout, on EOF (evbuffer_read() returned 0) or on a
     * non-retriable error, reading is disabled and the event callback is run
     * with the matching BEV_EVENT_* flags.
     *
     * fd    - descriptor the event fired on
     * event - flags of the event that fired
     * arg   - the struct bufferevent * registered with the event
     */
    static void
    bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
    {
        struct bufferevent *bufev = arg;
        struct bufferevent_private *bufev_p =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        struct evbuffer *input;
        int res = 0;
        short what = BEV_EVENT_READING;
        ev_ssize_t howmuch = -1, readmax=-1;
    
        /* Balanced by the decref/unlock at the `done` label. */
        _bufferevent_incref_and_lock(bufev);
    
        if (event == EV_TIMEOUT) {
            /* Note that we only check for event==EV_TIMEOUT. If
             * event==EV_TIMEOUT|EV_READ, we can safely ignore the
             * timeout, since a read has occurred */
            what |= BEV_EVENT_TIMEOUT;
            goto error;
        }
    
        input = bufev->input;
    
        /*
         * If we have a high watermark configured then we don't want to
         * read more data than would make us reach the watermark.
         */
        if (bufev->wm_read.high != 0) {
            howmuch = bufev->wm_read.high - evbuffer_get_length(input);
            /* we somehow lowered the watermark, stop reading */
            if (howmuch <= 0) {
                bufferevent_wm_suspend_read(bufev);
                goto done;
            }
        }
        /* Clamp to the read maximum; howmuch < 0 here means no watermark limit. */
        readmax = _bufferevent_get_read_max(bufev_p);
        if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited"
                               * uglifies this code. XXXX */
            howmuch = readmax;
        if (bufev_p->read_suspended)
            goto done;
    
        /* The input buffer is kept frozen except for the duration of the read. */
        evbuffer_unfreeze(input, 0);
        res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */
        evbuffer_freeze(input, 0);
    
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            /* A retriable error is not reported to the user. */
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            /* error case */
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case */
            what |= BEV_EVENT_EOF;
        }
    
        if (res <= 0)
            goto error;
    
        _bufferevent_decrement_read_buckets(bufev_p, res);
    
        /* Invoke the user callback - must always be called last */
        if (evbuffer_get_length(input) >= bufev->wm_read.low)
            _bufferevent_run_readcb(bufev);
    
        goto done;
    
     reschedule:
        /* Nothing to re-arm here: the read event was assigned with EV_PERSIST. */
        goto done;
    
     error:
        /* Stop reading, then report the accumulated flags to the user. */
        bufferevent_disable(bufev, EV_READ);
        _bufferevent_run_eventcb(bufev, what);
    
     done:
        _bufferevent_decref_and_unlock(bufev);
    }

    通过evbuffer_read(内部调用readv/read,Windows下为WSARecv/recv),将fd上的数据读入输入缓冲input中。

    通过_bufferevent_run_readcb调用用户回调函数

    (如果出错,通过_bufferevent_run_eventcb调用errorcb)

    /*
     * Run the user's read callback for `bufev`, or queue it for deferred
     * execution.
     *
     * Does nothing when no read callback is installed. With
     * BEV_OPT_DEFER_CALLBACKS set, the callback is marked pending and the
     * deferred callback is scheduled unless already queued. Otherwise the
     * callback is invoked immediately with bufev->cbarg.
     */
    void
    _bufferevent_run_readcb(struct bufferevent *bufev)
    {
        /* Caller must hold the lock and a reference. */
        struct bufferevent_private *priv =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

        if (bufev->readcb == NULL)
            return;

        if (!(priv->options & BEV_OPT_DEFER_CALLBACKS)) {
            bufev->readcb(bufev, bufev->cbarg);
            return;
        }

        priv->readcb_pending = 1;
        if (!priv->deferred.queued)
            SCHEDULE_DEFERRED(priv);
    }

    在用户回调函数中,可以通过 bufferevent_read 取出输入缓冲input中的数据。

    /*
     * Move up to `size` bytes from the bufferevent's input buffer into `data`.
     *
     * Returns the result of evbuffer_remove() converted to size_t.
     * NOTE(review): evbuffer_remove() returns int and can yield -1, which
     * becomes a very large size_t here - callers should not assume the
     * return value is always <= size.
     */
    size_t
    bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
    {
        struct evbuffer *in = bufev->input;

        return evbuffer_remove(in, data, size);
    }
    
    /*
     * Copy up to `datlen` bytes from `buf` into `data_out`, then drain the
     * copied bytes from the buffer. Runs with the buffer lock held.
     *
     * Returns the number of bytes copied, -1 if draining fails, or
     * evbuffer_copyout()'s own result unchanged when it is not positive.
     */
    int
    evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen)
    {
        ev_ssize_t copied;

        EVBUFFER_LOCK(buf);

        copied = evbuffer_copyout(buf, data_out, datlen);
        if (copied > 0 && evbuffer_drain(buf, copied) < 0)
            copied = -1;

        EVBUFFER_UNLOCK(buf);

        return (int)copied;
    }

    参考资料:

    Libevent源码分析-----evbuffer结构与基本操作

    Libevent源码分析-----更多evbuffer操作函数

  • 相关阅读:
    python 性能测试
    python calendar
    python datetime
    Zookeeper的功能以及工作原理
    ADB 源码分析(一) ——ADB模块简述【转】
    ADB模块源码分析(二)——adb server的启动
    C/C++中的预编译指令
    开启andriod手机的adbd,进行无线adb调试
    adb和adbd详尽分析
    adb client, adb server, adbd原理浅析(附带我的操作过程)
  • 原文地址:https://www.cnblogs.com/gattaca/p/7827479.html
Copyright © 2011-2022 走看看