书中的代码用的应该是Python3.3的,从Python3.5开始协程用async 与await 代替了@asyncio.coroutine与yield.from
话说asyncio与aiohttp配合使用,从书中的教程来看真的非常强大,以后的并发io处理,协程才是王道了。
18.1线程与协程对比
import threading import itertools import time import sys class Signal: go = True def spin(msg, signal): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\'): status = char + ' ' + msg write(status) flush() write('x08' * len(status)) time.sleep(.1) if not signal.go: break write(' ' * len(status) + 'x08' * len(status)) def slow_function(): time.sleep(3) return 42 def supervisor(): singal = Signal() spinner = threading.Thread(target=spin, args=('thinking!', singal)) print('spinner object:', spinner) # spinner线程开始 spinner.start() # 主线程休眠 result = slow_function() # 给spinner线程发送关闭信号 singal.go = False spinner.join() return result def main(): result = supervisor() print('Answer', result) if __name__ == '__main__': main()
这个一个多线程的显示脚本,一共两个线程,主线程负责显示最后的输出,开辟的子线程负责显示转圈圈。
import asyncio import itertools import sys async def spin(msg): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\'): status = char + ' ' + msg write(status) flush() # 继续回到初始位置 write('x08' * len(status)) try: await asyncio.sleep(.1) # 捕获错误 except asyncio.CancelledError: break write(' ' * len(status) + 'x08' * len(status)) async def slow_function(): await asyncio.sleep(3) return 42 async def supervisor(): # 把协程包装成为一个task任务 spinner = asyncio.ensure_future(spin('thinking!')) # 已经是一个任务返回的还是一个任务 spinner = asyncio.ensure_future(spinner) print('spinner object:', spinner) # 激活等待slow_function任务,由于slow_function里面有sleep,会把控制权,到时候会把控制权转给spinnner slow_function1 = asyncio.ensure_future(slow_function()) result = await slow_function1 # 得到result,取消spinner任务 # await asyncio.sleep(5) spinner.cancel() return result def main(): loop = asyncio.get_event_loop() result = loop.run_until_complete(supervisor()) loop.close() print('Answer', result) if __name__ == '__main__': main()
这是一个协程版本的,最后的协程supervisor()里面一共有两个任务,通过await asyncio.sleep()切换协程的工作。
asyncio.ensure_future(...)接收的是一个协程,排定它的运作时间,排定它的运行时间,然后返回个asyncio.Task实例,也就是asyncio.Future的实例,因为Task
是Future的子类,用于包装协程。
task或者future都有.done(), .add_done_callback(....)和.result()等方法,只不过这些方法一般用的比较少,只要result=await myfuture(),其中await后面需要回调的参数就是,result就是task的result。
无需调用my_future.add_done_callback(...),因为可以直接把想在future运行结束后执行的操作放在协程中 await my_futre表达式的后面。这个是协程的一大优势:协程是可以暂停和恢复函数的。
无需调用my_future.result(),因为await 从 future中产出的值就是结果。(列如,result = await my_future)。
18.2使用asyncio和aiohttp包下载
重点记录:在一个asyncio中,基本的流程是一样的:在一个单线程程序中使用主循环一次激活队列里的携程。各个协程向前执行几步,然后把控制权让给主循环,主循环再激活队列里的下一个协程。
import asyncio import aiohttp from flags import BASE_URL, save_flag, show, main async def get_flag(cc): url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower()) async with aiohttp.ClientSession() as session: # 运行底层库函数session.get(url) async with session.get(url) as resp: # print(type(resp)) # resp.read()是一个协程,必须通过await获取响应内容 image = await resp.read() return image async def download_one(cc): # 管道传递给下一个协程函数 image = await get_flag(cc) show(cc) save_flag(image, cc.lower() + '.gif') return cc def download_many(cc_list): # 创建事件循环 loop = asyncio.get_event_loop() to_do = [download_one(cc) for cc in sorted(cc_list)] # print(type(to_do[0])) wait_coro = asyncio.wait(to_do) # 运行wait_coro里面的task任务 res, _ = loop.run_until_complete(wait_coro) # print(res , _) loop.close() return len(res) if __name__ == '__main__': main(download_many)
一个简单版本的协程下载国旗脚本。
书中最后的重点说到,我们使用asyncio包时,由asyncio包实现的事件循环去做,比如asyncio包API的某个函数(如loop.run_until_complete(...))
我们编写的协程链条最终通过await 把职责委托给asyncio包中的某个协程函数或协程方法(比如await asyncio.sleep(...),或者其他库中实现高层协议的协程(比如resp = await resp.read())
也就是说,最内层的子生成器是库中真正执行I/O操作的函数,而不是我们编写的函数。(言下之意,就是I/O操作必须是高层协议提供的协程)
概括起来就是:使用asyncio包时,我们编写的异步代码中包含由asyncio本身驱动的协程(即委派生成器),而生成器最终把职责委托给asyncio包或第三方库(如aiohttp)中的协程。
这种方式相当于架起了管道,让asyncio事件循环(通过我们编写的协程)驱动执行底层异步I/O操作的库函数。
18.3避免阻塞性调用。
有两种方法能避免阻塞调用终止整个应用程序的进程:
1、在单独的线程中运行呵呵阻塞型操作。
2、把每个阻塞型操作转换成非阻塞的异步操作。
18.4改进asyncio下载版本。
可以接收错误,并且在保存图片的时候,也采取了异步的措施,但实际操作中,异步保存图片文件过大会报错,不知道为什么。
import asyncio import collections import aiohttp from aiohttp import web import tqdm from flags2_common import main, HTTPStatus, Result, save_flag # 默认设为较小的值,防止远程网络出错 DEFAULT_CONCUR_REQ = 5 MAX_CONCUR_REQ = 1000 class FetchError(Exception): def __init__(self, country_code): # 继承父类的args添加属性,pythoncookbook书中介绍最好这样写 super(FetchError, self).__init__(country_code) self.country_code = country_code async def get_flag(base_url, cc): url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) async with aiohttp.ClientSession() as session: # 运行底层库函数session.get(url) async with session.get(url) as resp: # 如果响应正常 if resp.status == 200: image = await resp.read() return image # 404 上报内置的定点错误 elif resp.status == 404: raise web.HTTPNotFound # 其他错误收集错误码信息上报 else: raise aiohttp.ClientHttpProxyError( code=resp.status, message=resp.reason, headers=resp.headers ) async def down_load_one(cc, base_url, semaphore, verbose): # 404与正常下载返回对象关系,其他上浮错误给调用者 try: # 在标记的最大协程数量下工作 with (await semaphore): image = await get_flag(base_url, cc) # 抓取上浮的404的错误 except web.HTTPNotFound: # 用Enum关系表达式保存status status = HTTPStatus.not_found msg = 'not found' # 其它引起的报错为基础错误Exception,上浮给调用者 except Exception as exc: raise FetchError(cc) from exc else: # 正常情况下,保存图片 # run_in_executor方法的第一个参数是Executor实例;如果设为None,使用事件循环的默认为ThreadPoolExecutor loop = asyncio.get_event_loop() loop.run_in_executor(None, save_flag, image, cc.lower() + '.gif') # save_flag(image, cc.lower() + '.gif') status = HTTPStatus.ok msg = 'ok' if verbose and msg: print(cc, msg) # 返回namedtuple对象 return Result(status, cc) async def downloader_coro(cc_list, base_url, verbose, concur_req): # 计数器 counter = collections.Counter() # 标记协程最大对象 semaphore = asyncio.Semaphore(concur_req) # 协程列表 to_do = [ down_load_one(cc, base_url, semaphore, verbose) for cc in cc_list ] # 完成的工作任务 to_do_iter = asyncio.as_completed(to_do) if not verbose: to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) for future in to_do_iter: try: res = await future # 获取调用者的错误信息 except FetchError as exc: country_code = exc.country_code try: # 尝试获取引起错误的实例的参数args error_msg = exc.__cause__.args[0] except IndexError: # 如果取不到索引获取引起错误的实例的类的名字 error_msg = exc.__cause__.__class__.__name if verbose and error_msg: msg = '*** Error for {}: {}' print(msg.format(country_code, error_msg)) # 其他的错误类型用Enum的最后一种关系表达式 status = HTTPStatus.error else: status = res.status # 统计各种状态 counter[status] += 1 return counter def download_many(cc_list, base_url, verbose, concur_req): loop = asyncio.get_event_loop() coro = downloader_coro(cc_list, base_url, verbose, concur_req) counts = loop.run_until_complete(coro) loop.close() # print(counts) # 返回运行完成后的统计字典 return counts if __name__ == '__main__': main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
这个代码我只能看懂,逻辑实在是紧密,让我学到了很多。
18.5 从回调到future和协程
使用await结果做异步编程,无需使用回调。能够避免回调函数的回调地狱。
在我的实际使用中,如果多个协程进行逻辑上面从上到下的运行,且存在上下文关系,中间不能插入普通函数参与上下文的工作中,因为在协程的任务运行期间,普通函数根本没有机会获得控制权,也就没有机会运行。
import asyncio import collections import aiohttp from aiohttp import web import tqdm from flags2_common import main, HTTPStatus, Result, save_flag # 默认设为较小的值,防止远程网络出错 DEFAULT_CONCUR_REQ = 5 MAX_CONCUR_REQ = 1000 class FetchError(Exception): def __init__(self, country_code): # 继承父类的args添加属性,pythoncookbook书中介绍最好这样写 super(FetchError, self).__init__(country_code) self.country_code = country_code async def http_get(url): async with aiohttp.ClientSession() as session: # 运行底层库函数session.get(url) async with session.get(url) as resp: if resp.status == 200: # 获取数据的类型 ctype = resp.headers.get('Content-type','').lower() # print(ctype) # 如果json在请求头或者url尾巴里面,发挥json数据 if 'json' in ctype or url.endswith('json'): data = await resp.json() # print(data) else: data = await resp.read() # print(data) return data elif resp.status == 404: return web.HTTPNotFound else: raise aiohttp.ClientHttpProxyError( code=resp.status, message=resp.reason, headers=resp.headers ) async def get_country(base_url, cc): url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower()) # print(url) 返回国家的信息 metadata = await http_get(url) return metadata['country'] async def get_flag(base_url, cc): url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) # print(url) return (await http_get(url)) async def down_load_one(cc, base_url, semaphore, verbose): # 404与正常下载返回对象关系,其他上浮错误给调用者 try: # 在标记的最大协程数量下工作 async with semaphore: image = await get_flag(base_url, cc) # print(image) async with semaphore: country = await get_country(base_url, cc) # 抓取上浮的404的错误 except web.HTTPNotFound: # 用Enum关系表达式保存status status = HTTPStatus.not_found msg = 'not found' # 其它引起的报错为基础错误Exception,上浮给调用者 except Exception as exc: raise FetchError(cc) from exc else: # 正常情况下,保存图片 # run_in_executor方法的第一个参数是Executor实例;如果设为None,使用事件循环的默认为ThreadPoolExecutor country = country.replace(' ','_') filename = '{}-{}.gif'.format(country, cc) loop = asyncio.get_event_loop() loop.run_in_executor(None, save_flag, image, filename) # save_flag(image, cc.lower() + '.gif') status = HTTPStatus.ok msg = 'ok' if verbose and msg: print(cc, msg) # 返回namedtuple对象 return Result(status, cc) async def downloader_coro(cc_list, base_url, verbose, concur_req): # 计数器 counter = collections.Counter() # 标记协程最大对象 semaphore = asyncio.Semaphore(concur_req) # 协程列表 to_do = [ down_load_one(cc, base_url, semaphore, verbose) for cc in cc_list ] # 完成的工作任务 to_do_iter = asyncio.as_completed(to_do) if not verbose: to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) for future in to_do_iter: try: res = await future # 获取调用者的错误信息 except FetchError as exc: country_code = exc.country_code try: # 尝试获取引起错误的实例的参数args error_msg = exc.__cause__.args[0] except IndexError: # 如果取不到索引获取引起错误的实例的类的名字 error_msg = exc.__cause__.__class__.__name if verbose and error_msg: msg = '*** Error for {}: {}' print(msg.format(country_code, error_msg)) # 其他的错误类型用Enum的最后一种关系表达式 status = HTTPStatus.error else: status = res.status # 统计各种状态 counter[status] += 1 return counter def download_many(cc_list, base_url, verbose, concur_req): loop = asyncio.get_event_loop() coro = downloader_coro(cc_list, base_url, verbose, concur_req) counts = loop.run_until_complete(coro) loop.close() # print(counts) # 返回运行完成后的统计字典 return counts if __name__ == '__main__': main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
上面是按照书上的写法,自己做了一些标识,这里面最终调用的最底层运行异步的函数就是
import asyncio import collections import aiohttp from aiohttp import web import tqdm from flags2_common import main, HTTPStatus, Result, save_flag # 默认设为较小的值,防止远程网络出错 DEFAULT_CONCUR_REQ = 5 MAX_CONCUR_REQ = 1000 class FetchError(Exception): def __init__(self, country_code): # 继承父类的args添加属性,pythoncookbook书中介绍最好这样写 super(FetchError, self).__init__(country_code) self.country_code = country_code async def http_get(url): async with aiohttp.ClientSession() as session: # 运行底层库函数session.get(url) async with session.get(url) as resp: if resp.status == 200: ctype = resp.headers.get('Content-type','').lower() # 如果是json返回JSON数据 if 'json' in ctype or url.endswith('json'): data = await resp.json() # print(data) else: data = await resp.read() # print(data) return data elif resp.status == 404: return web.HTTPNotFound else: raise aiohttp.ClientHttpProxyError( code=resp.status, message=resp.reason, headers=resp.headers ) async def get_country(base_url, cc): url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower()) # print(url) 返回国家的信息 metadata = await http_get(url) return metadata['country'] async def get_flag(base_url, cc): url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) # print(url) return (await http_get(url)) async def down_load_one(cc, base_url, semaphore, verbose): # 404与正常下载返回对象关系,其他上浮错误给调用者 try: # 在标记的最大协程数量下工作 async with semaphore: image = await get_flag(base_url, cc) # print(image) async with semaphore: country = await get_country(base_url, cc) # 抓取上浮的404的错误 except web.HTTPNotFound: # 用Enum关系表达式保存status status = HTTPStatus.not_found msg = 'not found' # 其它引起的报错为基础错误Exception,上浮给调用者 except Exception as exc: raise FetchError(cc) from exc else: # 正常情况下,保存图片 # run_in_executor方法的第一个参数是Executor实例;如果设为None,使用事件循环的默认为ThreadPoolExecutor country = country.replace(' ','_') filename = '{}-{}.gif'.format(country, cc) loop = asyncio.get_event_loop() loop.run_in_executor(None, save_flag, image, filename) # save_flag(image, cc.lower() + '.gif') status = HTTPStatus.ok msg = 'ok' if verbose and msg: print(cc, msg) # 返回namedtuple对象 return Result(status, cc) async def downloader_coro(cc_list, base_url, verbose, concur_req): # 计数器 counter = collections.Counter() # 标记协程最大对象 semaphore = asyncio.Semaphore(concur_req) # 协程列表 to_do = [ down_load_one(cc, base_url, semaphore, verbose) for cc in cc_list ] # 完成的工作任务 to_do_iter = asyncio.as_completed(to_do) if not verbose: to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) for future in to_do_iter: try: res = await future # 获取调用者的错误信息 except FetchError as exc: country_code = exc.country_code try: # 尝试获取引起错误的实例的参数args error_msg = exc.__cause__.args[0] except IndexError: # 如果取不到索引获取引起错误的实例的类的名字 error_msg = exc.__cause__.__class__.__name if verbose and error_msg: msg = '*** Error for {}: {}' print(msg.format(country_code, error_msg)) # 其他的错误类型用Enum的最后一种关系表达式 status = HTTPStatus.error else: status = res.status # 统计各种状态 counter[status] += 1 return counter def download_many(cc_list, base_url, verbose, concur_req): loop = asyncio.get_event_loop() coro = downloader_coro(cc_list, base_url, verbose, concur_req) counts = loop.run_until_complete(coro) loop.close() # print(counts) # 返回运行完成后的统计字典 return counts if __name__ == '__main__': main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
从代码中可以看到,在单个下载down_load_one协程中,添加了两个协程任务。
但最终实现I/O操作的还是
async with aiohttp.ClientSession() as session:
# 运行底层库函数session.get(url)
async with session.get(url) as resp
18.6使用sayncno包编写服务器。
这个能力有限,理解起来太累,回头有空看了再写。