zoukankan      html  css  js  c++  java
  • 重复使用bufferevent的一些体会

      看了一些网上使用bufferevent的例子,一般都是一个连接对应一个bufferevent,连接accept的时候,给bufferevent设置上fd和对应的回调,在连接断开或者发生错误的时候,将bufferevent释放掉。

    昨天在使用bufferevent的时候,突发奇想,准备了一个connection对象池,每个connection关联一个bufferevent,然后多线程处理connection的数据收发,每个线程一个connection链表。在connection出错或者关闭的时候,清空connection的相关状态,重新放入队列中。

    1.每次有新的连接被accept后取出队列里空闲的connection,设置相关fd和回掉

     1 bool Connection::SetupConnection(evutil_socket_t fd)
     2 {
     3     m_fd = fd;
     4     evutil_make_socket_nonblocking(fd);
     5     bufferevent_setcb(m_be, DoRead, NULL, DoError, this);
     6     bufferevent_setfd(m_be, fd);
     7     if (bufferevent_enable(m_be, EV_READ|EV_WRITE) != -1)
     8     {
     9         m_bUsed = true;
    10     }
    11     return m_bUsed;
    12 }

    2.在event_cb中,如果收到EOF或者Error的时候,关闭connection

    1 void Connection::DoEvent(struct bufferevent *bev, short what, void *ctx)
    2 {
    3     Connection *pConn = (Connection*)ctx;
    4     if(what & (BEV_EVENT_ERROR|BEV_EVENT_EOF))
    5     {
    6         pConn->CloseConnection();
    7     }
    8     ......
    9 }
    void Connection::CloseConnection()
    {
        //因为CloseConnection是在libevent线程的Event回掉中调用的,直接
        //bufferevent_setfd就会自动把之前的event给delete掉
        //bufferevent_disable(m_be, EV_READ | EV_WRITE);
        bufferevent_setfd(m_be, -1);
        evutil_closesocket(m_fd);
        m_fd = -1;
        //当bufferevent_setfd设置fd是-1的时候,并不会将write和read的event
        //加入到event_base,如果event_base中监听的event数量为0时候,
        //event_base_loop会退出,导致libevent线程结束,所以这里重新手动
        //开启write,read事件
        bufferevent_enable(m_be, EV_READ|EV_WRITE) ;
        m_bUsed = false;
    }

    3.在有新的客户端accept的时候,重新调用SetupConnection建立新的connection连接。

      总所周知,bufferevent有个管理收发数据的利器evbuffer,一般发送数据都是调用bufferevent_write,将数据写入发送的evbuffer里,在socket可以发送数据的时候再发送数据,read也是这样,将socket中的数据先读取到接收的evbuffer里,在调用read_cb通知用户数据可读。那么,问题来了,在CloseConnection里调用的一系列bufferevent接口中,这两个evbuffer的数据并没有清空,这样就会造成在重新SetupConnection的时候,bufferevent会有一部分之前的fd残留下来的数据,导致新的fd在处理接收和发送数据的时候被干扰。

      第一反应肯定是将evbuffer里的数据清空掉,于是乎就想到evbuffer_drain接口来清理数据。清理接收evbuffer的时候,没问题,而且一般在socket断开或者出错后,也会一次性将接收evbuffer里的数据都读取出来处理的。但是在清理发送evbuffer的时候,发现evbuffer_drain失败,于是乎翻了下libevent的源码:

    int
    evbuffer_drain(struct evbuffer *buf, size_t len)
    {
        struct evbuffer_chain *chain, *next;
        size_t remaining, old_len;
        int result = 0;
    
        EVBUFFER_LOCK(buf);
        old_len = buf->total_len;
    
        if (old_len == 0)
            goto done;
       //在这个地方,直接goto done了
        if (buf->freeze_start) {
            result = -1;
            goto done;
        }
            ....
    }

    buf->freeze_start为1,直接结束了,并没有清空evbuffer。参考了一下http://blog.csdn.net/zhwei_87/article/details/43304383的源码分析后发现bufferevent的发送evbuffer只有在真正发送数据的时候才会unfreeze start,其他时间都是freeze的(所以只允许你将新数据写入到发送evbuffer里,因为写是写到end的,drain是由libevent自己处理的),这也很好理解,因为发送的evbuffer里的数据是不允许用户自己随意drain start的(头部的数据是要发送的),关于start跟end,是因为evbuffer是一个链表结构来存放发送数据的。代码如下,这个函数是libevent真正用来发送数据的回掉,用户的回掉在这个函数最后才会执行。

    static void
    bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
    {
        struct bufferevent *bufev = arg;
        struct bufferevent_private *bufev_p =
            EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
        int res = 0;
        short what = BEV_EVENT_WRITING;
        int connected = 0;
        ev_ssize_t atmost = -1;
    
        _bufferevent_incref_and_lock(bufev);
    
        if (event == EV_TIMEOUT) {
            /* Note that we only check for event==EV_TIMEOUT. If
             * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
             * timeout, since a read has occurred */
            what |= BEV_EVENT_TIMEOUT;
            goto error;
        }
        if (bufev_p->connecting) {
            int c = evutil_socket_finished_connecting(fd);
            /* we need to fake the error if the connection was refused
             * immediately - usually connection to localhost on BSD */
            if (bufev_p->connection_refused) {
              bufev_p->connection_refused = 0;
              c = -1;
            }
    
            if (c == 0)
                goto done;
    
            bufev_p->connecting = 0;
            if (c < 0) {
                event_del(&bufev->ev_write);
                event_del(&bufev->ev_read);
                _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
                goto done;
            } else {
                connected = 1;
    #ifdef WIN32
                if (BEV_IS_ASYNC(bufev)) {
                    event_del(&bufev->ev_write);
                    bufferevent_async_set_connected(bufev);
                    _bufferevent_run_eventcb(bufev,
                            BEV_EVENT_CONNECTED);
                    goto done;
                }
    #endif
                _bufferevent_run_eventcb(bufev,
                        BEV_EVENT_CONNECTED);
                if (!(bufev->enabled & EV_WRITE) ||
                    bufev_p->write_suspended) {
                    event_del(&bufev->ev_write);
                    goto done;
                }
            }
        }
    
        atmost = _bufferevent_get_write_max(bufev_p);
    
        if (bufev_p->write_suspended)
            goto done;
    
            //这个地方才会unfree掉,并且发送数据
        if (evbuffer_get_length(bufev->output)) {
            evbuffer_unfreeze(bufev->output, 1);
            res = evbuffer_write_atmost(bufev->output, fd, atmost);
            evbuffer_freeze(bufev->output, 1);
            if (res == -1) {
                int err = evutil_socket_geterror(fd);
                if (EVUTIL_ERR_RW_RETRIABLE(err))
                    goto reschedule;
                what |= BEV_EVENT_ERROR;
            } else if (res == 0) {
                /* eof case
                   XXXX Actually, a 0 on write doesn't indicate
                   an EOF. An ECONNRESET might be more typical.
                 */
                what |= BEV_EVENT_EOF;
            }
            if (res <= 0)
                goto error;
    
            _bufferevent_decrement_write_buckets(bufev_p, res);
        }
    
        if (evbuffer_get_length(bufev->output) == 0) {
            event_del(&bufev->ev_write);
        }
    
        /*
         * Invoke the user callback if our buffer is drained or below the
         * low watermark.
         */
        if ((res || !connected) &&
            evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
            _bufferevent_run_writecb(bufev);
        }
    
        goto done;
    
     reschedule:
        if (evbuffer_get_length(bufev->output) == 0) {
            event_del(&bufev->ev_write);
        }
        goto done;
    
     error:
        bufferevent_disable(bufev, EV_WRITE);
        _bufferevent_run_eventcb(bufev, what);
    
     done:
        _bufferevent_decref_and_unlock(bufev);
    }
    View Code

    所以,如果需要清空bufferevent的发送evbuffer里的数据,必须手动调用evbuffer_unfreeze将start unfreeze掉后才能drain。当然,一般bufferevent_write都是在其他线程中做的,所以要确保drain的时候没有线程在进行bufferevent_write,我这里先不处理多线程write这块了,或者直接在SetupConnection的时候去drain。

    void Connection::CloseConnection()
    {
        //需要先unfreeze output evbuffer
        evbuffer_unfreeze(bufferevent_get_output(m_be), 1);
        if (evbuffer_drain(bufferevent_get_input(m_be), evbuffer_get_length(bufferevent_get_input(m_be))))
        {
            ...    
        }
        size_t length = evbuffer_get_length(bufferevent_get_output(m_be));
        if (evbuffer_drain(bufferevent_get_output(m_be), length))
        {
            ...
        }
        bufferevent_setfd(m_be, -1);
        evutil_closesocket(m_fd);
        m_fd = -1;
        bufferevent_enable(m_be, EV_READ|EV_WRITE) ;
        m_bUsed = false;
    }

    这样bufferevent就可以重复使用了,不知道这样是不是正确的做法,有点麻烦,还是重新分配bufferevent来得方便感觉。

  • 相关阅读:
    RegExp实例
    Date类型之组件方法
    Date类型之继承方法
    数组常见方法下
    Math对象
    数组常见方法上
    CSS变量
    基本类型和引用类型
    Python习题集(十五)
    Python习题集(十四)
  • 原文地址:https://www.cnblogs.com/fgokey/p/5123805.html
Copyright © 2011-2022 走看看