非阻塞io
from socket import * import time s=socket(AF_INET,SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) s.setblocking(False) #设置socket的接口为非阻塞 conn_l=[] del_l=[] while True: try: conn,addr=s.accept() conn_l.append(conn) except BlockingIOError: print(conn_l) for conn in conn_l: try: data=conn.recv(1024) if not data: del_l.append(conn) continue conn.send(data.upper()) except BlockingIOError: pass except ConnectionResetError: del_l.append(conn) for conn in del_l: conn_l.remove(conn) conn.close() del_l=[]
特点:实现了非阻塞,提高了cpu占用率,但由于一直监听 accept ,cpu占用率过高!
多路复用
select 模型:
from socket import * import select s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(('127.0.0.1',8081)) s.listen(5) s.setblocking(False) #设置socket的接口为非阻塞 read_l=[s,] while True: r_l,w_l,x_l=select.select(read_l,[],[]) print(r_l) for ready_obj in r_l: if ready_obj == s: conn,addr=ready_obj.accept() #此时的ready_obj等于s read_l.append(conn) else: try: data=ready_obj.recv(1024) #此时的ready_obj等于conn if not data: ready_obj.close() read_l.remove(ready_obj) continue ready_obj.send(data.upper()) except ConnectionResetError: ready_obj.close() read_l.remove(ready_obj)
select 模型过程:
用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;
#用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。
缺点:因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。
epoll 更好的选择,但是windows不支持。
selector模型——根据系统自动选择
1 from socket import * 2 import selectors 3 4 sel=selectors.DefaultSelector() 5 def accept(server_fileobj,mask): 6 conn,addr=server_fileobj.accept() 7 sel.register(conn,selectors.EVENT_READ,read) 8 9 def read(conn,mask): 10 try: 11 data=conn.recv(1024) 12 if not data: 13 print('closing',conn) 14 sel.unregister(conn) 15 conn.close() 16 return 17 conn.send(data.upper()+b'_SB') 18 except Exception: 19 print('closing', conn) 20 sel.unregister(conn) 21 conn.close() 22 23 24 25 server_fileobj=socket(AF_INET,SOCK_STREAM) 26 server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 27 server_fileobj.bind(('127.0.0.1',8088)) 28 server_fileobj.listen(5) 29 server_fileobj.setblocking(False) #设置socket的接口为非阻塞 30 sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept 31 32 while True: 33 events=sel.select() #检测所有的fileobj,是否有完成wait data的 34 for sel_obj,mask in events: 35 callback=sel_obj.data #callback=accpet 36 callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
from socket import * import time s=socket(AF_INET,SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) s.setblocking(False) #设置socket的接口为非阻塞 conn_l=[] del_l=[] while True: try: conn,addr=s.accept() conn_l.append(conn) except BlockingIOError: print(conn_l) for conn in conn_l: try: data=conn.recv(1024) if not data: del_l.append(conn) continue conn.send(data.upper()) except BlockingIOError: pass except ConnectionResetError: del_l.append(conn) for conn in del_l: conn_l.remove(conn) conn.close() del_l=[]