The code in this article is adapted from: 深入理解Python异步编程(上) (A Deep Understanding of Asynchronous Programming in Python, Part 1)
Reference: A Web Crawler With asyncio Coroutines
1. The synchronous, blocking way
import socket


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def sync_way():
    res = []
    for i in range(10):
        res.append(blocking_way())
    return len(res)


def main():
    start = time.time()
    print(sync_way())
    print(time.time() - start)


if __name__ == '__main__':
    import time
    main()  # 5.15s
2. The synchronous, multi-threaded way
import socket
from concurrent import futures


def blocking_way():
    sock = socket.socket()
    # blocking
    sock.connect(('example.com', 80))
    request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        # blocking
        chunk = sock.recv(4096)
    return response


def thread_way():
    workers = 10
    with futures.ThreadPoolExecutor(workers) as executor:
        futs = {executor.submit(blocking_way) for i in range(10)}
    return len([fut.result() for fut in futs])


def main():
    start = time.time()
    print(thread_way())
    print(time.time() - start)


if __name__ == '__main__':
    import time
    main()  # 0.52s
Tip
Because of the GIL, multiple threads in Python cannot exploit multiple CPU cores: within one Python process, only one thread is allowed to be executing at any moment. So why does the result still meet expectations, with the elapsed time cut to roughly one tenth? Because when a thread makes a blocking system call such as sock.connect() or sock.recv(), it releases the GIL and gives other threads a chance to run. Within a single thread, however, a blocking call still blocks. time.sleep in Python is blocking, and everyone knows to use it with care, but in multi-threaded code time.sleep does not block the other threads.
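A tiny demonstration of that last point (this snippet is mine, not from the referenced article): two threads sleeping in parallel finish in about one second rather than two, because time.sleep only blocks the calling thread and releases the GIL while it waits.

import time
import threading


def worker():
    # blocks this thread only; the GIL is released while sleeping
    time.sleep(1)


start = time.time()
threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(time.time() - start)  # roughly 1s, not 2s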
3. The non-blocking + callback way (i.e., asynchronous non-blocking)
Event loop + callbacks: asynchronous programming inside a single thread
Listening for events
The OS wraps changes in I/O state as events, such as readable events and writable events, and provides a dedicated system facility through which applications can receive these notifications: select. An application can register file descriptors and callback functions with select; when a file descriptor's state changes, select reports it and the callback registered for it gets invoked. Because select scans descriptors with a fairly inefficient algorithm, it was later improved into poll; further improvements followed, with the BSD kernels adding kqueue and the Linux kernel adding epoll. All four mechanisms do the same job and expose nearly identical APIs to the programmer; the difference is that kqueue and epoll are more efficient when handling large numbers of file descriptors.
The selectors module
The selectors module in the Python standard library is a wrapper over the underlying select/poll/epoll/kqueue. Its DefaultSelector class automatically picks the best implementation available on the current OS; on Linux 2.5.44 and newer, that means epoll.
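Before the full crawler below, here is a minimal sketch of the register-then-dispatch pattern the selectors API expects (an illustrative example of mine, not the article's code; example.com and the single EVENT_WRITE registration are just placeholders):

import socket
from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()


def on_writable(key, mask):
    print('socket is writable: the connection is established')
    selector.unregister(key.fd)


sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('example.com', 80))
except BlockingIOError:
    pass  # expected for a non-blocking connect

# register interest in the writable event; data can be any object, here the callback
selector.register(sock.fileno(), EVENT_WRITE, on_writable)

events = selector.select()      # blocks until at least one event is ready
for key, mask in events:
    key.data(key, mask)         # dispatch to the registered callback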
#!/usr/bin/python3.5
# encoding: utf-8
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}


class Crawler:
    def __init__(self, url):
        self.url = url
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key, mask):
        selector.unregister(key.fd)
        get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
        self.sock.send(get.encode('ascii'))
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key, mask):
        global stopped
        # if the response is larger than 4KB, the next loop iteration reads more
        chunk = self.sock.recv(4096)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True


# event loop
def loop():
    while not stopped:
        # block until an event occurs
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback(event_key, event_mask)


if __name__ == '__main__':
    import time
    start = time.time()
    for url in urls_todo:
        crawler = Crawler(url)
        crawler.fetch()
    loop()
    print(time.time() - start)  # 0.53s
Drawbacks of too many layers of callbacks:
- Managing shared state is difficult
In the callback version, we have to store the socket on self after the Crawler is instantiated. Without an OOP style, the shared state would have to be passed along, relay-style, to every callback. Exactly which state the various asynchronous calls need to share has to be thought through and carefully designed in advance.
- Error handling is difficult (see the sketch after this list)
A series of callbacks forms a complete call chain; what happens if one link in the chain raises an exception? The whole chain breaks and the state being relayed along it is lost, a phenomenon known as stack ripping. To prevent it, exceptions must be returned as data instead of being raised directly, and every callback then has to check the return value of the previous call so that errors are not swallowed.
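A small, hypothetical illustration of the "errors as data" convention (the names step1/step2 are made up for this sketch and are not from the article): each step passes (result, error) forward instead of raising, and every callback has to check the error before continuing.

def step1(data, callback):
    try:
        result = int(data)      # this step may fail
    except ValueError as exc:
        callback(None, exc)     # pass the error forward as data instead of raising
        return
    callback(result, None)


def step2(result, error):
    if error is not None:       # every callback has to repeat this check
        print('chain aborted:', error)
        return
    print('got', result * 2)


step1('21', step2)    # prints: got 42
step1('oops', step2)  # prints: chain aborted: invalid literal for int() ...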
4. Python's road to improving asynchronous I/O
#!/usr/bin/python3.5
# encoding: utf-8
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}


class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)


class Crawler:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f
        selector.unregister(sock.fileno())
        get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))

        global stopped
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                break


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send() resumes the coroutine (fetch) until the next yield;
            # next_future is the object yielded back
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


# event loop
def loop():
    while not stopped:
        # block until an event occurs
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()


if __name__ == '__main__':
    import time
    start = time.time()
    for url in urls_todo:
        crawler = Crawler(url)
        Task(crawler.fetch())
    loop()
    print(time.time() - start)  # 0.53s
I made a few small changes on top of the original author's version:
#!/usr/bin/python3
# encoding: utf-8
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}


class Future:
    def __init__(self):
        self.result = None
        self._callback = None  # the original version kept a list of callbacks

    def add_done_callback(self, fn):
        self._callback = fn

    def set_result(self, result):
        self.result = result
        # there is only the single corresponding Task.step() callback
        if self._callback:
            self._callback(self)


class Crawler:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('example.com', 80))
        except BlockingIOError:
            pass
        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield f
        selector.unregister(sock.fileno())
        get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))

        global stopped
        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(), EVENT_READ, on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
                break


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send() resumes the coroutine (fetch) until the next yield;
            # next_future is the object yielded back
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)
        print(next_future._callback)


# event loop
def loop():
    while not stopped:
        # block until an event occurs
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()


if __name__ == '__main__':
    import time
    start = time.time()
    c_list = []
    for url in urls_todo:
        crawler = Crawler(url)
        Task(crawler.fetch())
        c_list.append(crawler)
    loop()
    # added: print the fetched content
    for crawler in c_list:
        print(crawler.response)
    print(time.time() - start)
5. Improving generator coroutines with yield from
yield can be applied directly to an ordinary Python object, but yield from cannot, so we need one more change to Future: making it an iterable object is enough.
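Here is a minimal, self-contained sketch of why __iter__ is enough (my own illustration, not the article's code): yield from delegates into the Future's __iter__, which first yields the future itself out to the Task, and later returns the result as the value of the yield from expression.

class MiniFuture:
    def __init__(self):
        self.result = None

    def __iter__(self):
        yield self            # hand the future itself out to the Task / event loop
        return self.result    # becomes the value of the `yield from` expression


def coro():
    fut = MiniFuture()
    value = yield from fut    # delegates into MiniFuture.__iter__
    print('resumed with', value)


g = coro()
fut = next(g)                 # the yielded object is the future itself
fut.result = 'hello'
try:
    g.send(fut.result)        # resume: __iter__ returns, `value` becomes 'hello'
except StopIteration:
    pass                      # prints: resumed with hello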
#!/usr/bin/python3.5
# -*- coding:utf-8 -*-
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ

selector = DefaultSelector()
stopped = False
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}


def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())


def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b''.join(response)


class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        yield self
        return self.result


class Crawler:
    def __init__(self, url):
        self.url = url
        self.response = b''

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ('example.com', 80))
        get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            # send() resumes the coroutine (fetch) until the next yield;
            # next_future is the object yielded back
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


# event loop
def loop():
    while not stopped:
        # block until an event occurs
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()


if __name__ == '__main__':
    import time
    start = time.time()
    for url in urls_todo:
        crawler = Crawler(url)
        Task(crawler.fetch())
    loop()
    print(time.time() - start)  # 0.53s
6. A first taste of asyncio and native coroutines
asyncio is the asynchronous I/O framework introduced experimentally in Python 3.4 (PEP 3156). It provides the infrastructure for writing single-threaded concurrent code with coroutine-based asynchronous I/O. Its core components are the event loop, coroutines, tasks, futures, plus a number of supporting and auxiliary modules. Along with asyncio came the @asyncio.coroutine decorator, used to mark functions built on yield from as coroutines, though using the decorator was never mandatory. Python 3.5 added the async/await syntax (PEP 492), giving coroutines explicit, first-class support; these are called native coroutines. The async/await and yield from styles of coroutine share the same underlying implementation and are compatible with each other. In Python 3.6, asyncio "graduated": it stopped being provisional and became a full member of the standard library.
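A brief sketch of that compatibility (my own example, not from the article; note that @asyncio.coroutine was deprecated in later releases and removed in Python 3.11, so this only runs on the older interpreters this section targets): a native async def coroutine can await a generator-based one directly.

import asyncio


@asyncio.coroutine
def old_style():
    yield from asyncio.sleep(0.1)   # generator-based coroutine
    return 'old'


async def new_style():
    await old_style()               # a native coroutine can await the old style
    return 'new'


loop = asyncio.get_event_loop()
print(loop.run_until_complete(new_style()))  # prints: new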
#!/usr/bin/python3.5
# -*- coding:utf-8 -*-
import asyncio
import aiohttp

host = 'http://example.com'
urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}

loop = asyncio.get_event_loop()


async def fetch(url):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url) as response:
            response = await response.read()
            return response


if __name__ == '__main__':
    import time
    start = time.time()
    tasks = [fetch(host + url) for url in urls_todo]
    loop.run_until_complete(asyncio.gather(*tasks))
    print(time.time() - start)  # 0.54s
Supplementary demo examples added on 2019-06-26
import time
import requests

urls = [
    'http://httpbin.org/get',
    'http://httpbin.org/ip',
    'http://httpbin.org/json',
    'http://httpbin.org/uuid',
    'http://httpbin.org/user-agent',
    'http://httpbin.org/headers',
    'http://httpbin.org/response-headers',
]


def get_result(url):
    d = requests.get(url)
    dd = d.json()
    return dd


start = time.time()

results = []
for url in urls:
    d = get_result(url)
    results.append(d)

print('RUN : {}'.format(time.time() - start))
print(results)
Elapsed: RUN : 6.703306198120117
import time
import asyncio
import requests

urls = [
    'http://httpbin.org/get',
    'http://httpbin.org/ip',
    'http://httpbin.org/json',
    'http://httpbin.org/uuid',
    'http://httpbin.org/user-agent',
    'http://httpbin.org/headers',
    'http://httpbin.org/response-headers',
]


def myfunc(url):
    d = requests.get(url)
    dd = d.json()
    return dd


@asyncio.coroutine
def fetch_async(func, url):
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, func, url)
    data = yield from future
    return data


start = time.time()
loop = asyncio.get_event_loop()
tasks = [fetch_async(myfunc, url) for url in urls]
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

print('RUN : {}'.format(time.time() - start))
print(results)
Elapsed: RUN : 1.0276665687561035
Additional notes:
run_in_executor(self, executor, func, *args): the first argument is an executor, i.e. a concurrent.futures.ThreadPoolExecutor thread-pool object. If None is passed, a default ThreadPoolExecutor is used; in Python 3.5–3.7 its size is (os.cpu_count() or 1) * 5, so on a 4-core CPU that means a pool of 20 threads running the func passed as the second argument. In other words, run_in_executor actually starts new threads under the hood, with the event loop coordinating them.
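As a rough sketch of how you could pass an explicit executor instead of None (an assumed example of mine; blocking_io just stands in for any blocking call such as requests.get), which makes the pool size explicit and controllable:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(n):
    time.sleep(0.5)   # stands in for a blocking call such as requests.get
    return n


async def main():
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(max_workers=4)   # explicit pool of 4 threads
    tasks = [loop.run_in_executor(executor, blocking_io, i) for i in range(8)]
    results = await asyncio.gather(*tasks)
    print(results)   # 8 results in roughly 1s with 4 worker threads


loop = asyncio.get_event_loop()
loop.run_until_complete(main())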