zoukankan      html  css  js  c++  java
  • epoll 边界触发模式3的实现

    前段时间写过一篇博客介绍epoll边界触发模式的三种实现模式,并在http://www.cnblogs.com/sniperHW/archive/2012/04/09/2438850.html
    中贴出了前面两种的实现代码,今天将介绍第三种模式,也就是每线程执行
    一个epoll主循环的方式.

    首先贴出接口的API:

    #ifndef _KENDYNET_H
    #define _KENDYNET_H
    typedef struct list_node
    {
        struct list_node *next;
    }list_node;
    
    #define LIST_NODE list_node node;
    
    //定义系统支持的最大套接字和engine的数量
    #define MAX_ENGINE 1
    #define MAX_SOCKET 4096
    
    /*IO请求和完成队列使用的结构*/
    typedef struct
    {
        LIST_NODE;
        struct iovec *iovec;
        int    iovec_count;
    }st_io;
    
    
    //初始化网络系统
    int      InitNetSystem();
    
    typedef int HANDLE;
    struct block_queue;
    
    typedef void (*OnRead)(int,st_io*);
    typedef void (*OnWrite)(int,st_io*);
    
    HANDLE   CreateEngine();
    void     CloseEngine(HANDLE);
    int      EngineRun(HANDLE,int timeout);    
    int     Bind2Engine(HANDLE,HANDLE,OnRead,OnWrite);
    
    int WSASend(HANDLE,st_io*);
    int WSARecv(HANDLE,st_io*);
    
    #endif

    KendyNet.c

    #include "KendyNet.h"
    #include "epoll.h"
    #include "Engine.h"
    #include "Socket.h"
    #include "link_list.h"
    #include "HandleMgr.h"
    #include <assert.h>
    
    int InitNetSystem()
    {
        return InitHandleMgr();
    }
    
    int EngineRun(HANDLE engine,int timeout)
    {
        engine_t e = GetEngineByHandle(engine);
        if(!e)
            return -1;
        return e->Loop(e,timeout);    
    }
    
    HANDLE CreateEngine()
    {
        HANDLE engine = NewEngine();
        if(engine >= 0)
        {
            engine_t e = GetEngineByHandle(engine);
            if(0 != e->Init(e))
            {
                CloseEngine(engine);
                engine = -1;
            }
            else
            {
                memset(e->events,0,sizeof(e->events));
                LIST_CLEAR(e->actived);
            }
        }
        return engine;
    }
    
    void CloseEngine(HANDLE handle)
    {
        ReleaseEngine(handle);
    }
    
    int Bind2Engine(HANDLE e,HANDLE s,OnRead _OnRead,OnWrite _OnWrite)
    {
        engine_t engine = GetEngineByHandle(e);
        socket_t sock   = GetSocketByHandle(s);
        if(!engine || ! sock)
            return -1;
        sock->OnRead = _OnRead;
        sock->OnWrite = _OnWrite;
        if(engine->Register(engine,sock) == 0)
        {
            sock->engine = engine;
            return 0;
        }
        return -1;
    }
    
    int WSASend(HANDLE sock,st_io *io)
    {
        assert(io);
        socket_t s = GetSocketByHandle(sock);
        if(!s)
            return -1;
        LIST_PUSH_BACK(s->pending_send,io);
        if(s->engine && s->writeable && !s->isactived)
        {
            s->isactived = 1;
            LIST_PUSH_BACK(s->engine->actived,s);
        }
        return 0;
    }
    
    int WSARecv(HANDLE sock,st_io *io)
    {
        assert(io);
        socket_t s = GetSocketByHandle(sock);
        if(!s)
            return -1;
        LIST_PUSH_BACK(s->pending_recv,io);
        if(s->engine && s->readable && !s->isactived)
        {
            s->isactived = 1;
            LIST_PUSH_BACK(s->engine->actived,s);    
        }
        return 0;
    }

    如前两个模式一样,WASSend/WSARecv仍旧接受一个st_io参数,在此,WASSend/WSARecv完全不处理实际的IO请求,
    仅仅将请求存放在队列中,IO请求将会在epoll的主循环中完成.

    再看一下Bind2Engine函数,这次Bind2Engine多加了两个参数,分别是两个函数指针,当epoll主循环处理完一个IO请求后
    将会执行回调,通知上层一个IO已经完成.

    然后是epoll主循环:

    int epoll_loop(engine_t n,int timeout)
    {
        assert(n);
        int nfds = TEMP_FAILURE_RETRY(epoll_wait(n->poller_fd,n->events,MAX_SOCKET,timeout));
        if(nfds < 0)
            return -1;
        int i;
        for(i = 0 ; i < nfds ; ++i)
        {    
            socket_t sock = (socket_t)n->events[i].data.ptr;
            if(sock)
            {
                //套接口可读
                if(n->events[i].events & EPOLLIN)
                    on_read_active(sock);
                //套接口可写
                if(n->events[i].events & EPOLLOUT)
                    on_write_active(sock);    
            
                if(n->events[i].events & EPOLLERR)
                {
                    //套接口异常
                }
            }
        }
        
        while(!LIST_IS_EMPTY(n->actived))
        {
            socket_t s = LIST_POP(socket_t,n->actived);
            s->isactived = 0;
            if(Process(s) && s->isactived == 0)
            {
                s->isactived = 1;
                LIST_PUSH_BACK(n->actived,s);
            }
        }
        return 0;
    }

    每当套接口从未激发变为激发,会判断套接口是否有pending的IO请求,如果有且套接口当前不在激活队列(n->actived)中,
    则将套接口放到激活队列中.也就是套接口处于激活队列中必须同时满足两个条件,1:套接口处于激发态,2:有pending的
    IO请求.

    然后主循环会遍历激活队列中的所有套接口,完成其IO请求,当套接口不再同时满足上述两个条件时,将其从激活队列中
    移除.

    然后是socket.c

    #include <assert.h>
    #include <stdlib.h>
    #include <errno.h>
    #include "SocketWrapper.h"
    #include "KendyNet.h"
    #include "epoll.h"
    #include "Socket.h"
    
    socket_t create_socket()
    {
        socket_t s = malloc(sizeof(*s));
        if(s)
        {
            s->pending_send = LIST_CREATE();
            s->pending_recv = LIST_CREATE();
            s->status = 0;
            s->engine = 0;
            s->isactived = 0;
        }
        return s;
    }
    
    void free_socket(socket_t *s)
    {
        assert(s);assert(*s);
        destroy_list(&(*s)->pending_send);
        destroy_list(&(*s)->pending_recv);
        free(*s);
        *s = 0;
    }
    
    void on_read_active(socket_t s)
    {
        s->readable = 1;
        if(!s->isactived && !LIST_IS_EMPTY(s->pending_recv))
        {
            s->isactived = 1;
            LIST_PUSH_BACK(s->engine->actived,s);
        }
    }
    
    void on_write_active(socket_t s)
    {
        s->writeable = 1;
        if(!s->isactived && !LIST_IS_EMPTY(s->pending_send))
        {
            s->isactived = 1;
            LIST_PUSH_BACK(s->engine->actived,s);
        }    
    }
    
    int  Process(socket_t s)
    {
        return _recv(s) > 0 &&  _send(s) > 0;
    }
    
    int _recv(socket_t s)
    {
        assert(s);
        int ret = 0;
        st_io* io_req = LIST_POP(st_io*,s->pending_recv);
        int bytes_transfer = 0;
        if(s->readable && io_req)
        {
            bytes_transfer = TEMP_FAILURE_RETRY(readv(s->fd,io_req->iovec,io_req->iovec_count));
            if(bytes_transfer < 0)
            {
                switch(errno)
                {
                    case EAGAIN:
                    {
                        s->readable = 0;
                        //将请求重新放回到队列
                        LIST_PUSH_FRONT(s->pending_recv,io_req);
                        ret = 0;
                    }
                    break;
                    default://连接出错
                    {
                        ret = -1;
                    }
                    break;
                }        
            }
            else if(bytes_transfer == 0)
                ret = -1;
            else
                ret = bytes_transfer;
        }
        
        if(ret != 0)
            s->OnRead(bytes_transfer,io_req);
        return ret;
    }
    
    int _send(socket_t s)
    {
        assert(s);
        int ret = 0;
        st_io* io_req = LIST_POP(st_io*,s->pending_send);
        int bytes_transfer = 0;    
        if(s->writeable && io_req)
        {
            bytes_transfer = TEMP_FAILURE_RETRY(writev(s->fd,io_req->iovec,io_req->iovec_count));
            if(bytes_transfer < 0)
            {
                switch(errno)
                {
                    case EAGAIN:
                    {
                        s->writeable = 0;
                        //将请求重新放回到队列
                        LIST_PUSH_FRONT(s->pending_send,io_req);
                        ret = 0;
                    }
                    break;
                    default://连接出错
                        ret = -1;
                    break;
                }        
            }
            else if(bytes_transfer == 0)
                ret = -1;
            else
                ret = bytes_transfer;
        }
        
        if(ret != 0)
            s->OnWrite(bytes_transfer,io_req);    
        return ret;        
    }

    最后是测试代码:

    #include <stdio.h>
    #include <stdlib.h>
    #include "KendyNet.h"
    #include "thread.h"
    #include "SocketWrapper.h"
    
    
    typedef struct IoContext
    {
        st_io m_ioStruct;
        void             *ud;
    }IoContext;
    
    typedef struct _Socket
    {
        char send_buf[128];
        char recv_buf[128];
        struct iovec recv_iovec;
        struct iovec send_iovec;
        IoContext m_IORecvComp;
        IoContext m_IOSendComp;
        HANDLE    m_sock;
    }_Socket;
    
    HANDLE engine;
    HANDLE listerfd;
    const char *ip;
    long port;
    
    void OnReadFinish(int bytetransfer,st_io *io)
    {
        _Socket *sock = (_Socket*)(((IoContext*)io)->ud);
        if(bytetransfer <= 0)
        {
            //连接断开
            if(CloseSocket(sock->m_sock) == 0)
                free(sock);            
        }
        else
        {
            if(io->iovec->iov_len == bytetransfer)
            {
                //一个包读完整了
                memcpy(sock->send_buf,sock->recv_buf,128);
                sock->m_IOSendComp.m_ioStruct.iovec->iov_len = 128;
                sock->m_IOSendComp.m_ioStruct.iovec->iov_base = sock->send_buf;
                WSASend(sock->m_sock,(st_io*)&(sock->m_IOSendComp));            
            }
            else
            {
                sock->m_IORecvComp.m_ioStruct.iovec->iov_len -= bytetransfer;
                sock->m_IORecvComp.m_ioStruct.iovec->iov_base += bytetransfer;
                WSARecv(sock->m_sock,(st_io*)&(sock->m_IORecvComp));    
            }
        }
    }
    
    void OnWriteFinish(int bytetransfer,st_io *io)
    {
        _Socket *sock = (_Socket*)(((IoContext*)io)->ud);
        if(bytetransfer <= 0)
        {
            //连接断开
            if(CloseSocket(sock->m_sock) == 0)
                free(sock);        
        }
        else
        {
            if(io->iovec->iov_len == bytetransfer)
            {
                //一个包写完整了
                sock->m_IORecvComp.m_ioStruct.iovec->iov_len = 128;
                sock->m_IORecvComp.m_ioStruct.iovec->iov_base = sock->recv_buf;
                WSARecv(sock->m_sock,(st_io*)&(sock->m_IORecvComp));            
            }
            else
            {
                sock->m_IOSendComp.m_ioStruct.iovec->iov_len -= bytetransfer;
                sock->m_IOSendComp.m_ioStruct.iovec->iov_base += bytetransfer;
                WSASend(sock->m_sock,(st_io*)&(sock->m_IOSendComp));    
            }
        }
    }
    
    void *ListerRoutine(void *arg)
    {
        struct sockaddr_in servaddr;
        HANDLE listerfd;
        if((listerfd = Tcp_Listen(ip,port,&servaddr,5)) == 0)
        {
            while(1)
            {
                struct sockaddr sa;
                socklen_t salen;
                HANDLE sock = Accept(listerfd,&sa,&salen);
                if(sock >= 0)
                {
                    setNonblock(sock);
                    _Socket *_sock = malloc(sizeof(*_sock));
                    _sock->m_sock = sock;
                    _sock->m_IORecvComp.ud = _sock->m_IOSendComp.ud = _sock;
                    _sock->recv_iovec.iov_base = _sock->recv_buf;
                    _sock->send_iovec.iov_base = _sock->send_buf;
                    _sock->m_IORecvComp.m_ioStruct.iovec = &_sock->recv_iovec;
                    _sock->m_IOSendComp.m_ioStruct.iovec = &_sock->send_iovec;
                    _sock->m_IOSendComp.m_ioStruct.iovec_count = _sock->m_IORecvComp.m_ioStruct.iovec_count = 1;
                    //先发请求,再bind是线程安全的
                    _sock->m_IORecvComp.m_ioStruct.iovec->iov_len = 128;
                    WSARecv(sock,(st_io*)&(_sock->m_IORecvComp));
                    if(0 != Bind2Engine(engine,sock,OnReadFinish,OnWriteFinish))
                    {
                        printf("bind出错\n");
                        CloseSocket(sock);
                        free(_sock);
                    }
                }
                else
                {
                    printf("accept出错\n");
                }
            }
            printf("listener 终止\n");
        }
        return 0;
    }
    
    int main(int argc,char **argv)
    {
        ip = "192.168.6.205";//argv[];
        port = atoi(argv[1]);
        signal(SIGPIPE,SIG_IGN);
        if(InitNetSystem() != 0)
        {
            printf("Init error\n");
            return 0;
        }
        
        thread_t listener = create_thread(0);
        start_run(listener,ListerRoutine,listener);
        engine = CreateEngine(0);
        while(1)
            EngineRun(engine,500);
        return 0;
    }

    此实现中没有使用任何的锁,所以不是线程安全的,如果需要使得IO与逻辑在不同的线程处理,则可以添加
    一个宏开关,以选择是否使用锁保护关键数据.

    项目地址:https://github.com/sniperHW/kendylib

  • 相关阅读:
    使用WinDbg调试SQL Server——入门
    SQL Server里如何随机记录集
    相关列的基数计算
    自增长的聚集键值不会扩展(scale)
    使用正确的筛选参数来提高查询性能
    可更新聚集列存储索引幻想
    在SQL Server 2014里可更新的列存储索引 (Updateable Column Store Indexes)
    SQL Server 2014里的IO资源调控器
    SQL Server 2014里的针对基数估计的新设计(New Design for Cardinality Estimation)
    缓存池扩展 (Buffer Pool Extension)实践
  • 原文地址:https://www.cnblogs.com/sniperHW/p/2468186.html
Copyright © 2011-2022 走看看