引言:上一篇说到了线程池方式来处理服务器端的并发,并给出了一个线程池的方案(半同步,半异步方式)。各有各的好处吧,今天来讲讲关于非阻塞的异步IO。
说到异步IO,其实现在很难实现真正的异步,大部分情况下仍然需要阻塞在某个多路复用函数,比如select 或者 epoll 上,得到就绪描述符,然后调用注册在相应描述符上的回调函数。这种方式是现在的反应堆设计的基本思路。我截取一段反应堆模型的图给大家看看。
这个图是截取至 python的 twisted 服务器的反应堆文章介绍,但是大致和我们需要的理念一样。
事件循环阻塞查看描述符是否就绪,当就绪后返回可读或可写的描述符,也有可能带外数据或者出错等情况。
因为 select 很多文章都介绍了,下面我就以 epoll 为例,貌似是2.4.6还是哪个版本以后加入的IO多路复用方式。
epoll 较select 的一些优点就不多说了,内核采用红黑树机制,大大提高了epoll 的性能。著名的 libevent Nginx等内部都采用这个机制。
废话不多说,看一个简单的epoll 模式,其实本来不想介绍这个的,因为直接 man epoll 就可以看到一个简单的demo,但是为了文章的连贯性,还是继续把这部分介绍一下。
epoll 主要有几个函数:
int epoll_create(int size);
在现在的Linux版本中,size 已不重要,默认的不超过最大值就可以。size 就是描述符数目的最大值。
函数的返回值是一个描述符(句柄),很简单的就创建了epoll.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
第一个参数是由 epoll_create 返回的描述符
第二个参数是由宏定义的几个值
EPOLL_CTL_ADD:类似于 select 的 FD_SET() ,将一个描述符加入到epoll 监听队列中
EPOLL_CTL_MOD:修改已经注册的fd的事件类型
EPOLL_CTL_DEL:将一个描述符从epoll 监听队列中删除
第三个参数是需要加入的描述符
第四个是一个结构体参数,结构是这样的
struct epoll_event { __uint32_t events; epoll_data_t data; };
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;
epoll_event 结构体里面的events 表示的是返回的事件类型或者是加入时候的事件类型。也有可能是带外数据或者错误等,它由几个宏定义:
EPOLLIN :文件描述符上的读事件
EPOLLOUT:文件描述符上的写事件
EPOLLPRI:描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:描述符发生错误;
EPOLLHUP:描述符被挂断;
EPOLLET: 边缘触发(Edge Triggered)模式
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
值得一说的是,很多文章都没有提到这个宏其实可能是由你自己改变的,通过 epoll_ctl 或者是在 epoll_wait 返回的时候操作系统改的,因为描述符有可能出错等。
一般情况下,对于一个描述符,可以使用 | 运算来组合。
添加一个描述符,监听是否可读或可写。
EPOLLIN | EPOLLOUT
注意一下epoll_data_t中的 ptr 或者 fd 而不是 ptr和fd,这个结构只能包含其中一个,所以在注册相应的描述符上的事件的时候,要么注册的是对应的描述符fd,要么注册的是相应的事件封装,当然,事件封装里面必然有fd,不然无法继续下面的操作。
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
第一个参数是epoll的描述符,第二个参数是一个指向 struct epoll_event 的指针,这里需要传入的是一个数组,epoll_event 类型.
第三个,最大的监听事件数组值。第四个是超时时间,对于Nginx或者很多如libevent 的超时时间管理是利用红黑树和最小堆来管理的,很巧妙的方式,以后写一篇博文介绍,这里只需要知道timeout 是 epoll_wait 的阻塞的最大值,如果超过这个值不管是否有事件都返回,0表示立即返回,即有无事件都返回,-1 是永久阻塞。
一个简单的 epoll demo
struct epoll_event ev,events[1024]; epfd=epoll_create(1024);
for( ; ; ) { nfds = epoll_wait(epfd,events,1024,time_value); for(i=0;i<nfds;++i) { if(events[i].data.fd==listenfd) /*如果加入的监听描述符有事件*/ { connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); /*accept这个连接并得到链接描述符,将描述符加入到epoll 监听事件队列*/
setnonblocking(connfd); ev.data.fd=connfd; ev.events=EPOLLIN|EPOLLET; /*读事件*/ epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); /*将新的fd添加到epoll的监听队列中*/ } else if( events[i].events&EPOLLIN ) //接收到数据,读socket { n = read(sockfd, line, MAXLINE)) < 0 ev.data.ptr = my_ev; //ev 可以是自己定义的事件封装或者是fd ev.events=EPOLLOUT|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);/*修改标识符,等待下一个循环时发送数据*/ } else if(events[i].events&EPOLLOUT) /*对应的描述符可写,即套接口缓冲区有缓冲区可写*/ { struct my_event* my_ev= (my_event*)events[i].data.ptr; sockfd = my_ev->fd; send( sockfd, ev->ptr, strlen((char*)my_ev->ptr), 0 ); ev.data.fd=sockfd; ev.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); } else { // } } }
epoll 还没讲清楚,其中还有很多需要注意的地方。只是想让不懂异步事件和反应堆模式的读者了解这种模式。注意的是这种模式下连接描述符需要设置为非阻塞,然后IO 操作函数应该记录每次读写的状态,如果缓冲区满的话需要记录状态,下次返回这个描述符的时候继续上一次的状态继续传输或读取,因为一个套接口缓冲区读取的是应用层数据,而TCP层的数据如果比较大的时候分段的话会导致一次不能完全读取或写入全部数据而套接口缓冲区已经满了。需要选取的模式是LT 水平触发方式,如果是ET 边缘触发方式,一次读取套接口或者写入套接口但是缓冲区满了不能继续写后,epoll_wait不会继续返回,不需要状态机记录。ET 方式也是所谓的高速模式。
总结:这里只是对epoll 做了一个简单的介绍,如有错误,请指教。希望大牛们不要介意,承前启后,后面会有一个反应堆的框架的介绍,这里没有使用到事件封装和设置回调函数等,只是一个demo,还不是我自己写的。下一站分享一个 epoll 异步事件封装。今天就到这里吧