1 阻塞和非阻塞
对于阻塞和非阻塞,网上有一个很形象的比喻,就是说好比你在等快递,阻塞模式就是快递如果不到,你就不能做其他事情。非阻塞模式就是在这段时间里面,你可以做其他事情,比如上网、打游戏、睡觉等,很显然非阻塞的模式会效率更高。
非阻塞的模式也分两种,第一种就是忙轮询,因为你不知道快递什么时候来,所以你每5分钟就跟快递打一次电话进行询问,另外一种就是我们这篇文章讲的 epoll 模型,在等待快递到达的时间内,你尽可以做其他任何事情,包括睡觉,当快递到达时,你就会被告知。
那么阻塞在操作系统中到底是如何进行的呢?假设有一个管道,进程A为管道的写入方,B为管道的读出方。
假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,这个事件姑且称之为“缓冲区非空”。
但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区满”。
假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满”
也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。
这四个情形涵盖了四个I/O事件,缓冲区满,缓冲区空,缓冲区非空,缓冲区非满,这四个I/O事件是进行阻塞同步的根本。那么在我们的 client-server 模型是怎样发生阻塞的呢?
socket 之间的通信就像这个管道,两端的 socket 会进入读取和写入。但请注意的是,写入仅仅表示数据被复制到了内核中的 TCP 发送缓冲区,至于什么时候发送到网络,什么时候被对方主机接收,什么时候被对方进程读取,系统调用层面不会给与任何通知。由于缓冲区的大小是有限的,当该socket的写入缓冲区满时会发生阻塞。所以,如果接收端进程从socket读数据的速度跟不上发送端进程向socket写数据的速度,会导致发送端write调用阻塞。
读取阻塞则相对来说非常容易理解,就是该 socket 的读取缓冲区中没有数据时发生阻塞,通常是因为发送端的数据没有到达。如果想对缓冲区有一个感性的了解,可以在 Linux 下执行如下命令,查看本机 socket 的发送和读取缓冲区大小。如下图所示:
既然 socket 在读写的过程中会存在阻塞,那么如何进行非阻塞的socket 读写呢?很简单,我们可以记录所有这些流,通过写一个 for 循环把所有socket流从头到尾问一遍。但这样的做法显然不好,因为某些 socket 没有数据,则只会浪费 CPU 的时间。那怎么解决这个问题呢?答案就是引进一个 代理,通过代理来观察许多流的I/O 事件,在空闲的时候把当前线程阻塞掉,当一个或多个流有 I/O 事件时,就从阻塞态醒来,这个代理就是 select, poll 和 epoll 模型。
2 select, poll, epoll 代理
select 和 poll
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
while true { select(streams[]) for i in streams[] { if i has data read until unavailable } }
1)单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
2)select 所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
poll则在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
epoll
epoll是Linux 2.6 开始出现的为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
伪代码如下:
while true { active_stream[] = epoll_wait(epollfd) for i in active_stream[] { read or write till } }
3 采用 select 和 epoll 代理重构网络并发模型
通过上面的分析,我们知道采用代理可以一次处理多个连接,那么到底是如何实现的呢,我们以上节课的代码为基础,分别使用 select 和 epoll 代理进行重构,并比较它们之间的区别。先给出代码如下:
1) select 代理实现的 server代码:server_select.py
#coding:utf-8 import socket from time import ctime import select import Queue HOST = '' PORT = 21567 BUFSIZE = 1024 ADDR = ('127.0.0.1', PORT) # 服务器端创建 socket serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serverSock.bind(ADDR) serverSock.listen(5) inputs = [serverSock] outputs = [] timeout = 20 message_queues = {} while inputs: print "doing select ..." readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout) for s in readable: if s is serverSock: server2client_Sock, addr = serverSock.accept() print " Connection from " server2client_Sock.setblocking(0) inputs.append(server2client_Sock) message_queues[server2client_Sock] = Queue.Queue() else: server2client_Sock = s data = server2client_Sock.recv(BUFSIZE) # 如果数据接收完,则退出 recv, 进入到下一个连接 if data: # server2client_Sock.send('[%s] %s' % (ctime(), data)) print "Received data from ", server2client_Sock.getpeername() data = '[%s] %s' % (ctime(), data) message_queues[server2client_Sock].put(data) # 将建立连接的 socket 放入到可以写的 socket 列表中 if server2client_Sock not in outputs: outputs.append(server2client_Sock) else: if server2client_Sock in outputs: outputs.remove(server2client_Sock) inputs.remove(server2client_Sock) server2client_Sock.close() del message_queues[server2client_Sock] if s in writable: try: next_msg = message_queues[s].get_nowait() except Queue.Empty: print " " , s.getpeername() , 'queue empty' outputs.remove(s) else: print " sending " , next_msg , " to ", s.getpeername() s.send(next_msg) serverSock.close()
2) epoll代理实现的 server代码:server_epoll.py
#coding:utf-8 import socket from time import ctime import select import Queue HOST = '' PORT = 21567 BUFSIZE = 1024 ADDR = ('127.0.0.1', PORT) # 服务器端创建 socket serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serverSock.bind(ADDR) serverSock.listen(5) timeout = 1000 # millisecond message_queues = {} # key state of socket io READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR) READ_WRITE = (READ_ONLY|select.POLLOUT) poller = select.poll() poller.register(serverSock, READ_ONLY) fd_to_sockets = {serverSock.fileno(): serverSock, } while True: print "Waiting for next event ..." events = poller.poll(timeout) for fd, flag in events: s = fd_to_sockets[fd] if flag & (select.POLLIN | select.POLLPRI): if s is serverSock: server2client_Sock, addr = serverSock.accept() print "Connetion from ", addr server2client_Sock.setblocking(0) fd_to_sockets[server2client_Sock.fileno()] = server2client_Sock poller.register(server2client_Sock, READ_ONLY) message_queues[server2client_Sock] = Queue.Queue() else: server2client_Sock = s data = server2client_Sock.recv(BUFSIZE) if data: print "Received data from ", server2client_Sock.getpeername() data = '[%s] %s' % (ctime(), data) message_queues[server2client_Sock].put(data) # 将建立连接的 socket 放入到可以写的 socket 列表中 poller.modify(server2client_Sock, READ_WRITE) else: poller.unregister(server2client_Sock) server2client_Sock.close() del message_queues[server2client_Sock] else: try: next_msg = message_queues[s].get_nowait() except Queue.Empty: print " " , s.getpeername() , 'queue empty' poller.modify(s, READ_ONLY) else: print " sending " , next_msg , " to ", s.getpeername() s.send(next_msg) serverSock.close()
对于 select 代理最核心的调用就是
readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout)
该调用将可读可写的socket存储到 readable 和 writable 列表中,从而我们可以直接调用这些 socket的 recv 和 send 时不会发生阻塞。注意除了 serverSock 只读以外,其他 socket 都会存在同时存在于 inputs 和 outputs 列表中。
对于 epoll 代理最核心的就是
events = poller.poll(timeout)
该调用不需要输入观察的 socket,它是之前通过 register 来指定的。和 select 模式中的代码一样,这些 socket 都是可读可写的,通过如下代码实现:
poller.modify(server2client_Sock, READ_WRITE)
4 问题分析
至此使用 epoll 代理重构我们的 server已经做完了,大家可以同时运行多个 client.py 进行交互,我们可以观察到该 server 具备了同时处理多个客户端连接的能力,而且所有的 socket 都是非阻塞的。
然而,对于这样一个简单的服务(将客户端发送的数据加上时间戳返回)我们的代码结构看起来已经非常复杂,所以如果要处理我们实际业务中的逻辑处理简直是无法想象。
另外,对于超高的并发请求,仅仅采用 epoll 模型是不够的,我们还必须使用多进程多线程等方式来充分利用系统资源。
关于后面这个问题会是本系列文章重点讨论的部分,也是tornado 源码中的核心部分,我们会在稍微后面文章中去讨论。接下来的几篇文章中我们会尝试先梳理代码结构的问题,真正进入到 tornado 源码的部分,看看 tornado 的如何进行架构的。与其他torando源码文章不同的是,我们不会直接按模块挨个挨个分析代码,这种方式既晦涩难懂又非常低效,而是尝试从零开始搭建它,也就是说我们会仿造 tornado 的框架结构,从最简单开始,一步一步去实现 tornado,从而可以从内部逐步理清 tornado 这个优秀高并发框架的脉络。