首先复习了基于协程实现的套接字通信:
# ---- Server ----
# Coroutine-based echo server: one greenlet per client connection.
# (Presumably meant to be saved and run as a separate script from the client
# section further down; both are shown together here.)
from gevent import monkey, spawn
monkey.patch_all()  # patch first so the socket names imported below are cooperative
from socket import *


def lack(conn):
    """Echo every message received on *conn* back upper-cased.

    Returns when the peer closes the connection (empty read) or resets it.
    The connection is always closed on the way out, even if an unexpected
    error is raised, so a failing greenlet cannot leak its socket.
    """
    try:
        while True:
            data = conn.recv(1024)
            if not data:  # empty read: the peer closed the connection
                break
            # sendall() keeps writing until the whole reply has been sent.
            conn.sendall(data.upper())
    except ConnectionResetError:
        pass  # peer dropped the connection abruptly; nothing left to do
    finally:
        conn.close()


def server(ip, port, count=5):
    """Listen on (*ip*, *port*) and spawn a `lack` greenlet per connection.

    *count* is the listen backlog. This function never returns.
    """
    s = socket()
    s.bind((ip, port))
    s.listen(count)
    while True:
        conn, _ = s.accept()
        spawn(lack, conn)


if __name__ == '__main__':
    spawn(server, '192.168.12.81', 5050).join()


# ---- Client ----
# Ten threads, each holding its own connection and sending in a loop.
from threading import Thread, current_thread
from socket import *


def client():
    """Connect to the server and exchange messages until it disconnects."""
    sock = socket()
    sock.connect(('192.168.12.81', 5050))
    try:
        while True:
            sock.sendall(('%s is running' % current_thread().name).encode('utf-8'))
            data = sock.recv(1024)
            if not data:  # server closed the connection: stop instead of looping
                break
            print(data.decode('utf-8'))
    finally:
        sock.close()


if __name__ == '__main__':
    for i in range(10):
        t = Thread(target=client)
        t.start()
异步io
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
import os


def task(n, delay=2):
    """Simulate a slow job and return ``n ** 2``.

    Prints the name of the worker thread running it, then sleeps for
    *delay* seconds (default 2) before returning the square of *n*.
    """
    print('%s is running' % current_thread().name)
    time.sleep(delay)
    return n ** 2


def parse(obj):
    """Done-callback: print the result of the finished future *obj*."""
    res = obj.result()
    print(res)


if __name__ == '__main__':
    # The context manager shuts the pool down on exit, after waiting for
    # the submitted work to finish.
    with ThreadPoolExecutor(4) as pool:
        for n in range(1, 5):
            # parse() is called automatically with the future once the
            # corresponding task has completed.
            pool.submit(task, n).add_done_callback(parse)
io模型
Stevens在文章中一共比较了五种IO Model:
* blocking IO
* nonblocking IO
* IO multiplexing
* signal driven IO
* asynchronous IO
由于signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。
阻塞io模型:
# ---- Server ----
# Blocking I/O: accept() and recv() each block, so one client is served at a time.
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('192.168.12.81', 5050))
server.listen(5)

try:
    while True:
        conn, addr = server.accept()  # blocks until a client connects
        try:
            while True:
                data = conn.recv(1024)  # blocks until data arrives
                if not data:  # empty read: the client closed the connection
                    break
                # sendall() keeps writing until the whole reply has been sent.
                conn.sendall(data.upper())
        except ConnectionResetError:
            pass  # client dropped the connection abruptly
        finally:
            conn.close()
finally:
    # Reached when the accept loop is interrupted (e.g. Ctrl-C), so the
    # listening socket is actually released.
    server.close()


# ---- Client ----
import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('192.168.12.81', 5050))

try:
    while True:
        msg = input('>>:').strip()
        if not msg:
            # Sending b'' transmits nothing, so the recv() below would
            # block forever waiting for a reply that never comes.
            continue
        client.sendall(msg.encode('utf-8'))
        res = client.recv(1024)
        if not res:  # server closed the connection
            break
        print(res.decode('utf-8'))
finally:
    client.close()
我们最初写的基于网络传输的套接字就是阻塞io
非阻塞io模型:
# ---- Server ----
# Non-blocking I/O: every socket call returns immediately, and the loop keeps
# polling all sockets itself instead of sleeping inside accept()/recv().
from socket import *

