异步编程
重要概念
同步、异步
阻塞、非阻塞
区别*
联系
同步IO、异步IO、IO多路复用
IO两个阶段
IO模型
同步IO
阻塞IO
非阻塞IO
IO多路复用
异步IO
Python中的IO多路复用
selectors库
#在selects模块源码最下面有如下代码 # Choose the best implementation, roughly: # epoll|kqueue|devpoll > poll > select. # select() also can't accept a FD > FD_SETSIZE (usually around 1024) if 'KqueueSelector' in globals(): DefaultSelector = KqueueSelector elif 'EpollSelector' in globals(): DefaultSelector = EpollSelector elif 'DevpollSelector' in globals(): DefaultSelector = DevpollSelector elif 'PollSelector' in globals(): DefaultSelector = PollSelector else: DefaultSelector = SelectSelector
1 #使用示例 2 import selectors 3 import threading 4 import socket 5 import logging 6 7 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s" 8 logging.basicConfig(level=logging.INFO,format=FORMAT) 9 10 #回调参数自己定义形参 11 def accept(sock:socket.socket,mask): 12 """mask:事件掩码的或值""" 13 conn,addr = sock.accept() 14 conn.setblocking(False)#不阻塞 15 pass 16 17 #回调函数 18 def read(conn:socket.socket,mask): 19 pass 20 21 #构造缺省性能最优selector 22 selector = selectors.DefaultSelector() 23 24 #创建Tcp Server 25 socket = socket.socket() 26 socket.bind(('0.0.0.0',9999)) 27 socket.listen() 28 logging.info(socket) 29 socket.setblocking(False)#非阻塞 30 31 #注册文件对象sock关注读事件,返回SelectorKey 32 #将socket、关注事件、data都绑定到key实例属性上 33 key = selector.register(socket,selectors.EVENT_READ,accept) 34 logging.info(key) 35 36 e = threading.Event() 37 38 def select(e:threading.Event): 39 while not e.is_set(): 40 #开始监视,等到有文件对象监控事件产生,返回(key,mask)元组 41 events = selector.select()#阻塞 42 print('-'*30) 43 for key,mask in events: 44 logging.info(key) 45 logging.info(mask) 46 callback = key.data #回调函数 47 callback(key.fileobj,mask) 48 49 threading.Thread(target=select,args=(e,),name='select').start() 50 51 def main(): 52 while not e.is_set(): 53 cmd = input('>>>') 54 if cmd.strip() == 'quit': 55 e.set() 56 fobjs = [] 57 logging.info("{}".format(list(selector.get_map().items()))) 58 59 for fd,key in selector.get_map().items(): 60 print(fd,key) 61 print(key.fileobj) 62 fobjs.append(key.fileobj) 63 64 for fobj in fobjs: 65 selector.unregister(fobj) 66 fobj.close() 67 selector.close() 68 69 if __name__ == '__main__': 70 main()
练习
1 import socket 2 import threading 3 import logging 4 import datetime 5 import selectors 6 7 FORMAT="%(asctime)s %(threadName)s %(thread)d %(message)s" 8 logging.basicConfig(level=logging.INFO,format=FORMAT) 9 10 #TCP Server 11 class ChatServer(): 12 def __init__(self,ip='0.0.0.0',port=9999): 13 self.addr = (ip,port) 14 self.socket = socket.socket() 15 self.event = threading.Event() 16 self.clients = {} 17 #增加 18 self.selector = selectors.DefaultSelector()#创建selector 19 20 def start(self):#启动监听 21 self.socket.bind(self.addr) 22 self.socket.listen() 23 # threading.Thread(target=self.accept,name='accept').start() 24 #增加 25 self.socket.setblocking(False) 26 self.selector.register(self.socket,selectors.EVENT_READ,self.accept)#注册 27 threading.Thread(target=self.select,name='select',daemon=True).start() 28 29 #增加 30 def select(self):#阻塞 31 while not self.event.is_set(): 32 #开始监视,等到某文件对象被监控的事件产生,返回(key,mask)元组 33 events = self.selector.select() 34 print("*"*30) 35 for key,mask in events: 36 logging.info(key) 37 logging.info(mask) 38 callback = key.data#回调函数 39 callback(key.fileobj) 40 41 #回调函数 42 def accept(self,sock:socket.socket):#接受客户端连接 43 conn,addr = sock.accept() 44 conn.setblocking(False) 45 #注册,监视每一个与客户端的连接的socket对象 46 self.selector.register(conn,selectors.EVENT_READ,self.recv) 47 48 #回调函数 49 def recv(self,sock:socket.socket):#接受客户端数据 50 data = sock.recv(1024) 51 if not data or data == b'quit':#客户端主动断开或退出,注销并关闭socket 52 self.selector.unregister(sock) 53 sock.close() 54 return 55 msg = "{:%Y/%m/%d %H%:M:%S} {}:{} {} ".format(datetime.datetime.now(), 56 *sock.getpeername(),data.encode()) 57 #群发 58 for key in self.selector.get_map().values(): 59 if key.data == self.recv:#排除self.accept 60 key.fileobj.send(msg) 61 62 #停止服务 63 def stop(self): 64 self.event.set() 65 fobjs = [] 66 for fd,key in self.selector.get_map().items():#fd:key对象 67 fobjs.append(key.fileobj) 68 for fobj in fobjs: 69 self.selector.unregister(fobj) 70 fobj.close() 71 self.selector.close() 72 73 def main(): 74 cs = ChatServer() 75 cs.start() 76 77 while True: 78 cmd = input(">>>") 79 if cmd == 'quit': 80 cs.stop() 81 break 82 logging.info(threading.enumerate()) 83 84 if __name__ == '__main__': 85 main()
1 #-*- codeing:utf-8 -*- 2 import socket 3 import threading 4 import datetime 5 import logging 6 import selectors 7 from queue import Queue 8 9 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s" 10 logging.basicConfig(level=logging.INFO,format=FORMAT) 11 12 class ChatServer: 13 def __init__(self,ip='127.0.0.1',port=9999): 14 self.sock = socket.socket() 15 self.addr = (ip,port) 16 self.clients = {} 17 self.event = threading.Event() 18 self.selector = selectors.DefaultSelector()#创建selector 19 20 def start(self):#启动监听 21 self.sock.bind(self.addr) 22 self.sock.listen() 23 self.sock.setblocking(False) 24 #注册 25 self.selector.register(self.sock,selectors.EVENT_READ,self.accept) 26 threading.Thread(target=self.select,name='selector').start() 27 28 def select(self): 29 while not self.event.is_set(): 30 #开始监视,等到某文件对象被监控的事件发生,返回(key,mask)元组 31 events = self.selector.select()#阻塞,直到events 32 for key,mask in events: 33 if callable(key.data): 34 callback = key.data 35 callback = (key.fileobj,mask) 36 else: 37 callback = key.data[0] 38 callback(key,mask) 39 40 def accept(self,sock:socket.socket,mask):#接收客户端连接 41 conn,raddr = self.sock.accept() 42 conn.setblocking(False) 43 self.clients[raddr] = (self.handler,Queue()) 44 #注册,监视每一个与客户端连接的socket对象 45 self.selector.register(conn,selectors.EVENT_READ | selectors.EVENT_WRITE,self.clients[raddr]) 46 47 def handler(self,key:selectors.SelectorKey,mask):#接收客户端数据 48 if mask & selectors.EVENT_READ: 49 sock = key.fileobj 50 raddr = sock.getpeername() 51 data = sock.recv(1024) 52 if not data or data == b'quit': # 客户端主动断开或退出,注销并关闭socket 53 self.selector.unregister(sock) 54 sock.close() 55 self.clients.pop(raddr) 56 return 57 for k in self.selector.get_map().values(): 58 logging.info(k) 59 if isinstance(k.data,tuple): 60 k.data[1].put(data) 61 if mask & selectors.EVENT_WRITE: 62 #因为写一直就绪,mask为2,所以一直可以写,从而导致select()不断循环,如同不阻塞一样 63 if not key.data[1].empty(): 64 key.fileobj.send(key.data[1].get()) 65 66 # 停止服务 67 68 def stop(self): 69 self.event.set() 70 fobjs = [] 71 for fd, key in self.selector.get_map().items(): 72 fobjs.append(key.fileobj) 73 for fobj in fobjs: 74 self.selector.unregister(fobj) 75 fobj.close() 76 self.selector.close() 77 78 def main(): 79 cs = ChatServer() 80 cs.start() 81 82 while True: 83 cmd = input(">>>") 84 if cmd == 'quit': 85 cs.stop() 86 break 87 logging.info(threading.enumerate()) 88 89 if __name__ == '__main__': 90 main()
asyncio
问题的引出
def a(): for x in range(3): print(x) def b(): for x in "abc": print(x) a() b() #运行结果一定是 0 1 2 a b c
import threading import time def a(): for x in range(3): time.sleep(0.001) print(x) def b(): for x in "abc": time.sleep(0.001) print(x) threading.Thread(target=a,name='a').start() threading.Thread(target=b,name='b').start() #运行结果 a 0 b 1 c 2
#多进程版本 import multiprocessing import time def a(): for x in range(3): time.sleep(0.001) print(x) def b(): for x in "abc": time.sleep(0.001) print(x) if __name__ == '__main__': multiprocessing.Process(target=a, name='a').start() multiprocessing.Process(target=b, name='b').start() #运行结果 0 1 a 2 b c
#生成器版本 def a(): for x in range(3): print(x) yield def b(): for x in "abc": print(x) yield x = a() y = b() for i in range(3): next(x) next(y) #运行结果 0 a 1 b 2 c
事件循环
协程
协程的使用
import asyncio @asyncio.coroutine def sleep(x):#协程函数 for i in range(3): print('sleep {}'.format(i)) yield from asyncio.sleep(x) loop = asyncio.get_event_loop() loop.run_until_complete(sleep(3)) loop.close()
import asyncio async def sleep(x): for i in range(3): print('sleep {}'.format(i)) await asyncio.sleep(x) loop = asyncio.get_event_loop() loop.run_until_complete(sleep(3)) loop.close()
import asyncio import threading async def sleep(x): for i in range(x): print('sleep {}'.format(i)) await asyncio.sleep(x) async def showthread(x): for i in range(x): print(threading.enumerate()) await asyncio.sleep(2) loop = asyncio.get_event_loop() tasks = [sleep(3),showthread(3)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() #运算结果 [<_MainThread(MainThread, started 21012)>] sleep 0 [<_MainThread(MainThread, started 21012)>] sleep 1 [<_MainThread(MainThread, started 21012)>] sleep 2
#协程版本 import asyncio @asyncio.coroutine def a(): for x in range(3): print('a.x',x) yield @asyncio.coroutine def b(): for x in 'abc': print('b.x',x) yield print(asyncio.iscoroutinefunction(a)) print(asyncio.iscoroutinefunction(b)) #大循环 loop = asyncio.get_event_loop() tasks = [a(),b()] loop.run_until_complete(asyncio.wait(tasks)) loop.close() #运行结果 True True a.x 0 b.x a a.x 1 b.x b a.x 2 b.x c
TCP Echo Server举例
import asyncio #TCP Echo Server举例 async def handle(reader,writer): while True: data = await reader.read(1024) print(dir(reader)) print(dir(writer)) client = writer.get_extra_info('peername') message = "{} Your msg {}".format(client,data.decode()).encode() writer.writer(message) await writer.drain() loop = asyncio.get_event_loop() ip = '127.0.0.1' port = 9999 crt = asyncio.start_server(handle,ip,port,loop=loop) server = loop.run_until_complete(crt) print(server)#server是监听的socket对象 try: loop.run_forever() except KeyboardInterrupt: pass finally: server.close() loop.close()
aiohttp库
HTTP Server
from aiohttp import web async def indexhandle(request:web.Request): return web.Request(text=request.path,status=201) async def handle(request:web.Request): print(request.match_info) print(request.query_string)#http://127.0.0.1:8080/1?name=12301 return web.Request(text=request.match_info.get('id','0000'),status=200) app = web.Application() app.router.add_get("/",indexhandle)#http://127.0.0.1:8080/ app.router.add_get("/{id}",handle)#http://127.0.0.1:8080/12301 web.run_app(app,host='0.0.0.0',port=9977)
HTTP Client
import asyncio from aiohttp import ClientSession async def get_html(url:str): async with ClientSession() as session: async with session.get(url) as res: print(res.status) print(await res.text()) url = 'http://127.0.0.1/ziroom-web' loop = asyncio.get_event_loop() loop.run_until_complete(get_html(url)) loop.close()