zoukankan      html  css  js  c++  java
  • LINUX网络编程 IO 复用

    参考《linux高性能服务器编程》

    LINUX下处理多个连接时候,仅仅使用多线程和原始socket函数,效率十分低下

    于是就出现了selelct poll  epoll等IO复用函数。

    这里讨论性能最优的epoll IO复用

    用户将需要关注的socket连接使用IO复用函数放进一个事件表中,每当事件表中有一个或者多个SOCKET连接出现读写请求时候,则进行处理

    事件表使用一个额外的文件描述符来标识。文件描述符使用 epoll_create函数创建

    #inlclude <sys/epoll.h>

    int epoll_create(int size); size参数目前不起作用

    操作事件表使用epoll_ctl函数进行控制

    操作类型有以下3中 

    EPOLL_CTL_ADD  EPOLL_CTL_MOD  EPOLL_CTL_DEL

    在一段超时时间内等待事件表上的事件 使用epoll_wait();

    函数范例如下:

    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    
    #define MAX_EVENT_NUMBER 1024
    #define BUFFER_SIZE 10
    
    int setnonblocking( int fd )
    {
        int old_option = fcntl( fd, F_GETFL );
        int new_option = old_option | O_NONBLOCK;
        fcntl( fd, F_SETFL, new_option );
        return old_option;
    }
    
    void addfd( int epollfd, int fd, bool enable_et )
    {
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN;
        if( enable_et )
        {
            event.events |= EPOLLET;
        }
        epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
        setnonblocking( fd );
    }
    
    
    void et( epoll_event* events, int number, int epollfd, int listenfd )
    {
        char buf[ BUFFER_SIZE ];
        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                addfd( epollfd, connfd, true );
            }
            else if ( events[i].events & EPOLLIN )
            {
                printf( "event trigger once
    " );
                while( 1 )
                {
                    memset( buf, '', BUFFER_SIZE );
                    int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
                    if( ret < 0 )
                    {
                        if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                        {
                            printf( "read later
    " );
                            break;
                        }
                        close( sockfd );
                        break;
                    }
                    else if( ret == 0 )
                    {
                        close( sockfd );
                    }
                    else
                    {
                        printf( "get %d bytes of content: %s
    ", ret, buf );
                    }
                }
            }
            else
            {
                printf( "something else happened 
    " );
            }
        }
    }
    
    int main( int argc, char* argv[] )
    {
        if( argc <= 2 )
        {
            printf( "usage: %s ip_address port_number
    ", basename( argv[0] ) );
            return 1;
        }
        const char* ip = argv[1];
        int port = atoi( argv[2] );
    
        int ret = 0;
        struct sockaddr_in address;
        bzero( &address, sizeof( address ) );
        address.sin_family = AF_INET;
        inet_pton( AF_INET, ip, &address.sin_addr );
        address.sin_port = htons( port );
    
        int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
        assert( listenfd >= 0 );
    
        ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
        assert( ret != -1 );
    
        ret = listen( listenfd, 5 );
        assert( ret != -1 );
    
        epoll_event events[ MAX_EVENT_NUMBER ];
        int epollfd = epoll_create( 5 );
        assert( epollfd != -1 );
        addfd( epollfd, listenfd, true );
    
        while( 1 )
        {
            int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
            if ( ret < 0 )
            {
                printf( "epoll failure
    " );
                break;
            }
        
            et( events, ret, epollfd, listenfd );
        }
    
        close( listenfd );
        return 0;
    }
    

      

    在尝试一个多线程代码;

    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    
    #define MAX_EVENT_NUMBER 1024
    #define BUFFER_SIZE 10
    
    struct fds
    {
       int epollfd;
       int sockfd;
    };
    
    void* worker( void* arg )
    {
        int sockfd = ( (fds*)arg )->sockfd;
        int epollfd = ( (fds*)arg )->epollfd;
        printf( "start new thread to receive data on fd: %d
    ", sockfd );
        char buf[ BUFFER_SIZE ];
        memset( buf, '', BUFFER_SIZE );
        while( 1 )
        {
            int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
            if( ret == 0 )
            {
                close( sockfd );
                printf( "foreiner closed the connection
    " );
                break;
            }
            else if( ret < 0 )
            {
                if( errno == EAGAIN )
                {
                   // reset_oneshot( epollfd, sockfd );
                    printf( "read later
    " );
                    break;
                }
            }
            else
            {
                printf( "get content: %s
    ", buf );
                sleep( 5 );
            }
        }
        printf( "end thread receiving data on fd: %d
    ", sockfd );
    }
    
    int setnonblocking(int fd)
    {
    	int old_option = fcntl(fd,F_GETFL);
    	int new_option = old_option | O_NONBLOCK;
    	fcntl(fd,F_SETFL,new_option);
    	return old_option;
    }
    
    void addfd(int epollfd,int fd,bool enable_et)
    {
    	epoll_event event;
    	event.data.fd = fd;
    	event.events = EPOLLIN;
    	
    	if(enable_et){
    		event.events |= EPOLLET;
    	}
    	epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
    	setnonblocking(fd);
    }
    
    void et(epoll_event* events,int number,int epollfd,int listenfd){
    	char buf[BUFFER_SIZE];
    	for(int i = 0; i < number;i++){
    		int sockfd = events[i].data.fd;
    		if(sockfd == listenfd){
    			struct sockaddr_in client_address;
    			socklen_t client_addrlength = sizeof(client_address);
    			int connfd = accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
    			addfd(epollfd,connfd,true);
    		}else if(events[i].events & EPOLLIN){
    			/*
    			printf("event trigger once
    ");
    			while(1){
    				memset(buf,'',BUFFER_SIZE);
    				int ret = recv(sockfd,buf,BUFFER_SIZE-1,0);
    				if(ret < 0){
    					if((errno == EAGAIN) || (errno == EWOULDBLOCK)){
    						printf("read later
    ");
    						break;
    					}
    					close(sockfd);
    					break;
    				}else if(ret == 0){
    					close(sockfd);
    				}else{
    					printf("get %d bytes of content: %s 
    ",ret,buf);
    				}
    			}*/
    			 pthread_t thread;
                    fds fds_for_new_worker;
                    fds_for_new_worker.epollfd = epollfd;
                    fds_for_new_worker.sockfd = sockfd;
                    pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
    		}else{
    			printf("something else happened 
    ");
    		}
    	}
    }
    
    int main()
    {
    	int ret = 0;
    	struct sockaddr_in address;
    	bzero(&address,sizeof(address));
    	address.sin_family = AF_INET;
    	inet_pton(AF_INET,"127.0.0.1",&address.sin_addr);
    	address.sin_port = htons(1134);
    	
    	int listenfd = socket(AF_INET,SOCK_STREAM,0);
    	assert(listenfd >= 0);
    	
    	ret = bind(listenfd,(struct sockaddr*)&address,sizeof(address));
    	assert(ret != -1);
    	
    	ret = listen(listenfd,5);
    	assert(ret != -1);
    	
    	epoll_event events[MAX_EVENT_NUMBER];
    	int epollfd = epoll_create(5);
    	assert(epollfd != -1);
    	addfd(epollfd,listenfd,true);
    	
    	while(1){
    		int ret = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
    		if(ret < 0){
    			printf("epoll failure
    ");
    			break;
    		}
    		et(events,ret,epollfd,listenfd);
    	}
    	
    	close(listenfd);
    	return 0;
    }
    

      

    多线程版本中 如果我们 连续多次输入会出现一个问题

    如果我们连续多次在telnet的客户端输入

    服务端会开启多个线程 接受这个socket的输入

    显示如下:

    fd: 5, sockfdget content: dfdsf
    ...
    start new thread to receive data on fd: 5
    fd: 5, sockfdget content: sd
    ...
    start new thread to receive data on fd: 5
    fd: 5, sockfdget content: sd
    ...
    start new thread to receive data on fd: 5
    fd: 5, sockfdget content: sd
    ...
    start new thread to receive data on fd: 5
    fd: 5, sockfdget content: sd
    ...
    start new thread to receive data on fd: 5
    fd: 5, sockfdget content: sd
    ...
    start new thread to receive data on fd: 5
    fd: 5, sockfdget content: sd

    如果多个线程同时操作一个socket 可能出现各种意外情况

    那么我们就需要设置socket的EPOLLONESHOT标志

    对于连续操作,操作系统最多触发该socket上一次读写异常事件。

    代码如下:

    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <pthread.h>
    
    #define MAX_EVENT_NUMBER 1024
    #define BUFFER_SIZE 10
    
    struct fds
    {
       int epollfd;
       int sockfd;
    };
    
    void reset_oneshot( int epollfd, int fd )
    {
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
        epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
    }
    
    
    void* worker( void* arg )
    {
        int sockfd = ( (fds*)arg )->sockfd;
        int epollfd = ( (fds*)arg )->epollfd;
        printf( "start new thread to receive data on fd: %d
    ", sockfd );
        char buf[ BUFFER_SIZE ];
        memset( buf, '', BUFFER_SIZE );
        while( 1 )
        {
            int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
            if( ret == 0 )
            {
                close( sockfd );
                printf( "foreiner closed the connection
    " );
                break;
            }
            else if( ret < 0 )
            {
                if( errno == EAGAIN )
                {
                    reset_oneshot( epollfd, sockfd );
                    printf( "read later
    " );
                    break;
                }
            }
            else
            {
                printf( "get content: %s
    ", buf );
                sleep( 5 );
            }
        }
        printf( "end thread receiving data on fd: %d
    ", sockfd );
    }
    
    int setnonblocking(int fd)
    {
    	int old_option = fcntl(fd,F_GETFL);
    	int new_option = old_option | O_NONBLOCK;
    	fcntl(fd,F_SETFL,new_option);
    	return old_option;
    }
    
    void addfd(int epollfd,int fd,bool setOneShot)
    {
    	epoll_event event;
    	event.data.fd = fd;
    	event.events = EPOLLIN;
    	
    
    	event.events |= EPOLLET;
    	if(setOneShot)
    		event.events |= EPOLLONESHOT;
    	epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
    	setnonblocking(fd);
    }
    
    
    
    void et(epoll_event* events,int number,int epollfd,int listenfd){
    	char buf[BUFFER_SIZE];
    	for(int i = 0; i < number;i++){
    		int sockfd = events[i].data.fd;
    		if(sockfd == listenfd){
    			struct sockaddr_in client_address;
    			socklen_t client_addrlength = sizeof(client_address);
    			int connfd = accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
    			addfd(epollfd,connfd,true);
    		}else if(events[i].events & EPOLLIN){
    			/*
    			printf("event trigger once
    ");
    			while(1){
    				memset(buf,'',BUFFER_SIZE);
    				int ret = recv(sockfd,buf,BUFFER_SIZE-1,0);
    				if(ret < 0){
    					if((errno == EAGAIN) || (errno == EWOULDBLOCK)){
    						printf("read later
    ");
    						break;
    					}
    					close(sockfd);
    					break;
    				}else if(ret == 0){
    					close(sockfd);
    				}else{
    					printf("get %d bytes of content: %s 
    ",ret,buf);
    				}
    			}*/
    			 pthread_t thread;
                    fds fds_for_new_worker;
                    fds_for_new_worker.epollfd = epollfd;
                    fds_for_new_worker.sockfd = sockfd;
                    pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
    		}else{
    			printf("something else happened 
    ");
    		}
    	}
    }
    
    int main()
    {
    	int ret = 0;
    	struct sockaddr_in address;
    	bzero(&address,sizeof(address));
    	address.sin_family = AF_INET;
    	inet_pton(AF_INET,"127.0.0.1",&address.sin_addr);
    	address.sin_port = htons(1134);
    	
    	int listenfd = socket(AF_INET,SOCK_STREAM,0);
    	assert(listenfd >= 0);
    	
    	ret = bind(listenfd,(struct sockaddr*)&address,sizeof(address));
    	assert(ret != -1);
    	
    	ret = listen(listenfd,5);
    	assert(ret != -1);
    	
    	epoll_event events[MAX_EVENT_NUMBER];
    	int epollfd = epoll_create(5);
    	assert(epollfd != -1);
    	addfd(epollfd,listenfd,false);
    	
    	while(1){
    		int ret = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
    		if(ret < 0){
    			printf("epoll failure
    ");
    			break;
    		}
    		et(events,ret,epollfd,listenfd);
    	}
    	
    	close(listenfd);
    	return 0;
    }
    

      

    输入

    Trying 127.0.0.1...
    Connected to 127.0.0.1.
    Escape character is '^]'.
    1
    2
    3
    4
    5
    sdfufghbskdjhkbaskjhbkjlsadvfblkajwsvdfbljkashdbkfjhasdfhasjkhD

    输出

    start new thread to receive data on fd: 5
    get content: 1

    get content: 2
    3
    4

    get content: 5
    sdfufg
    get content: hbskdjhkb
    get content: askjhbkjl
    get content: sadvfblka
    get content: jwsvdfblj
    get content: kashdbkfj
    get content: hasdfhasj
    get content: khD
    hasj
    read later
    end thread receiving data on fd: 5

    作 者: itdef
    欢迎转帖 请保持文本完整并注明出处
    技术博客 http://www.cnblogs.com/itdef/
    B站算法视频题解
    https://space.bilibili.com/18508846
    qq 151435887
    gitee https://gitee.com/def/
    欢迎c c++ 算法爱好者 windows驱动爱好者 服务器程序员沟通交流
    如果觉得不错,欢迎点赞,你的鼓励就是我的动力
    阿里打赏 微信打赏
  • 相关阅读:
    Android 捕获异常并在应用崩溃后重启应用
    Android 浏览器 —— 使用 WebView 实现文件下载
    给 Android 研发的一些的建议
    java.util.concurrent.RejectedExecutionException
    java的关闭钩子(Shutdown Hook)
    PGP工作原理及其安全体制
    漫画:什么是红黑树?
    LINUX下IDEA等工具调试项目时提示:Unable to open debugger port
    MongoDB aggregate 运用篇(转)
    Java8系列之重新认识HashMap
  • 原文地址:https://www.cnblogs.com/itdef/p/6444742.html
Copyright © 2011-2022 走看看