十四、并发编程之I/O模型
http://www.cnblogs.com/linhaifeng/articles/7454717.html
1、模型介绍
1.1 IO种类
(1)* blocking IO 阻塞IO
(2)* nonblocking IO 非阻塞IO
(3)* IO multiplexing 多路复用IO
(4)* signal driven IO 信号驱动IO
(5)* asynchronous IO 异步IO
1.2 IO发生时的对象和步骤
对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。
1.3 IO经历的阶段
(1)等待数据准备
(2)将数据从内核(操作系统内存)拷贝到进程(应用程序内存)中
2、阻塞IO
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。
3、非阻塞IO
3.1 流程
Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。 但是非阻塞IO模型绝不被推荐。
3.2 实例
#服务端
from socket import *
import time
s=socket(AF_INET,SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
conn_l=[]
del_l=[]
while True:
try:
conn,addr=s.accept()
conn_l.append(conn)
except BlockingIOError:
print(conn_l)
for conn in conn_l:
try:
data=conn.recv(1024)
if not data:
del_l.append(conn)
continue
conn.send(data.upper())
except BlockingIOError:
pass
except ConnectionResetError:
del_l.append(conn)
for conn in del_l:
conn_l.remove(conn)
conn.close()
del_l=[]
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
3.3 优缺点
优点:能够在等待任务完成的时间里干其他活了
缺点:
#1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
#2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
4、多路复用IO
4.1 流程
有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
强调:
1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
结论: select的优势在于可以处理多个连接,不适用于单个连接
4.2 实例
#服务端
from socket import *
import select
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1',8081))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
read_l=[s,]
while True:
r_l,w_l,x_l=select.select(read_l,[],[])
print(r_l)
for ready_obj in r_l:
if ready_obj == s:
conn,addr=ready_obj.accept() #此时的ready_obj等于s
read_l.append(conn)
else:
try:
data=ready_obj.recv(1024) #此时的ready_obj等于conn
if not data:
ready_obj.close()
read_l.remove(ready_obj)
continue
ready_obj.send(data.upper())
except ConnectionResetError:
ready_obj.close()
read_l.remove(ready_obj)
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8081))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
4.3 优缺点
优点:
#相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
缺点:
#首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
#其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。
5、异步IO
Linux下的asynchronous IO其实用得不多,从内核2.6版本才开始引入。先看一下它的流程:
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
6、selectors模块
select,poll,epoll这三种IO多路复用模型在不同的平台有着不同的支持,而epoll在windows下就不支持,好在我们有selectors模块,帮我们默认选择当前平台下最合适的
#服务端
from socket import *
import selectors
sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept()
sel.register(conn,selectors.EVENT_READ,read)
def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close()
server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept
while True:
events=sel.select() #检测所有的fileobj,是否有完成wait data的
for sel_obj,mask in events:
callback=sel_obj.data #callback=accpet
callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
7、socketsever模块
服务端:
import socketserver
class MyTCPhandler(socketserver.BaseRequestHandler):
def handle(self):
# print(self)
# print(self.request) #self.request=conn
while True:
try:
print(self.client_address)
print(self.server) #obj
data=self.request.recv(1024)
if not data:break
self.request.send(data.upper())
except ConnectionResetError:
break
if __name__ == '__main__':
print('starting...')
obj=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyTCPhandler)
#bind_and_activate=True
#socketserver.ForkingTCPServer
obj.serve_forever() #连接循环
'''
t=Thread(target=MyTCPhandler().handle)
t.start()
'''
# obj.server_address #('127.0.0.1',8080)
# obj.RequestHandlerClass #MyTCPhandler
# obj.socket #s对象
客户端:
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
8、并发编程总结(重点)
1 生产者消费者模型
2 开启进程、开启线程
3 进程池、线程池(回调机制)
4 GIL全局解释器锁