zoukankan      html  css  js  c++  java
  • epoll 边界触发模式1,2的实现

    本篇贴出在上篇文章中介绍的模式1的实现代码.

    首先,因为是多线程的程序,必须防止某一资源在一个线程中使用的时候,却在另一个线程中释放了.
    其中最主要的便是socket_t结构,为了杜绝这个问题,对应用层来说,应该根本不知道socket_t的存在,
    仅仅提供一个HANDLE给应用层就足够了.

    #ifndef _KENDYNET_H
    #define _KENDYNET_H
    #include "MsgQueue.h"
    typedef struct list_node
    {
        struct list_node *next;
    }list_node;
    
    #define LIST_NODE list_node node;
    
    /*IO请求和完成队列使用的结构*/
    typedef struct
    {
        LIST_NODE;
        struct iovec *iovec;
        int    iovec_count;
        int    bytes_transfer;
        int    error_code;
    }st_io;
    
    enum
    {    
        //完成模式,当套接口从不可读/写变为激活态时, 如果有IO请求,则完成请求,并将完成通告发送给工作线程
        MODE_COMPLETE = 0,
        //poll模式,当套接口从不可读/写变为激活态时, 如果有IO请求,将请求发送给工作线程,由工作线程完成请求
        MODE_POLL,
    };
    
    //初始化网络系统
    int      InitNetSystem();
    
    typedef int HANDLE;
    struct block_queue;
    
    struct block_queue* CreateEventQ();
    void DestroyEventQ(struct block_queue**);
    
    HANDLE   CreateEngine(char);
    void     CloseEngine(HANDLE);
    int      EngineRun(HANDLE);
    
    int     Bind2Engine(HANDLE,HANDLE);
    
    //获取和投递事件到engine的队列中
    //int      GetQueueEvent(HANDLE,struct block_queue*,st_io **,int timeout);
    int      GetQueueEvent(HANDLE,MsgQueue_t,st_io **,int timeout);
    int      PutQueueEvent(HANDLE,st_io *);
    
    /*
    *   发送和接收,如果操作立即完成返回完成的字节数
    *   否则返回0,当IO异步完成后会通告到事件队列中.
    *   返回-1表示套接口出错或断开
    *   notify:当IO立即可完成时,是否向事件队
    *   列提交一个完成通告,设置为1会提交,0为不提交.
    */
    int WSASend(HANDLE,st_io*,int notify);
    int WSARecv(HANDLE,st_io*,int notify);
    
    #endif

    这里定义的st_io结构,相当与IOCP里的OVERLAPPED结构,其中,第一个成员是list_node,这样
    可以很方便的将st_io放在队列中.

    然后是GetQueueEvent函数,这个函数的作用相当于IOCP里的GetCompleteStatus,用于从队列
    中获取事件,不同的是,这个队列是由用户提供的,也就是参数中的block_queue*,如果调用
    GetQueueEvent的时候,engine的buffering_event_queue中没有事件,就会将block_queue
    放到一个等待队列中(block_thread_queue),当其它地方调用PutQueueEvent的时候,首先
    会看下是否有线程在等待,有就把它的block_queue弹出push一个事件进去,然后唤醒阻塞
    的线程.

    #include "KendyNet.h"
    #include "Engine.h"
    #include "Socket.h"
    #include "link_list.h"
    #include "HandleMgr.h"
    #include <assert.h>
    #include "MsgQueue.h"
    
    int InitNetSystem()
    {
        return InitHandleMgr();
    }
    
    struct block_queue* CreateEventQ()
    {
        return BLOCK_QUEUE_CREATE();
    }
    
    void DestroyEventQ(struct block_queue** eq)
    {
        BLOCK_QUEUE_DESTROY(eq);
    }
    
    int EngineRun(HANDLE engine)
    {
        engine_t e = GetEngineByHandle(engine);
        if(!e)
            return -1;
        e->Loop(e);    
        return 0;
    }
    
    HANDLE CreateEngine(char mode)
    {
        HANDLE engine = NewEngine();
        if(engine >= 0)
        {
            engine_t e = GetEngineByHandle(engine);
            e->mode = mode;
            LIST_CLEAR(e->buffering_event_queue);
            LIST_CLEAR(e->block_thread_queue);
            if(0 != e->Init(e))
            {
                CloseEngine(engine);
                engine = -1;
            }
        }
        return engine;
    }
    
    void CloseEngine(HANDLE handle)
    {
        ReleaseEngine(handle);
    }
    
    int Bind2Engine(HANDLE e,HANDLE s)
    {
        engine_t engine = GetEngineByHandle(e);
        socket_t sock   = GetSocketByHandle(s);
        if(!engine || ! sock)
            return -1;
        if(engine->Register(engine,sock) == 0)
        {
            sock->engine = engine;
            return 0;
        }
        return -1;
    }
    
    //int    GetQueueEvent(HANDLE handle, struct block_queue *EventQ,st_io **io ,int timeout)
    int    GetQueueEvent(HANDLE handle, MsgQueue_t EventQ,st_io **io ,int timeout)
    {
        assert(EventQ);assert(io);
    /*    
        engine_t e = GetEngineByHandle(handle);
        if(!e)
            return -1;
        spin_lock(e->mtx,0);//mutex_lock(e->mtx);
        if(e->status == 0)
        {
            spin_unlock(e->mtx);//mutex_unlock(e->mtx);
            return -1;
        }
        
        if(*io = LIST_POP(st_io*,e->buffering_event_queue))
        {
            spin_unlock(e->mtx);//mutex_unlock(e->mtx);
            return 0;
        }
        //插入到等待队列中,阻塞
        LIST_PUSH_FRONT(e->block_thread_queue,EventQ);
        spin_unlock(e->mtx);//mutex_unlock(e->mtx);
    
        if(POP_FORCE_WAKE_UP == BLOCK_QUEUE_POP(EventQ,io,timeout))
            return -1;//因为engine的释放而被强制唤醒
        */
        engine_t e = GetEngineByHandle(handle);
        if(!e)
            return -1;
        mutex_lock(e->mtx);
        if(e->status == 0)
        {
            mutex_unlock(e->mtx);
            return -1;
        }
        
        if(*io = LIST_POP(st_io*,e->buffering_event_queue))
        {
            mutex_unlock(e->mtx);
            return 0;
        }
        //插入到等待队列中,阻塞
        LIST_PUSH_FRONT(e->block_thread_queue,EventQ);
        mutex_unlock(e->mtx);
    
        GetMsg(EventQ,io,sizeof(*io),0);
        if(*io == 0)
            return -1;
        
        return 0;    
    }
    
    int    PutQueueEvent(HANDLE handle,st_io *io)
    {
        assert(io);
        engine_t e = GetEngineByHandle(handle);
        if(!e)
            return -1;
        
        return put_event(e,io);
    }
    
    extern int put_event(engine_t e,st_io *io);
    
    int WSASend(HANDLE sock,st_io *io,int notify)
    {
        assert(io);
        socket_t s = GetSocketByHandle(sock);
        if(!s)
            return -1;
        
        int active_send_count = -1;
        int ret = 0;
        
        mutex_lock(s->send_mtx);
        //为保证执行顺序与请求的顺序一致,先将请求插入队列尾部,再弹出队列首元素
        LIST_PUSH_BACK(s->pending_send,io);
        io = 0;
        if(s->writeable)
        {
            io = LIST_POP(st_io*,s->pending_send);
            active_send_count = s->active_write_count;
        }
        mutex_unlock(s->send_mtx);
        
        if(s->engine->mode == MODE_POLL)
        {
            if(io)
            {
                //当前处于激活态
                if(notify)
                    //不直接处理,将处理交给完成线程 
                    put_event(s->engine,io);
                else
                    ret = _send(s,io,active_send_count,notify);
            }
        }
        else
        {        
            if(io)
                ret = _send(s,io,active_send_count,notify);
        }
        return ret;
    }
    
    int WSARecv(HANDLE sock,st_io *io,int notify)
    {
        assert(io);
        socket_t s = GetSocketByHandle(sock);
        if(!s)
            return -1;
    
        int active_recv_count = -1;
        int ret = 0;
        
        mutex_lock(s->recv_mtx);
        //为保证执行顺序与请求的顺序一致,先将请求插入队列尾部,再弹出队列首元素
        LIST_PUSH_BACK(s->pending_recv,io);
        io  = 0;
        if(s->readable)
        {
            io = LIST_POP(st_io*,s->pending_recv);
            active_recv_count = s->active_read_count;
        }
        mutex_unlock(s->recv_mtx);
        
        if(s->engine->mode == MODE_POLL)
        {
            if(io)
            {
                //当前处于激活态
                if(notify)
                    //不直接处理,将处理交给完成线程 
                    put_event(s->engine,io);
                else
                    ret =  _recv(s,io,active_recv_count,notify);
            }
        }
        else
        {    
            if(io)
                ret = _recv(s,io,active_recv_count,notify);
        }
        return ret;
    }

    然后是engine接口:

    #ifndef _ENGINE_H
    #define _ENGINE_H
    
    #include "sync.h"
    //#include "thread.h"
    #include "link_list.h"
    
    struct socket_wrapper;
    typedef struct engine
    {
        int  (*Init)(struct engine*);
        void (*Loop)(struct engine*);
        int  (*Register)(struct engine*,struct socket_wrapper*);
        int  (*UnRegister)(struct engine*,struct socket_wrapper*);
        
        mutex_t mtx;
        volatile int status; /*0:关闭状态,1:开启状态*/
        int poller_fd;
        
        //完成事件相关成员
        struct link_list   *buffering_event_queue;    //当没有线程等待时,事件将被缓存到这个队列中
        struct link_list   *block_thread_queue;    //正在等待完成消息的线程
        
        //thread_t     engine_thread;         //运行engine的线程
        
    }*engine_t;
    
    engine_t create_engine();
    void   free_engine(engine_t *);
    void   stop_engine(engine_t);
    
    
    #endif

    engine接口提供了4个函数指针,可根据不同的系统实现相应的函数,在这里使用的是linux的epoll.

    #include "Engine.h"
    #include "link_list.h"
    #include "KendyNet.h"
    #include <stdlib.h>
    #include "epoll.h"
    #include <assert.h>
    
    engine_t create_engine()
    {
        engine_t e = malloc(sizeof(*e));
        if(e)
        {
            e->mtx = mutex_create();
            e->status = 0;
            e->buffering_event_queue = LIST_CREATE();
            e->block_thread_queue = LIST_CREATE();
            e->Init = epoll_init;
            e->Loop = epoll_loop;
            e->Register = epoll_register;
            e->UnRegister = epoll_unregister;
            //e->engine_thread = 0;
        }
        return e;
    }
    
    void   free_engine(engine_t *e)
    {
        assert(e);
        assert(*e);
        mutex_destroy(&(*e)->mtx);
        LIST_DESTROY(&(*e)->buffering_event_queue);
        LIST_DESTROY(&(*e)->block_thread_queue);
        //if((*e)->engine_thread)
        //    destroy_thread(&(*e)->engine_thread);
        free(*e);
        *e = 0;
    }
    
    int put_event(engine_t e,st_io *io)
    {
        mutex_lock(e->mtx);
        struct block_queue *EventQ = LIST_POP(struct block_queue*,e->block_thread_queue);
        if(!EventQ)
        {
            //没有等待的线程,先缓冲事件
            LIST_PUSH_BACK(e->buffering_event_queue,io);
            io = 0;
        }
        mutex_unlock(e->mtx);    
        if(io)
            BLOCK_QUEUE_PUSH(EventQ,io);
        return 0;
    }
    
    void   stop_engine(engine_t e)
    {
        mutex_lock(e->mtx);
        e->status = 0;
        //join(e->engine_thread);
        //强制唤醒所有等待在完成队列上的线程 
        struct block_queue *queue = 0;
        /*唤醒所有等待在完成队列的线程*/
        while(queue = LIST_POP(struct block_queue *,e->block_thread_queue))
        {
            BLOCK_QUEUE_FORCE_WAKEUP(queue);
        }    
        mutex_unlock(e->mtx);
    }

    epoll的实现:

    #include "epoll.h"
    #include "Socket.h"
    #include "SocketWrapper.h"
    #include "HandleMgr.h"
    
    
    int  epoll_init(engine_t e);
    {
        e->poller_fd = TEMP_FAILURE_RETRY(epoll_create(MAX_SOCKET));
        return     e->poller_fd > 0 ? 0:-1; 
    }
    
    int epoll_register(engine_t e, socket_t s)
    {
        int ret = -1;    
        struct epoll_event ev;    
        ev.data.ptr = s;
        ev.events = EV_IN | EV_OUT | EV_ET;
        TEMP_FAILURE_RETRY(ret = epoll_ctl(e->poller_fd,EPOLL_CTL_ADD,s->fd,&ev));
        return ret;
    }
    
    
    inline int epoll_unregister(engine_t e,socket_t s)
    {
        struct epoll_event ev;int ret;
        TEMP_FAILURE_RETRY(ret = epoll_ctl(s->poller_fd,EPOLL_CTL_DEL,s->fd,&ev));
        return ret;
    }
    
    void epoll_loop(engine_t n)
    {
        struct epoll_event events[MAX_SOCKET];
        memset(events,0,sizeof(events));
        while(1)
        {
            int nfds = TEMP_FAILURE_RETRY(epoll_wait(n->poller_fd,events,MAX_SOCKET,-1));
            if(nfds < 0)
            {
                break;
            }
            int i;
            for(i = 0 ; i < nfds ; ++i)
            {    
                socket_t sock = (socket_t)events[i].data.ptr;
                if(sock)
                {
                    //套接口可读
                    if(events[i].events & EPOLLIN)
                    { 
                        on_read_active(sock);
                    }
    
                    //套接口可写
                    if(events[i].events & EPOLLOUT)
                    {
                        on_write_active(sock);
                    }        
                
                    if(events[i].events & EPOLLERR)
                    {
                        //套接口异常
                    }
                }
            }
        }
    }

    最后是socket_t

    #ifndef _SOCKETWRAPPER_H
    #define _SOCKETWRAPPER_H
    
    #include "Engine.h"
    
    typedef struct socket_wrapper
    {
        mutex_t  mtx;//保证ReleaseSocketWrapper只会被正常执行一次
        volatile int status;//0:未开启;1:正常;
        engine_t  *engine;
            
        volatile int readable;
        volatile int writeable;
        volatile int active_read_count;
        volatile int active_write_count;
        int fd;
        
        //当发送/接收无法立即完成时,把请求投入下面两个队列中,
        //当套接口可读/可写时再完成请求
        struct link_list *pending_send;//尚未处理的发请求
        struct link_list *pending_recv;//尚未处理的读请求
        
        mutex_t   recv_mtx;
        mutex_t   send_mtx;
        
        
    }*socket_t;
    
    
    void on_read_active(socket_t);
    void on_write_active(socket_t);
    socket_t create_socket();
    void free_socket(socket_t*);
    
    int _recv(socket_t,st_io*,int,int notify);
    int _send(socket_t,st_io*,int,int notify);
    
    
    #endif

    socket_t.c

    #include "Socket.h"
    #include <assert.h>
    #include <stdlib.h>
    #include <errno.h>
    #include "SocketWrapper.h"
    #include "KendyNet.h"
    
    socket_t create_socket()
    {
        socket_t s = malloc(sizeof(*s));
        if(s)
        {
            s->mtx = mutex_create();
            s->recv_mtx = mutex_create();
            s->send_mtx = mutex_create();
            s->pending_send = LIST_CREATE();
            s->pending_recv = LIST_CREATE();
            s->status = 0;
            s->engine = 0;
        }
        return s;
    }
    
    void free_socket(socket_t *s)
    {
        assert(s);assert(*s);
        mutex_destroy(&(*s)->mtx);
        mutex_destroy(&(*s)->recv_mtx);
        mutex_destroy(&(*s)->send_mtx);
        destroy_list(&(*s)->pending_send);
        destroy_list(&(*s)->pending_recv);
        free(*s);
        *s = 0;
    }
    
    void on_read_active(socket_t s)
    {
        assert(s);
        mutex_lock(s->recv_mtx);
        s->readable = 1;
        int active_recv_count = ++s->active_read_count;
        st_io *req = LIST_POP(st_io*,s->pending_recv);
        mutex_unlock(s->recv_mtx);
        
        if(req)
        {
            if(s->engine->mode == MODE_COMPLETE)
                _recv(s,req,active_recv_count,1);
            else
                put_event(s->engine,req);
        }
    }
    
    void on_write_active(socket_t s)
    {
        assert(s);
        mutex_lock(s->send_mtx);
        s->writeable = 1;
        int active_send_count = ++s->active_write_count;
        st_io *req = LIST_POP(st_io*,s->pending_send);
        mutex_unlock(s->send_mtx);
        if(req)
        {
            if(s->engine->mode == MODE_COMPLETE)
                _send(s,req,active_send_count,1);
            else
                put_event(s->engine,req);
        }        
    }
    
    int _recv(socket_t s,st_io* io_req,int active_recv_count,int notify)
    {
        assert(s);assert(io_req);
        while(1)
        {
            int retry = 0;
            int bytes_transfer = io_req->bytes_transfer = TEMP_FAILURE_RETRY(readv(s->fd,io_req->iovec,io_req->iovec_count));
            io_req->error_code = 0;
            if(bytes_transfer < 0)
            {
                switch(errno)
                {
                    case EAGAIN:
                    {
                        mutex_lock(s->recv_mtx);
                        if(active_recv_count != s->active_read_count)
                        {
                            active_recv_count = s->active_read_count;
                            retry = 1;
                        }
                        else
                        {
                            s->readable = 0;
                            LIST_PUSH_FRONT(s->pending_recv,io_req);
                        }
                        mutex_unlock(s->recv_mtx);
                        
                        if(retry)
                            continue;
                        return 0;
                    }
                    break;
                    default://连接出错
                    {
                        io_req->error_code = errno;
                    }
                    break;
                }
            }
            else if(bytes_transfer == 0)
                bytes_transfer = -1;
            if(notify)
                put_event(s->engine,io_req);    
            return bytes_transfer;    
        }    
    }
    
    int _send(socket_t s,st_io* io_req,int active_send_count,int notify)
    {
        assert(s);assert(io_req);
        while(1)
        {
            int retry = 0;
            int bytes_transfer = io_req->bytes_transfer = TEMP_FAILURE_RETRY(writev(s->fd,io_req->iovec,io_req->iovec_count));
            io_req->error_code = 0;    
            if(bytes_transfer < 0)
            {
                switch(errno)
                {
                    case EAGAIN:
                    {
                        mutex_lock(s->send_mtx);
                        if(active_send_count != s->active_write_count)
                        {
                            active_send_count = s->active_write_count;
                            retry = 1;
                        }
                        else
                        {
                            s->writeable = 0;
                            //将请求重新放回到队列
                            LIST_PUSH_FRONT(s->pending_send,io_req);
                        }
                        mutex_unlock(s->send_mtx);
                        
                        if(retry)
                            continue;
                        return 0;
                    }
                    break;
                    default://连接出错
                    {
                        io_req->error_code = errno;
                    }
                    break;
                }
            }
            else if(bytes_transfer == 0)
                bytes_transfer = -1;
            if(notify)
                put_event(s->engine,io_req);    
            return bytes_transfer;    
        }
    }

    用户提供的完成例程代码大概如下:

    st_io *io = 0;
    block_queue *EventQ;
    while(GetQueueEvent(engine,EventQ,&io ,0) == 0)
    {
        if(io)
        {
            HANDLE sock = GetHandle(io);
            OnRecv(io);
            int ret;
            //发起新的读,直到套接字变为不可读
            while((ret = WSARecv(sock,io,0)) > 0)
            {
                OnRecv(io);
            }
            
            if(ret < 0)
            {
                //连接断开
                CloseSocket(sock);
            }
        }
    }

    对于快速的发送方,完成例程接收完并处理之后,套接口上可能马上又有数据可读了,这样在epoll线程中执行recv的机会比较小。
    但如果发送方全都是慢速的,例如每一秒发送一次,则会导致几乎所有的recv操作都在epoll线程中执行,当连接数巨大时没法发挥

    多核处理器的优势,所以就有了模型2,epoll线程完全不处理recv,当套接字变为激活时,将请求投递到队列中,交给complete例程

    去执行recv.

    (2012.4.16修改,增加对模型2的支持,CreateEngine时可选择传入参数MODE_COMPLETE或MODE_POLL,

    当传入MODE_POLL时,GetQueueEvent返回的事件表明套接口可读/写,且有读/写请求,传入MODE_COMPLETE

    GetQueueEvent返回的事件表明一个读/写请求已经完成)

    测试程序如下,注意IORoutinePoll和IORoutine的区别:

    test.c

    #include <stdio.h>
    #include <stdlib.h>
    #include "KendyNet.h"
    #include "thread.h"
    #include "SocketWrapper.h"
    #include "atomic.h"
    enum
    {
        RECV_FINISH = 0,
        SEND_FINISH,
    };
    
    typedef struct IoContext
    {
        st_io m_ioStruct;
        unsigned char    m_opType;
        void             *ud;
    }IoContext;
    
    typedef struct _Socket
    {
        char send_buf[64];
        char recv_buf[64];
        struct iovec recv_iovec;
        struct iovec send_iovec;
        IoContext m_IORecvComp;
        IoContext m_IOSendComp;
        HANDLE    m_sock;
    }_Socket;
    
    HANDLE engine;
    const char *ip;
    long port;
    
    void *ListerRoutine(void *arg)
    {
        thread_t thread = (thread_t)CUSTOM_ARG(arg);
        struct sockaddr_in servaddr;
        HANDLE listerfd;
        if((listerfd = Tcp_Listen(ip,port,&servaddr,5)) == 0)
        {
            while(!is_terminate(thread))
            {
                struct sockaddr sa;
                socklen_t salen;
                HANDLE sock = Accept(listerfd,&sa,&salen);
                if(sock >= 0)
                {
                    setNonblock(sock);
                    _Socket *_sock = malloc(sizeof(*_sock));
                    _sock->m_sock = sock;
                    _sock->m_IORecvComp.m_opType = RECV_FINISH;
                    _sock->m_IOSendComp.m_opType = SEND_FINISH;
                    _sock->m_IORecvComp.ud = _sock->m_IOSendComp.ud = _sock;
                    _sock->recv_iovec.iov_base = _sock->recv_buf;
                    _sock->send_iovec.iov_base = _sock->send_buf;
                    _sock->m_IORecvComp.m_ioStruct.iovec = &_sock->recv_iovec;
                    _sock->m_IOSendComp.m_ioStruct.iovec = &_sock->send_iovec;
                    _sock->m_IOSendComp.m_ioStruct.iovec_count = _sock->m_IORecvComp.m_ioStruct.iovec_count = 1;
                    //printf("接到一个连接\n");
                    if(0 != Bind2Engine(engine,sock))
                    {
                        printf("bind出错\n");
                        CloseSocket(sock);
                        free(_sock);
                    }
                    else
                    {
                        //发起第一个读请求
                        _sock->m_IORecvComp.m_ioStruct.iovec->iov_len = 64;
                        WSARecv(sock,(st_io*)&(_sock->m_IORecvComp),1);
                    }
                }
                else
                {
                    printf("accept出错\n");
                }
            }
            printf("listener 终止\n");
        }
        return 0;
    }
    
    //COMPLETE MODE
    void *IORoutine(void *arg)
    {
        st_io* ioComp;
        //struct block_queue *EventQ = CreateEventQ();
        MsgQueue_t EventQ = CreateMsgQ();
        while(1)
        {
            ioComp = 0;
            if(0 > GetQueueEvent(engine,EventQ,&ioComp,-1))
            {
                printf("poller终止\n");
                break;
            }
            //printf("唤醒咯\n");
            if(ioComp)
            {
                if(ioComp->bytes_transfer <= 0)
                {
                    //printf("套接口断开\n");
                    _Socket *sock = (_Socket*)(((IoContext*)ioComp)->ud);
                    //连接关闭
                    CloseSocket(sock->m_sock);
                    free(sock);
                }
                else
                {
                    IoContext *context = (IoContext*)ioComp;
                    _Socket *sock = (_Socket*)context->ud;
                    if(context->m_opType == RECV_FINISH)
                    {
                        int byteTransfer = 0;
                        //把数据回发给客户端
                        memcpy(sock->send_buf,sock->recv_buf,ioComp->bytes_transfer);
                        sock->m_IOSendComp.m_ioStruct.iovec->iov_len = ioComp->bytes_transfer;
                        WSASend(sock->m_sock,(st_io*)&(sock->m_IOSendComp),0);
                        
                        //继续读
                        while((byteTransfer = WSARecv(sock->m_sock,(st_io*)context,0)) > 0)
                        {
                            memcpy(sock->send_buf,sock->recv_buf,byteTransfer);
                            sock->m_IOSendComp.m_ioStruct.iovec->iov_len = byteTransfer;
                            byteTransfer = WSASend(sock->m_sock,(st_io*)&(sock->m_IOSendComp),0);
                            if(byteTransfer == 0)
                                printf("can't write\n");
                            if(byteTransfer < 0)
                            {
                                printf("close\n");
                                break;
                            }
                        }
    
                        if(byteTransfer < 0)
                        {
                            //printf("套接口断开\n");
                            CloseSocket(sock->m_sock);
                            free(sock);
                        }
                    }
                }
            }
        }
        printf("IO end\n");
        //DestroyEventQ(&EventQ);
        DestroyMsgQ(&EventQ);
        return 0;
    }
    //POLL MODE
    void *IORoutinePoll(void *arg)
    {
        st_io* ioComp;
        MsgQueue_t EventQ = CreateMsgQ();
        while(1)
        {
            ioComp = 0;
            if(0 > GetQueueEvent(engine,EventQ,&ioComp,-1))
            {
                printf("poller终止\n");
                break;
            }
            if(ioComp)
            {
    
                IoContext *context = (IoContext*)ioComp;
                _Socket *sock = (_Socket*)context->ud;
                if(context->m_opType == RECV_FINISH)
                {
                    int byteTransfer = 0;
                    while((byteTransfer = WSARecv(sock->m_sock,(st_io*)context,0)) > 0)
                    {
                        memcpy(sock->send_buf,sock->recv_buf,byteTransfer);
                        sock->m_IOSendComp.m_ioStruct.iovec->iov_len = byteTransfer;
                        byteTransfer = WSASend(sock->m_sock,(st_io*)&(sock->m_IOSendComp),0);
                        if(byteTransfer == 0)
                            printf("can't write1\n");
                        if(byteTransfer < 0)
                        {
                            printf("close\n");
                            break;
                        }
                    }
    
                    if(byteTransfer < 0)
                    {
                        //printf("connection close\n");
                        CloseSocket(sock->m_sock);
                        free(sock);
                    }
                }
                else
                    WSASend(sock->m_sock,(st_io*)&(sock->m_IOSendComp),0);
    
            }
        }
        printf("IO end\n");
        DestroyMsgQ(&EventQ);
        return 0;
    }
    void *EngineRoutine(void *arg)
    {
        EngineRun(engine);
        printf("Engine stop\n");
        return 0;
    }
    
    int main(int argc,char **argv)
    {
        ip = "127.0.0.1";//argv[];
        port = atoi(argv[1]);
        signal(SIGPIPE,SIG_IGN);
        if(InitNetSystem() != 0)
        {
            printf("Init error\n");
            return 0;
        }
    
        //engine = CreateEngine(MODE_POLL);
        engine = CreateEngine(MODE_COMPLETE);
        thread_t listener = create_thread(0);
        start_run(listener,ListerRoutine,listener);
        
        int complete_count = atoi(argv[2]);
        int i = 0;
        for( ; i < complete_count; ++i)
        {
            thread_t complete_t = create_thread(0);
            
            //start_run(complete_t,IORoutinePoll,0);
            start_run(complete_t,IORoutine,0);
        }
        thread_t engine_thread = create_thread(1);
        start_run(engine_thread,EngineRoutine,0);
        getchar();
        CloseEngine(engine);
        join(engine_thread);
        return 0;
    }



  • 相关阅读:
    用Doxygen生成X3D的继承关系树
    FreeBSD 8.0候选版本RC2发布
    Mozilla Firefox, Apple Safari,Chrome等主流浏览器均开始WebGL支持
    关于企业管理信息系统
    [转]WebGL标准最新进展
    C++ + Irrlicht整一个东东?
    FreeWRL Windows Beta版本注记
    选择C++开发环境
    老人与老浏览器-李开复与成熟度最高的VRML浏览器SGI Cosmo
    WebGL概念及HTML5推广给X3D规范带来的新出路
  • 原文地址:https://www.cnblogs.com/sniperHW/p/2438850.html
Copyright © 2011-2022 走看看