为了更好地了解IO模型,我们需要事先回顾下:同步、异步、阻塞、非阻塞
同步:
#所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不会返回。按照这个定义,其实绝大多数函数 都是同步调用。但是一般而言, 我们在说同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务。--串行
异步:
#异步的概念和同步相对。当一个异步功能调用发出后,调用者不能立刻得到结果。当该异步功能完成后,通过状态、 通知或回调来通知调用者。如果异步功能用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低(有 些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这其实是一 种很严重的错误)。如果是使用通知的 方式,效率则很高,因为异步功能几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。
阻塞:
#阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激 活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的 ,只是从逻辑上当前函数没有返回而已。
非阻塞:
#非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。
IO模型
阻塞IO(blocking IO)
默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样
接收端调用recvfrom这个系统调用,要收到数据要经历发送端准备数据,将数据copy到内存,以及传输的过程。
在这个时间内接收端会被阻塞。直到接收到发送端发过来的数据才会解除block的状态,重新运行起来。 所以,blocking IO的特点就是在IO执行的两个阶段(等待数据(读)和拷贝数据(写)两个阶段)都被block了。
在网络编程中大部分socket借口都是阻塞型的(recv,accept)
ps:所谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。
实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。
一个简单的解决方案
#在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任 何一个连接的阻塞都不会影响其他的连接。
该方案的问题是:
#开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资 源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。
改进方案:
#很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的 线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连 接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据 库等。
改进后的方案其实也存在着问题:
#“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上 限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑 其面临的响应规模,并根据响应规模调整“池”的大小。
对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。
非阻塞IO(non-blocking IO)
通过设置socket使其变为non-blocking当对一个non-blocking socket执行读操作时,流程是这个样子:
执行代码 sevenr.setblocking(False) 默认为True
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。
也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
服务端 from socket import * import time server = socket(AF_INET, SOCK_STREAM) server.bind(('127.0.0.1',8083)) server.listen(5) server.setblocking(False) #设置为非阻塞 conn_l=[] while True: try: conn, addr = server.accept()# 非阻塞状态下这里不会等待 conn_l.append(conn)# 将链接存入列表中 print(addr) except BlockingIOError: # 但是如果没有连接就会报错,这就是内核返回给用户进程的信息 del_l = [] for conn in conn_l: # 循环连接列表接收数据 try: data=conn.recv(1024) if not data: # 这个条件是专门针对linux系统客户端断开连接 conn.close() # 客户端断开连接后会一直收到空 del_l.append(conn) # continue # conn.send(data.upper()) except BlockingIOError: # 没有接受到数据就去接收下一个循环的数据 pass except ConnectionResetError: # 针对win系统客户端断开连接,服务端会报错 conn.close() del_l.append(conn) for conn in del_l: # 删除断开的连接 conn_l.remove(conn) #客户端 from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(('127.0.0.1',8080)) while True: msg=input('>>: ') if not msg:continue c.send(msg.encode('utf-8')) data=c.recv(1024) print(data.decode('utf-8'))
但是非阻塞IO模型绝不被推荐。
我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。
但是也难掩其缺点:
#1 对cpu的占用率过多,但是是无用的占用(一直在循环) #2 在连接数过多的情况下,不能及时响应客户的的消息(for循环遍历)
此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。
多路复用IO(IO multiplexing)
多路的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
select负责的socket都会被监听,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
强调:
1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
结论: select的优势在于可以处理多个连接,不适用于单个连接
一个多路复用的服务端
from socket import * import time import select server = socket(AF_INET, SOCK_STREAM) server.bind(('127.0.0.1',8084)) server.listen(5) server.setblocking(False) read_l=[server,] # 将套接字对象存入列表 print('starting....') while True: rl,wl,xl=select.select(read_l,[],[]) # 必须传递三个列表 对应读操作,写操作,以及错误 for r in rl: # 循环rl处理对象 if r is server: conn,addr=rl[0].accept() #conn,addr=server.accpet() print(addr) read_l.append(conn) # 将用于传输的套接字对象放入列表中 else: try: data=r.recv(1024) if not data: r.close() read_l.remove(r) continue r.send(data.upper()) except ConnectionResetError: r.close() read_l.remove(r) # 删除断开的连接
多路复用的HTTP客户端
import socket import select sk = socket.socket() sk.setblocking(False) sk.connect(('192.168.0.1',80)) read_l=[sk,] # 将套接字对象存入列表 write_l=[sk,] # 将套接字对象存入列表 while True: if not read_l: break rl,wl,xl=select.select(read_l,write_l,[]) # select()接收三个列表,检测第一个列表中的socket对象是否收到数据,可以进行读操作 # 检测第二个类表中的socket是否可以进行写操作,也就是能够发送数据连接成功的 # 其返回值也是三个列表,其中就是收到数据的,连接成功的,和错误的 # 官方描述 # select()返回三个新列表,其中包含传入的列表内容的子集。 # 可读列表中的所有套接字都有传入的数据缓冲和可用来读取。 # 可写列表中的所有套接字在缓冲区中都有空闲空间,可以写入。 # 异常返回的套接字有一个错误(“异常条件”的实际定义取决于平台)。 # 也可以说是wl中是已连接的套接字对象,跳过了等待连接server.accept()或connect # rl中是收到消息的套接字,跳过了等待接收数据server.recv() for sk in wl: sk.sendall( b'GET /wupeiqi HTTP/1.1 User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 ') # 已经连接成功的socket,无需再继续监听 write_l.remove(sk) # r=服务端给用户返回数据了 r=[sk1,] for sk in rl: data = sk.recv(8096) sk.close() # 断开连接:短连接、无状态 read_l.remove(sk) # 不再监听
异步非阻塞客户端
# 事件循环的异步非阻塞客户端 import socket import select class Request(object): def __init__(self,sk,callback): self.sk = sk self.callback = callback def fileno(self): # select实际上就是检测的socket的fileno return self.sk.fileno() class AsyncHttp(object): def __init__(self): self.fds = [] self.conn = [] def add(self,url,callback): sk = socket.socket() sk.setblocking(False) try: sk.connect((url,80)) except BlockingIOError as e: pass req = Request(sk,callback) # 封装socket和回调函数 self.fds.append(req) self.conn.append(req) def run(self): """ 监听socket是否发生变化 :return: """ while True: """ fds=[req(sk,callback),req,req] conn=[req,req,req] """ r,w,e = select.select(self.fds,self.conn,[],0.05) # sk.fileno() = req.fileno() # w=已经连接成功的socket列表 w=[sk1,sk2] for req in w: req.sk.sendall(b'GET /wupeiqi HTTP/1.1 User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 ') # 已经连接成功的socket,无需再继续监听 self.conn.remove(req) # r=服务端给用户返回数据了 r=[sk1,] for req in r: data = req.sk.recv(8096) req.callback(data) req.sk.close() # 断开连接:短连接、无状态 self.fds.remove(req) # 不再监听 if not self.fds: break
基于协程的异步非阻塞
import socket,select class ReqGen(object): def __init__(self,sk,gen): self.sk = sk self.gen = gen def fileno(self): return self.sk.fileno() class AsyncHttp(object): def __init__(self): self.conn = [] self.fds = [] def add(self,url,gen): sk = socket.socket() sk.setblocking(False) try: sk.connect((url,80,)) except BlockingIOError as e: pass rg = ReqGen(sk,gen) self.conn.append(rg) self.fds.append(rg) def run(self): """ 监听socket是否发生变化 :return: """ while True: """ fds=[req(sk,callback),req,req] conn=[req,req,req] """ r,w,e = select.select(self.fds,self.conn,[],0.05) # sk.fileno() = req.fileno() # w=已经连接成功的socket列表 w=[sk1,sk2] for req in w: req.sk.sendall(b'GET /wupeiqi HTTP/1.1 User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 ') # 已经连接成功的socket,无需再继续监听 self.conn.remove(req) # r=服务端给用户返回数据了 r=[sk1,] for req in r: data = req.sk.recv(8096) new_req= req.gen.send(data) req.sk.close() # 断开连接:短连接、无状态 self.fds.remove(req) # 不再监听 if not self.fds: break def run(tasks): ah = AsyncHttp() for gen in tasks: # gen = 生成器 req = gen.send(None) # req = Request(url) ah.add(req.url,gen) ah.run() def fetch(url): response = yield Request(url) # Request(url)封装了url print(response)# 第一个页面的结果 # response 处理响应 yield tasks= [fetch('www.baidu.com'),fetch('www.cnblogs.com')] run(tasks)
select监听fd变化的过程分析:
#用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据 后,就会发送信号给用户进程数据已到; #用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数 据清除,这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。
该模型的优点:
#相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能 够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
该模型的缺点:
#首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消 耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll(利用了回调机制),BSD提 供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。 遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器 会比较困难。 #其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。
1. select/poll把fd的监听列表放在用户空间,由用户空间管理,导致在用户空间和内核空间之间频繁重复拷贝大量fd;epoll在内核建立fd监听列表(实际是红黑树),每次通过epoll_ctl增删改即可。
2. select/poll每当有fd内核事件时,都唤醒当前进程,然后遍历监听列表全部fd,检查所有就绪fd并返回;epoll在有fd内核事件时,通过回调把该fd放到就绪队列中,只需返回该就绪队列即可,不需要每次遍历全部监听fd。
异步IO(Asynchronous I/O)
这个用的不多,但是效率非常高
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。