一、复习
1、进程、线程、协程
进程:是计算机中最小的资源分配单位,数据隔离,可以利用多核,数据不安全
线程:是计算机中最小的CPU调度单位,数据共享,GIL,数据不安全
协程:是线程的一部分,是由用户来调度,数据共享,数据安全
2、同步、异步、阻塞、非阻塞
异步:同时做不止一件事
同步:事情一件做完接着下一件
阻塞:recv ecvfromacceptsleepinput
非阻塞:
二、IO多路复用
IO操作:
文件处理:文件处理,json.dump/load,input,print,logging
网络操作:recv/send,resvfrom/sendto,accept/connect
# recv 为什么要阻塞
# 等待数据来到我Python程序的内存里
1.阻塞IO:
2.非阻塞IO:
代码举例:
# import time import socket sk = socket.socket() sk.bind(('127.0.0.1',9000)) sk.setblocking(False) sk.listen() conn_lst = [] del_lst = [] while True: try: conn,addr = sk.accept() #--> 非阻塞,没有连接来就报错 conn_lst.append(conn) print(conn) except BlockingIOError: for con in conn_lst: # conn1,conn2,conn3 try: con.send(b'hello') try: print(con.recv(1024)) # 非阻塞 没有消息来就报错 except BlockingIOError:pass # recv没有消息的报错 except ConnectionResetError: # send没有连接的报错 con.close() del_lst.append(con) for con in del_lst: conn_lst.remove(con) del_lst.clear() # 非阻塞的形式实现了并发的socket server # 非阻塞的形式实现了并发的socket server,太耗cpu # 没有数据来 的时候 程序的高速处理极大地占用了CPU资源
import socket sk = socket.socket() sk.connect(('127.0.0.1',9000)) while True: print(sk.recv(1024)) sk.send(b'bye') sk.close()
3.IO多路复用:
import select import socket sk = socket.socket() sk.bind(('127.0.0.1',9000)) sk.setblocking(False) sk.listen() rlst = [sk] # 监听的是对象的读操作 wlst = [] # 监听的是对象的写操作 xlst = [] # 监听的是对象的异常操作 while True: rl,wl,xl = select.select(rlst,wlst,xlst) # [sk,conn] for obj in rl: # [conn1,conn2] if obj == sk: conn,addr = sk.accept() # 每次建立连接的时候conn rlst.append(conn) else: msg = obj.recv(1024) if msg == b'': obj.close() rlst.remove(obj) continue print(msg) obj.send(b'hello') # socketserver # TCP协议的并发操作 selectors + 多线程
import socket sk = socket.socket() sk.connect(('127.0.0.1',9000)) while True: sk.send(b'wahaha') print(sk.recv(1024)) sk.close() # IO多路复用的select的工作机制 # select windows 轮询 # poll linux 轮询,poll能够监听的对象比select要多 # epoll linux 不是采用轮询的方式,而是采用回调函数的形式
# IO多路复用的select的工作机制
# select windows 轮询
# poll linux 轮询,poll能够监听的对象比select要多
# epoll linux 不是采用轮询的方式,而是采用回调函数的形式
跨平台或平台自适应IO多路复用:
import selectors import socket sel = selectors.DefaultSelector() def accept(obj,mask): """ 回调函数,当selectors实例感知有用户连接服务器时,就会回调该函数。 :param obj: :param mask: :return: """ conn,addr = obj.accept() sel.register(conn, selectors.EVENT_READ, read) # 注册用户连接conn到selector监听列表中 def read(conn,mask): """ 回调函数,当selectors实例感知有用户发送数据时,就会回调该函数。 :param conn: :param mask: :return: """ try: data = conn.recv(1024) if not data: # 为空则抛出异常由下边的异常处理语句处理 raise Exception conn.send(data+'_sb'.encode('utf-8')) except Exception as e: print('closing', conn) sel.unregister(conn) conn.close() sk = socket.socket() sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sk.bind(('127.0.0.1', 9000)) sk.listen() sk.setblocking(False) sel.register(sk, selectors.EVENT_READ, accept) while 1: events = sel.select() # [sk,conn1,conn2...] 谁有新的数据就会返回谁 for key, mask in events: callback = key.data # 回到函数 callback(key.fileobj, mask) # 执行回调函数
import socket sk = socket.socket() sk.connect(('127.0.0.1',9000)) while 1: inp = input('>>>') sk.send(inp.encode('utf-8')) print(sk.recv(1024).decode('utf-8'))
4.异步IO:
5.各种IO对比: