zoukankan      html  css  js  c++  java
  • Linux高性能server编程——I/O复用

    

    IO复用

    I/O复用使得程序能同一时候监听多个文件描写叙述符。通常网络程序在下列情况下须要使用I/O复用技术:

    1. client程序要同一时候处理多个socket

    2. client程序要同一时候处理用户输入和网络连接

    3. TCPserver要同一时候处理监听socket和连接socket,这是I/O复用使用最多的场合

    4. server要同一时候处理TCP请求和UDP请求。比方本章将要讨论的会社server

    5. server要同一时候监听多个port。或者处理多种服务。

    I/O复用尽管能同一时候监听多个文件描写叙述符,但它本身是堵塞的。而且当多个文件描写叙述符同一时候就绪时,假设不採用额外措施,程序就仅仅能按顺序依次处理当中的每个文件描写叙述符,这使得server程序看起来像是串行工作。

    假设要实现并发,仅仅能使用多进程或多线程等变成手段。

    select系统复用

    select系统调用的用途是:在一段指定时间内。监听用户感兴趣的文件描写叙述符上的可读可写和异常等事件。

    #include <sys/select.h>

    int select(int nfds, fd_set *readfds,fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

    1. nfds參数指定被监听的文件描写叙述符的总数。

      通常被设置为select监听的全部文件描写叙述符中的最大值加1,由于文件描写叙述符是从0開始计数的

    2. readfds, writefdsexceptfds參数分别指向可读、可写和异常等事件相应的文件描写叙述符集合。

      fd_set结构体仅包括一个整形数组。高数组的每一个元素的每一位标记一个文件描写叙述符。

      可用例如以下宏来訪问fd_set结构体中的位:

      voidFD_CLR(int fd, fd_set *set);

      int  FD_ISSET(int fd, fd_set *set);

      voidFD_SET(int fd, fd_set *set);

          void FD_ZERO(fd_set*set);

    3. timeout參数用来设置select函数的超时时间。它是一个timeval指针。timeval结构体定义例如以下:

    struct timeval {

                  long   tv_sec;        /* seconds */

                  long   tv_usec;       /* microseconds */

              };

    假设给timeout传递NULL。则select将一直堵塞,直到某个文件描写叙述符就绪。

    select成功时返回就绪文件描写叙述符的总数,假设在超时时间内没有不论什么文件描写叙述符就绪返回0,失败返回-1,并设置errno;假设select在等待期间收到信号,则select马上返回-1,并设置errnoEINTR

    poll系统调用

    poll系统调用和select类似,也是在指定时间内伦旭一定数量的文件描写叙述符。以測试当中是否有就绪。

    poll原型例如以下:

    #include<poll.h>

    int poll(structpollfd *fds, nfds_t nfds, int timeout);

    1)fds參数是一个pollfd结构类型的数组,它指定所以我们感兴趣的文件描写叙述符上发生的刻度、可写和异常等时间。其结构定义例如以下:

    struct pollfd {

                  int  fd;        /* file descriptor */

                  short events;    /* requested events */

                  short revents;   /* returned events */

              };

    当中fd成员指定文件描写叙述符;events成员告诉poll监听f上的那些时间,它是一系列时间的按位或;revents成员则由内核改动,以通知应用程序fd上实际发生了哪些事件。

    2)nfds參数指定被监听事件集合的大小。其类型nfds_t定义例如以下:

    typedef unsignedlong int nfds_t;

    1. timeout參数指定poll的超时时间。单位是毫秒。当timeout-1时。poll调用将永远堵塞,直到某个事件发生;当为0时,poll调用马上返回。

      poll返回值含义与select同样。

    epoll系列系统调用

    内核事件表

    epollLinux特有的I/O复用函数。它在实现和使用上与selectpoll有非常大差异。首先,epoll使用一组函数来完毕任务。而不是单个函数。

    其次,epoll把用户关心的文件描写叙述符上的时间放在内核里的一个时间表中。从而无需向selectpoll那样每次调用都要反复传入文件描写叙述符集或事件集。但epoll须要使用一个额外的文件描写叙述符,来唯一标识内核中的这个时间表。这个文件描写叙述符使用例如以下epoll_create函数创建:

    #include <sys/epoll.h>

    int epoll_create(int size);

    size參数给内核一个提示,告诉它时间表须要多大。该函数返回的文件描写叙述符将作用其它全部epoll系统调用的第一个參数,以指定要訪问的内核事件表。

    以下的函数用来操作epoll的内核事件表:

    #include <sys/epoll.h>

    int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);

    fd參数是要操作的文件描写叙述符。op參数则制定操作类型,操作类型有例如以下3种:

    EPOLL_CTL_ADD:往事件表中注冊fd上的事件

    EPOLL_CTL_MOD:改动fd上的注冊事件

    EPOLL_CTL_DEL:删除fd上的注冊事件

    event參数指定时间,它是epoll_event结构指针类型。epoll_event的定义例如以下:

    struct epoll_event {

                  uint32_t    events;     /* Epoll events */

                  epoll_data_t data;       /* User data variable */

              };

    当中events成员描写叙述事件类型。data成员用于存储用户数据。其类型epoll_data的定义例如以下:

    typedef union epoll_data {

                  void       *ptr;

                  int         fd;

                  uint32_t    u32;

                  uint64_t    u64;

              } epoll_data_t;

    epoll_data_t是一个联合体,当中4个成员中使用最多的是fd,它指定事件所丛书的目标文件描写叙述符。

    epoll_ctl成功时返回0,失败时返回-1并设置errno

    epoll_wait函数

    epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描写叙述符上的事件,其原型例如以下:

    #include <sys/epoll.h>

    int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);

    该函数成功时返回就绪的文件描写叙述符的个数,失败是返回-1,并设置errno

    maxevents參数指定最多监听多少时间,必须大于0.

    epoll_wait函数假设检測到事件。就将全部就绪的事件从内核事件表中拷贝到它的第二个參数events指向的数组中。这个数组仅仅用于输出epoll_wait检測到的就绪时间,而不像selectpoll数组那样即用于传入用户注冊的时间,实用于输出内核检測到的就绪时间。这就极大的提高了应用程序索引就绪文件描写叙述符的效率。

    以下的代码体现了这个区别:

    /*怎样索引poll返回的就绪文件描写叙述符*/

    int ret = poll(fds, MAX_EVENT_NUMBER, -1);

    /*必须遍历全部注冊文件描写叙述符并找到当中的就绪着*/

    for(int i=0;i<MAX_EVENT_NUMBER; ++i)

    {

            if(fds[i].revents & POLLIN)

    {

            int sockfd = fds[i].fd;

            /*处理sockfd*/

    }

    }

     

    /*怎样索引epoll返回的就绪文件描写叙述符*/

    int ret =epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1);

    /*遍历就绪的ret个文件描写叙述符*/

    for( int i=0;i<ret; i++)

    {

            int socketfd = events[i].data.fd;

            /*socket肯定就绪。直接处理*/

    }

     

    LTET模式

    epoll对文件描写叙述符的操作有两种模式:LT模式(Levek Trigger,电平触发)和ET模式(E多个Trigger,边沿触发)。

    LT模式是默认的工作模式,这样的模式下epoll相当于一个效率较高的poll

    当往epoll内核事件表中注冊一个文件描写叙述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描写叙述符。ET模式是epoll的搞笑工作模式。

    对于採用LT工作模式的文件描写叙述符。当epoll_wait检測到其上有时间发生并将此事件通知应用程序后,应用程序能够不马上处理该事件。这样。当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。而对于採用ET工作模式的文件描写叙述符,当epoll_wait检測到其上有时间发生并将此时间通知应用程序后。应用程序必须马上处理该事件,由于兴许的epoll_wait调用将不再向用用程序通知这一事件。可见。ET在非常大程度上减少了同一个epoll事件被反复触发的次数,因此效率比LT模式高。

    文章最后的程序清单1比較了两种模式:

    当在clienttelnet传输“abcdefghijklmnopqrstuvwxyz”字符串时。输出例如以下

    ET模式输出:

    event trigger once

    get 9 bytes of content: abcdefghi

    get 9 bytes of content: jklmnopqr

    get 9 bytes of content: stuvwxyz

    get 1 bytes of content:

    LT模式输出:

    event trigger once

    get 9 bytes of content: abcdefghi

    event trigger once

    get 9 bytes of content: jklmnopqr

    event trigger once

    get 9 bytes of content: stuvwxyz

    event trigger once

    get 1 bytes of content:

    能够看到正如我们预期,ET模式下时间仅仅被触发一次,要比LT模式下少非常多。

    EPOLLONESHOT事件

    即使我们使用ET模式。一个socket上的某个事件还是可能被触发多次。这在并发程序中会引起一个问题。

    比方一个县城在读取完某个socket上的数据后開始处理这些数据,二在数据的处理project中该socket上又有新数据可读。此时另外一个县城北唤醒来读取这些新的数据。

    于是就出现了两个线程同一时候操作一个socket的局面。这当然不是我们期望的。

    我们期望的是一个socket连接在任一时刻都仅仅被一个线程处理。这一点能够使用spollEPOLLONESHOT事件实现。

            对于注冊了EPOLLONESHOT事件的文件描写叙述符,操作系统最多触发其上注冊的一个可读、可写或者异常事件,并且仅仅触发一次。除非我们使用epoll_ctl函数重置该文件描写叙述符上注冊的EPOLLONESHOT事件ain.zheyang。当一个线程在处理某个socket时。其它线程是不可能有机会操作该socket的。

    但反过来思考,注冊了EPOLLONESHOT事件的socket一旦被某个线程处理完成,该线程就应该马上重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其它工作线程有机会处理这个socket

    程序清单2展示了EPOLLONESHOT事件的使用。

    三组I/O复用函数的比較

    系统调用

    select

    poll

    epoll

    事件集合

    用户通过3个參数分别传入感兴趣的可读、可写及异常等事件。内核通过对这些參数在线改动来反馈当中的就绪事件。这使得用户每次调用select都要重置这3个參数

    统一处理全部事件类型,因此仅仅须要一个事件集參数。用户通过pollfd.events传入感兴趣的事件,内核通过改动pollfd.revents反馈当中就绪的事件

    内核通过一个时间表直接管理用户感兴趣的全部事件。因此每次调用epoll_wait时,无需重复传入用户感兴趣的时间。

    epoll_wait系统调用的參数events仅用来反馈就绪的事件。

    应用程序索引就绪文件描写叙述符的时间复杂度

    O(N)

    O(N)

    O(1)

    最大支持文件描写叙述符数

    一般有最大值限制

    65535

    65535

    工作模式

    LT

    LT

    支持ET高效模式

    内核实现和工作效率

    採用轮询方法来检測就绪事件,算法复杂度为O(N)

    採用轮询方式检測就绪事件,算法复杂度为O(N)

    採用回调方式来检測就绪事件。算法复杂度为O(1)

     

    聊天程序见程序(poll实现)见清单3

    同一时候处理TCPUDP服务的回射server程序(epoll程序)见清单4



    程序清单1:
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    
    #define MAX_EVENT_NUMBER 1024
    #define BUFFER_SIZE 10
    
    int setnonblocking( int fd )
    {
        int old_option = fcntl( fd, F_GETFL );
        int new_option = old_option | O_NONBLOCK;
        fcntl( fd, F_SETFL, new_option );
        return old_option;
    }
    
    void addfd( int epollfd, int fd, bool enable_et )
    {
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN;
        if( enable_et )
        {
            event.events |= EPOLLET;
        }
        epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
        setnonblocking( fd );
    }
    
    void lt( epoll_event* events, int number, int epollfd, int listenfd )
    {
        char buf[ BUFFER_SIZE ];
        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                addfd( epollfd, connfd, false );
            }
            else if ( events[i].events & EPOLLIN )
            {
                printf( "event trigger once
    " );
                memset( buf, '', BUFFER_SIZE );
                int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
                if( ret <= 0 )
                {
                    close( sockfd );
                    continue;
                }
                printf( "get %d bytes of content: %s
    ", ret, buf );
            }
            else
            {
                printf( "something else happened 
    " );
            }
        }
    }
    
    void et( epoll_event* events, int number, int epollfd, int listenfd )
    {
        char buf[ BUFFER_SIZE ];
        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                addfd( epollfd, connfd, true );
            }
            else if ( events[i].events & EPOLLIN )
            {
                printf( "event trigger once
    " );
                while( 1 )
                {
                    memset( buf, '', BUFFER_SIZE );
                    int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
                    if( ret < 0 )
                    {
                        if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                        {
                            printf( "read later
    " );
                            break;
                        }
                        close( sockfd );
                        break;
                    }
                    else if( ret == 0 )
                    {
                        close( sockfd );
                    }
                    else
                    {
                        printf( "get %d bytes of content: %s
    ", ret, buf );
                    }
                }
            }
            else
            {
                printf( "something else happened 
    " );
            }
        }
    }
    
    int main( int argc, char* argv[] )
    {
        if( argc <= 2 )
        {
            printf( "usage: %s ip_address port_number
    ", basename( argv[0] ) );
            return 1;
        }
        const char* ip = argv[1];
        int port = atoi( argv[2] );
    
        int ret = 0;
        struct sockaddr_in address;
        bzero( &address, sizeof( address ) );
        address.sin_family = AF_INET;
        inet_pton( AF_INET, ip, &address.sin_addr );
        address.sin_port = htons( port );
    
        int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
        assert( listenfd >= 0 );
    
        ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
        assert( ret != -1 );
    
        ret = listen( listenfd, 5 );
        assert( ret != -1 );
    
        epoll_event events[ MAX_EVENT_NUMBER ];
        int epollfd = epoll_create( 5 );
        assert( epollfd != -1 );
        addfd( epollfd, listenfd, true );
    
        while( 1 )
        {
            int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
            if ( ret < 0 )
            {
                printf( "epoll failure
    " );
                break;
            }
        
            lt( events, ret, epollfd, listenfd );
            //et( events, ret, epollfd, listenfd );
        }
    
        close( listenfd );
        return 0;
    }
    
    程序清单2
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    
    #define MAX_EVENT_NUMBER 1024
    #define BUFFER_SIZE 1024
    struct fds
    {
       int epollfd;
       int sockfd;
    };
    
    int setnonblocking( int fd )
    {
        int old_option = fcntl( fd, F_GETFL );
        int new_option = old_option | O_NONBLOCK;
        fcntl( fd, F_SETFL, new_option );
        return old_option;
    }
    
    void addfd( int epollfd, int fd, bool oneshot )
    {
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN | EPOLLET;
        if( oneshot )
        {
            event.events |= EPOLLONESHOT;
        }
        epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
        setnonblocking( fd );
    }
    
    void reset_oneshot( int epollfd, int fd )
    {
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
        epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
    }
    
    void* worker( void* arg )
    {
        int sockfd = ( (fds*)arg )->sockfd;
        int epollfd = ( (fds*)arg )->epollfd;
        printf( "start new thread to receive data on fd: %d
    ", sockfd );
        char buf[ BUFFER_SIZE ];
        memset( buf, '', BUFFER_SIZE );
        while( 1 )
        {
            int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
            if( ret == 0 )
            {
                close( sockfd );
                printf( "foreiner closed the connection
    " );
                break;
            }
            else if( ret < 0 )
            {
                if( errno == EAGAIN )
                {
                    reset_oneshot( epollfd, sockfd );
                    printf( "read later
    " );
                    break;
                }
            }
            else
            {
                printf( "get content: %s
    ", buf );
                sleep( 5 );
            }
        }
        printf( "end thread receiving data on fd: %d
    ", sockfd );
    }
    
    int main( int argc, char* argv[] )
    {
        if( argc <= 2 )
        {
            printf( "usage: %s ip_address port_number
    ", basename( argv[0] ) );
            return 1;
        }
        const char* ip = argv[1];
        int port = atoi( argv[2] );
    
        int ret = 0;
        struct sockaddr_in address;
        bzero( &address, sizeof( address ) );
        address.sin_family = AF_INET;
        inet_pton( AF_INET, ip, &address.sin_addr );
        address.sin_port = htons( port );
    
        int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
        assert( listenfd >= 0 );
    
        ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
        assert( ret != -1 );
    
        ret = listen( listenfd, 5 );
        assert( ret != -1 );
    
        epoll_event events[ MAX_EVENT_NUMBER ];
        int epollfd = epoll_create( 5 );
        assert( epollfd != -1 );
        addfd( epollfd, listenfd, false );
    
        while( 1 )
        {
            int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
            if ( ret < 0 )
            {
                printf( "epoll failure
    " );
                break;
            }
        
            for ( int i = 0; i < ret; i++ )
            {
                int sockfd = events[i].data.fd;
                if ( sockfd == listenfd )
                {
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof( client_address );
                    int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                    addfd( epollfd, connfd, true );
                }
                else if ( events[i].events & EPOLLIN )
                {
                    pthread_t thread;
                    fds fds_for_new_worker;
                    fds_for_new_worker.epollfd = epollfd;
                    fds_for_new_worker.sockfd = sockfd;
                    pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
                }
                else
                {
                    printf( "something else happened 
    " );
                }
            }
        }
    
        close( listenfd );
        return 0;
    }
    

    程序清单3
    客户端程序
    #define _GNU_SOURCE 1
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <string.h>
    #include <stdlib.h>
    #include <poll.h>
    #include <fcntl.h>
    
    #define BUFFER_SIZE 64
    
    int main( int argc, char* argv[] )
    {
        if( argc <= 2 )
        {
            printf( "usage: %s ip_address port_number
    ", basename( argv[0] ) );
            return 1;
        }
        const char* ip = argv[1];
        int port = atoi( argv[2] );
    
        struct sockaddr_in server_address;
        bzero( &server_address, sizeof( server_address ) );
        server_address.sin_family = AF_INET;
        inet_pton( AF_INET, ip, &server_address.sin_addr );
        server_address.sin_port = htons( port );
    
        int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
        assert( sockfd >= 0 );
        if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 )
        {
            printf( "connection failed
    " );
            close( sockfd );
            return 1;
        }
    
        pollfd fds[2];
        fds[0].fd = 0;
        fds[0].events = POLLIN;
        fds[0].revents = 0;
        fds[1].fd = sockfd;
        fds[1].events = POLLIN | POLLRDHUP;
        fds[1].revents = 0;
        char read_buf[BUFFER_SIZE];
        int pipefd[2];
        int ret = pipe( pipefd );
        assert( ret != -1 );
    
        while( 1 )
        {
            ret = poll( fds, 2, -1 );
            if( ret < 0 )
            {
                printf( "poll failure
    " );
                break;
            }
    
            if( fds[1].revents & POLLRDHUP )
            {
                printf( "server close the connection
    " );
                break;
            }
            else if( fds[1].revents & POLLIN )
            {
                memset( read_buf, '', BUFFER_SIZE );
                recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 );
                printf( "%s
    ", read_buf );
            }
    
            if( fds[0].revents & POLLIN )
            {
                ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
                ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
            }
        }
        
        close( sockfd );
        return 0;
    }
    
    服务器程序
    #define _GNU_SOURCE 1
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <poll.h>
    
    #define USER_LIMIT 5
    #define BUFFER_SIZE 64
    #define FD_LIMIT 65535
    
    struct client_data
    {
        sockaddr_in address;
        char* write_buf;
        char buf[ BUFFER_SIZE ];
    };
    
    int setnonblocking( int fd )
    {
        int old_option = fcntl( fd, F_GETFL );
        int new_option = old_option | O_NONBLOCK;
        fcntl( fd, F_SETFL, new_option );
        return old_option;
    }
    
    int main( int argc, char* argv[] )
    {
        if( argc <= 2 )
        {
            printf( "usage: %s ip_address port_number
    ", basename( argv[0] ) );
            return 1;
        }
        const char* ip = argv[1];
        int port = atoi( argv[2] );
    
        int ret = 0;
        struct sockaddr_in address;
        bzero( &address, sizeof( address ) );
        address.sin_family = AF_INET;
        inet_pton( AF_INET, ip, &address.sin_addr );
        address.sin_port = htons( port );
    
        int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
        assert( listenfd >= 0 );
    
        ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
        assert( ret != -1 );
    
        ret = listen( listenfd, 5 );
        assert( ret != -1 );
    
        client_data* users = new client_data[FD_LIMIT];
        pollfd fds[USER_LIMIT+1];
        int user_counter = 0;
        for( int i = 1; i <= USER_LIMIT; ++i )
        {
            fds[i].fd = -1;
            fds[i].events = 0;
        }
        fds[0].fd = listenfd;
        fds[0].events = POLLIN | POLLERR;
        fds[0].revents = 0;
    
        while( 1 )
        {
            ret = poll( fds, user_counter+1, -1 );
            if ( ret < 0 )
            {
                printf( "poll failure
    " );
                break;
            }
        
            for( int i = 0; i < user_counter+1; ++i )
            {
                if( ( fds[i].fd == listenfd ) && ( fds[i].revents & POLLIN ) )
                {
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof( client_address );
                    int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                    if ( connfd < 0 )
                    {
                        printf( "errno is: %d
    ", errno );
                        continue;
                    }
                    if( user_counter >= USER_LIMIT )
                    {
                        const char* info = "too many users
    ";
                        printf( "%s", info );
                        send( connfd, info, strlen( info ), 0 );
                        close( connfd );
                        continue;
                    }
                    user_counter++;
                    users[connfd].address = client_address;
                    setnonblocking( connfd );
                    fds[user_counter].fd = connfd;
                    fds[user_counter].events = POLLIN | POLLRDHUP | POLLERR;
                    fds[user_counter].revents = 0;
                    printf( "comes a new user, now have %d users
    ", user_counter );
                }
                else if( fds[i].revents & POLLERR )
                {
                    printf( "get an error from %d
    ", fds[i].fd );
                    char errors[ 100 ];
                    memset( errors, '', 100 );
                    socklen_t length = sizeof( errors );
                    if( getsockopt( fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length ) < 0 )
                    {
                        printf( "get socket option failed
    " );
                    }
                    continue;
                }
                else if( fds[i].revents & POLLRDHUP )
                {
                    users[fds[i].fd] = users[fds[user_counter].fd];
                    close( fds[i].fd );
                    fds[i] = fds[user_counter];
                    i--;
                    user_counter--;
                    printf( "a client left
    " );
                }
                else if( fds[i].revents & POLLIN )
                {
                    int connfd = fds[i].fd;
                    memset( users[connfd].buf, '', BUFFER_SIZE );
                    ret = recv( connfd, users[connfd].buf, BUFFER_SIZE-1, 0 );
                    printf( "get %d bytes of client data %s from %d
    ", ret, users[connfd].buf, connfd );
                    if( ret < 0 )
                    {
                        if( errno != EAGAIN )
                        {
                            close( connfd );
                            users[fds[i].fd] = users[fds[user_counter].fd];
                            fds[i] = fds[user_counter];
                            i--;
                            user_counter--;
                        }
                    }
                    else if( ret == 0 )
                    {
                        printf( "code should not come to here
    " );
                    }
                    else
                    {
                        for( int j = 1; j <= user_counter; ++j )
                        {
                            if( fds[j].fd == connfd )
                            {
                                continue;
                            }
                            
                            fds[j].events |= ~POLLIN;
                            fds[j].events |= POLLOUT;
                            users[fds[j].fd].write_buf = users[connfd].buf;
                        }
                    }
                }
                else if( fds[i].revents & POLLOUT )
                {
                    int connfd = fds[i].fd;
                    if( ! users[connfd].write_buf )
                    {
                        continue;
                    }
                    ret = send( connfd, users[connfd].write_buf, strlen( users[connfd].write_buf ), 0 );
                    users[connfd].write_buf = NULL;
                    fds[i].events |= ~POLLOUT;
                    fds[i].events |= POLLIN;
                }
            }
        }
    
        delete [] users;
        close( listenfd );
        return 0;
    }
    
    程序清单4 回射服务器程序
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    
    #define MAX_EVENT_NUMBER 1024
    #define TCP_BUFFER_SIZE 512
    #define UDP_BUFFER_SIZE 1024
    
    int setnonblocking( int fd )
    {
        int old_option = fcntl( fd, F_GETFL );
        int new_option = old_option | O_NONBLOCK;
        fcntl( fd, F_SETFL, new_option );
        return old_option;
    }
    
    void addfd( int epollfd, int fd )
    {
        epoll_event event;
        event.data.fd = fd;
        //event.events = EPOLLIN | EPOLLET;
        event.events = EPOLLIN;
        epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
        setnonblocking( fd );
    }
    
    int main( int argc, char* argv[] )
    {
        if( argc <= 2 )
        {
            printf( "usage: %s ip_address port_number
    ", basename( argv[0] ) );
            return 1;
        }
        const char* ip = argv[1];
        int port = atoi( argv[2] );
    
        int ret = 0;
        struct sockaddr_in address;
        bzero( &address, sizeof( address ) );
        address.sin_family = AF_INET;
        inet_pton( AF_INET, ip, &address.sin_addr );
        address.sin_port = htons( port );
    
        int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
        assert( listenfd >= 0 );
    
        ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
        assert( ret != -1 );
    
        ret = listen( listenfd, 5 );
        assert( ret != -1 );
    
        bzero( &address, sizeof( address ) );
        address.sin_family = AF_INET;
        inet_pton( AF_INET, ip, &address.sin_addr );
        address.sin_port = htons( port );
        int udpfd = socket( PF_INET, SOCK_DGRAM, 0 );
        assert( udpfd >= 0 );
    
        ret = bind( udpfd, ( struct sockaddr* )&address, sizeof( address ) );
        assert( ret != -1 );
    
        epoll_event events[ MAX_EVENT_NUMBER ];
        int epollfd = epoll_create( 5 );
        assert( epollfd != -1 );
        addfd( epollfd, listenfd );
        addfd( epollfd, udpfd );
    
        while( 1 )
        {
            int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
            if ( number < 0 )
            {
                printf( "epoll failure
    " );
                break;
            }
        
            for ( int i = 0; i < number; i++ )
            {
                int sockfd = events[i].data.fd;
                if ( sockfd == listenfd )
                {
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof( client_address );
                    int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                    addfd( epollfd, connfd );
                }
                else if ( sockfd == udpfd )
                {
                    char buf[ UDP_BUFFER_SIZE ];
                    memset( buf, '', UDP_BUFFER_SIZE );
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof( client_address );
    
                    ret = recvfrom( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, &client_addrlength );
                    if( ret > 0 )
                    {
                        sendto( udpfd, buf, UDP_BUFFER_SIZE-1, 0, ( struct sockaddr* )&client_address, client_addrlength );
                    }
                }
                else if ( events[i].events & EPOLLIN )
                {
                    char buf[ TCP_BUFFER_SIZE ];
                    while( 1 )
                    {
                        memset( buf, '', TCP_BUFFER_SIZE );
                        ret = recv( sockfd, buf, TCP_BUFFER_SIZE-1, 0 );
                        if( ret < 0 )
                        {
                            if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                            {
                                break;
                            }
                            close( sockfd );
                            break;
                        }
                        else if( ret == 0 )
                        {
                            close( sockfd );
                        }
                        else
                        {
                            send( sockfd, buf, ret, 0 );
                        }
                    }
                }
                else
                {
                    printf( "something else happened 
    " );
                }
            }
        }
    
        close( listenfd );
        return 0;
    }
    



  • 相关阅读:
    poj 2485 Highways 最小生成树
    hdu 3415 Max Sum of MaxKsubsequence
    poj 3026 Borg Maze
    poj 2823 Sliding Window 单调队列
    poj 1258 AgriNet
    hdu 1045 Fire Net (二分图匹配)
    poj 1789 Truck History MST(最小生成树)
    fafu 1181 割点
    减肥瘦身健康秘方
    人生的问题
  • 原文地址:https://www.cnblogs.com/yangykaifa/p/6753396.html
Copyright © 2011-2022 走看看