s = socket()
s.bind(('192.168.12.81', 5050))
s.listen(5)
s.setblocking(False)

r_list = []  # connected sockets polled for incoming data
w_list = []  # pending replies as (conn, payload) tuples

while True:
    try:
        conn, addr = s.accept()
        # Whether an accepted socket inherits the listener's non-blocking
        # mode is OS-dependent, so set it explicitly; otherwise recv()
        # below could block the whole loop.
        conn.setblocking(False)
        r_list.append(conn)
    except BlockingIOError:
        # No new connection is waiting: service the existing ones.
        print('rlist:', len(r_list))

        # Receive phase.
        del_rlist = []  # sockets closed during this round
        for conn in r_list:
            try:
                data = conn.recv(1024)
                if not data:  # empty read: the client closed the connection
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list.append((conn, data.upper()))
            except BlockingIOError:
                continue  # nothing to read from this socket yet
            except ConnectionResetError:
                conn.close()
                del_rlist.append(conn)

        # Send phase.
        del_wlist = []  # replies that are finished (sent or abandoned)
        for item in w_list:
            conn, res = item
            if conn in del_rlist:
                # The socket was closed earlier in this round; drop the
                # reply instead of sending on a closed socket.
                del_wlist.append(item)
                continue
            try:
                conn.send(res)
                del_wlist.append(item)
            except BlockingIOError:
                continue  # send buffer full; retry on the next round
            except ConnectionResetError:
                conn.close()
                del_wlist.append(item)
                # Also stop polling it, or the next recv() would hit a
                # closed socket.
                del_rlist.append(conn)

        # Remove after iterating so the lists are not mutated mid-loop.
        for conn in del_rlist:
            r_list.remove(conn)
        for item in del_wlist:
            w_list.remove(item)


# ---- Client ----
from socket import *
import os

c = socket()
c.connect(('192.168.12.81', 5050))

try:
    while True:
        data = '%s say hello' % os.getpid()
        c.send(data.encode('utf-8'))
        res = c.recv(1024)
        if not res:  # server closed the connection
            break
        print(res.decode('utf-8'))
finally:
    c.close()
多路复用io:
# ---- Server ----
# I/O multiplexing: select() watches all sockets at once and reports which
# ones are ready, so a single loop can serve many connections.
from socket import *
import select

s = socket()
s.bind(('192.168.12.81', 5050))
s.listen(5)
s.setblocking(False)

r_list = [s, ]  # sockets watched for readability (listener + clients)
w_list = []     # sockets with a reply waiting to be written
w_data = {}     # socket -> bytes still to send; same keys as w_list


def _close_conn(sock):
    """Close *sock* and forget it everywhere.

    A closed socket left in a watched list would make the next select()
    or send() call fail.
    """
    sock.close()
    if sock in r_list:
        r_list.remove(sock)
    if sock in w_list:
        w_list.remove(sock)
    w_data.pop(sock, None)


while True:
    print('被检测r_list: ', len(r_list))
    print('被检测w_list: ', len(w_list))
    r1, w1, x1 = select.select(r_list, w_list, [])

    for r in r1:
        if r is s:
            # The listening socket is readable: a new client is waiting.
            conn, addr = r.accept()
            r_list.append(conn)
            continue
        try:
            data = r.recv(1024)
        except ConnectionResetError:
            _close_conn(r)
            continue
        if not data:  # empty read: the client closed the connection
            _close_conn(r)
            continue
        if r in w_data:
            # A reply is already queued for this socket; append to it
            # rather than registering the socket twice.
            w_data[r] += data.upper()
        else:
            w_list.append(r)
            w_data[r] = data.upper()

    for w in w1:
        if w not in w_data:
            continue  # closed during the read phase above
        try:
            w.send(w_data[w])
        except (ConnectionResetError, BrokenPipeError):
            _close_conn(w)
            continue
        w_list.remove(w)
        w_data.pop(w)


# ---- Client ----
from socket import *
import os

client = socket()
client.connect(('192.168.12.81', 5050))

try:
    while True:
        data = '%s say hello' % os.getpid()
        client.send(data.encode('utf-8'))
        res = client.recv(1024)
        if not res:  # server closed the connection
            break
        print(res.decode('utf-8'))
finally:
    client.close()
在只有单个连接的情况下,多路复用io的效率比阻塞io还要低;它的优势是在同时处理多个连接的情况下体现的。
异步io模型:
异步io模型的效率最高