1. 简介
说到asyncio就不得不说async、await,当一个函数前加了async之后就不在是一个函数了, 而是一个协程
2. 示例
两个协程平级并发调用
import asyncio # 协程 async def print_hello(): while True: print("hello world") await asyncio.sleep(1) # 暂停, cpu呗空出来 async def print_bye(): while True: print("good bye") await asyncio.sleep(2) # 暂停, cpu呗空出来 co1 = print_hello() # 创建一个协程对象 并没有执行 co2 = print_bye() # 创建一个协程对象 并没有执行 # 创建事件循环 loop = asyncio.get_event_loop() # epoll # loop.run_forever() # loop.run_until_complete(co) # 监听事件, 调度一个 loop.run_until_complete(asyncio.gather(co1, co2)) # 监听事件, 如果调度多个需要使用gather
两个协程嵌套调用
import asyncio import random import functools # 用于回调使用 def on_job_done(url, task): # 第二个是默认的task print("下载结束", url, task.result()) # task不仅仅有result(用于接收任务的返回值), 还有异常捕获等参数 async def crontab_scheduler(): page = 1 while True: url = '{}/{}'.format("https://www.baidu.com", page) await asyncio.sleep(1) job = cron_job(url) # await job # 错误,如果直接await 等待上一个job完成, 就会变成同步, 解决:直接交给Event loop,我们必须将新协程分离出去,让他和当前协程并发 task = asyncio.create_task(job) # 注册到事件循环, 但是并没有将协程让出去 task.add_done_callback(functools.partial(on_job_done, url)) # 完成回调 # future = asyncio.ensure_future(job) # 与上边的区别是可以拿到结果 其中文档中更建议task # future.add_done_callback() # await asyncio.sleep(0) # 主动让出线程 page +=1 async def cron_job(url): n = random.randint(1,3) await asyncio.sleep(n) print("下载结束", url) return "hhahaha" co1 = crontab_scheduler() # 创建一个协程对象 并没有执行 loop = asyncio.get_event_loop() loop.run_until_complete(co1) # asyncio.run(crontab_scheduler()) # run方法在python3.7之后才会出现
使用asyncio处理阻塞函数
使用事件循环对象的
run_in_executor
方法。asyncio的事件循环在背后维护着一个ThreadPoolExecutor
对象,我们可以调用run_in_executor
方法,把可调用对象发给它执行。
Loop.run_in_executor(executor, func, *args) # executor : Executor 实例。如果为 None,则使用默认 executor。 # func 就是要执行的函数 # *args 就是传递给 func 的参数 该方法返回一个协程
示例
版本一 使用async
import asyncio from time import sleep, strftime from concurrent import futures
executor = futures.ThreadPoolExecutor(max_workers=5) # 创建executor实例
async def blocked_sleep(name, t): print(strftime('[%H:%M:%S]'),end=' ') print('sleep {} is running {}s'.format(name, t)) loop = asyncio.get_event_loop() await loop.run_in_executor(executor, sleep, t) # 使用run_in_executor执行耗时方法 print(strftime('[%H:%M:%S]'),end=' ') print('sleep {} is end'.format(name)) return t
async def main(): future = (blocked_sleep(i, i) for i in range(1, 6)) fs = asyncio.gather(*future) return await fs
loop = asyncio.get_event_loop() results = loop.run_until_complete(main()) print('results: {}'.format(results))
调用协程不会使协程中的代码运行,仅仅是创建了一个协程对象, 可以用一下两种方式运行协程
- 在另一个协程中调用 await coroutine 和 yield from coroutine (假定另一个协程已经在执行,即在事件循环中)
- 使用
ensure_future
函数或AbstractEventLoop.create_task
方法来排定执行时间。
版本二 使用coroutine装饰器
import asyncio from time import sleep, strftime from concurrent import futures def blocked(t): print(strftime('[%H:%M:%S]'),end=' ') print('{} sleep:{}s....'.format(t, t)) sleep(t) print(strftime('[%H:%M:%S]'),end=' ') print('{} finished'.format(t)) return t @asyncio.coroutine def main(): with futures.ThreadPoolExecutor(max_workers=5) as executor: loop = asyncio.get_event_loop() future = [loop.run_in_executor(executor,blocked, i) for i in range(1, 6)] fs = asyncio.wait(future) # wait函数需要传入一个list,并且返回两组Futures,(done, pending) return (yield from fs) loop = asyncio.get_event_loop() results, _ = loop.run_until_complete(main()) print('results: {}'.format([result.result() for result in results]))
在第二份代码里,使用wait函数来等待任务结束,是为了记录一下不同的函数调用方法,和gather函数不同,wait函数需要传入一个list,并且返回两组Futures,(done, pending)。这就是为什么代码里使用 results, _ = loop.run_until_complete(main())
的原因了。
下面是一个使用asyncio.as_comleted
方法的例子,该方法返回一个协程迭代器。迭代时迭代器只返回已经完成的future。内部维护一个队列,每次迭代都从队列中返回已经完成的future的结果(result or exception),可以注意到在输出结果中,7秒后,所以任务才完成。因为executor大小设置为5,每次只有5个线程在跑,所以在第一个block运行结束后,我们可以看到第6个block立即执行。
import asyncio from time import sleep, strftime from concurrent import futures def blocked(t): print(strftime('[%H:%M:%S]'),end=' ') print('{} sleep:{}s....'.format(t, t)) sleep(t) print(strftime('[%H:%M:%S]'),end=' ') print('{} finished'.format(t)) return t @asyncio.coroutine def main(): with futures.ThreadPoolExecutor(max_workers=5) as executor: loop = asyncio.get_event_loop() future = [loop.run_in_executor(executor,blocked, i) for i in range(1, 7)] fs = asyncio.as_completed(future) results = [] for f in fs: result = yield from f results.append(result) return results loop = asyncio.get_event_loop() results= loop.run_until_complete(main()) print('results: {}'.format(results))
总结
在asyncio中调用阻塞函数时,需要使用asyncio维护的线程池来另开线程运行阻塞函数,防止阻塞事件循环所在的线程。
函数 | 传参 | 返回值 | 返回值顺序 | 函数意义 | |
---|---|---|---|---|---|
asyncio.gather |
可以传递多个协程或者Futures,函数会自动将协程包装成task,例如协程生成器。 | 包含Futures结果的list | 按照原始顺序排列 | 注重收集结果,等待一堆Futures并按照顺序返回结果 | |
asyncio.wait |
a list of futures | 返回两个Future集合 (done, pending) | 无序(暂定) | 是一个协程等传给他的所有协程都运行完之后结束,并不直接返回结果 | |
asyncio.as_completed |
a list of futures | 返回一个协程迭代器 | 按照完成顺序 | 返回的迭代器每次迭代只返回已经完成的Futures |