asyncio的核心概念与基本架构
本文针对的是python3.4以后的版本的,因为从3.4开始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不过语法上面有稍微的改变。比如在3.4版本中使用@asyncio.coroutine装饰器和yield from语句,但是在3.5以后的版本中使用async、await两个关键字代替,虽然语法上稍微有所差异,但是原理是一样的。
1 asyncio 组成的基本概念
1 协程函数的作用
(1)result = yield from future,返回future的结果。
(2)result = yield from coroutine,等候另一个协程函数返回结果或者是触发异常
(3)result= yield from task,返回一个task的结果
(4)return expression,作为一个函数抛出返回值
(5)raise exception
2 事件循环 event_loop
如何理解事件循环:
线程一直在各个协程方法之间永不停歇的游走,遇到一个yield from 或者await就悬挂起来,然后又走到另外一个方法,依次进行下去,知道事件循环所有的方法执行完毕。实际上loop是BaseEventLoop的一个实例,我们可以查看定义,它到底有哪些方法可调用
协程函数,不是像普通函数那样直接调用运行的,必须添加到事件循环中,然后由事件循环去运行,单独运行协程函数是不会有结果的。
import time import asyncio async def say_after_time(delay,what): await asyncio.sleep(delay) print(what) async def main(): print(f"开始时间为: {time.time()}") await say_after_time(1,"hello") await say_after_time(2,"world") print(f"结束时间为: {time.time()}") ''' 直接运行 ''' # >>> main() # <coroutine object main at 0x1053bb7c8> ''' 需要通过事件循环来调用''' loop=asyncio.get_event_loop() #创建事件循环对象 #loop=asyncio.new_event_loop() #与上面等价,创建新的事件循环 loop.run_until_complete(main()) #通过事件循环对象运行协程函数 loop.close()
(1)获取事件循环对象的几种方式:
-
loop=asyncio.get_running_loop()
,返回(获取)在当前线程中正在运行的事件循环,如果没有正在运行的事件循环,则会显示错误 -
loop=asyncio.get_event_loop()
,获得一个事件循环,如果当前线程还没有事件循环,则创建一个新的事件循环loop -
loop=asyncio.set_event_loop(loop)
, 设置一个事件循环为当前线程的事件循环; -
loop=asyncio.new_event_loop()
,创建一个新的事件循环
(2)通过事件循环运行协程函数的两种方式:
-
创建事件循环对象loop,即
asyncio.get_event_loop()
,通过事件循环运行协程函数 -
直接通过
asyncio.run(function_name)
运行协程函数。
但是需要注意的是,首先run函数是python3.7版本新添加的,前面的版本是没有的;其次,这个run函数总是会创建一个新的事件循环并在run结束之后关闭事件循环,所以,如果在同一个线程中已经有了一个事件循环,则不能再使用这个函数了,因为同一个线程不能有两个事件循环,而且这个run函数不能同时运行两次,因为他已经创建一个了。即同一个线程中是不允许有多个事件循环loop的。 asyncio.run()是python3.7 新添加的内容,也是后面推荐的运行任务的方式,因为它是高层API,后面会讲到它与asyncio.run_until_complete()的差异性,run_until_complete()是相对较低层的API。
3 什么是awaitable对象
有三类对象是可等待的,即 coroutines
, Tasks
, and Futures
.
coroutine
:本质上就是一个函数,一前面的生成器yield和yield from为基础,不再赘述;
Tasks
: 任务,顾名思义,就是要完成某件事情,其实就是对协程函数进一步的封装;
Future
:它是一个“更底层”的概念,他代表一个异步操作的最终结果,因为异步操作一般用于耗时操作,结果不会立即得到,会在“将来”得到异步运行的结果,故而命名为 Future。
三者的关系,coroutine
可以自动封装成 task
,而Task是 Future
的子类。
4 什么是task任务
Task用来 并发调度的协程, 单纯的协程函数仅仅是一个函数而已,将其包装成任务,任务是可以包含各种状态的,异步编程最重要的就是对异步操作状态的把控了。
(1)创建任务(两种方法):
方法一:task = asyncio.create_task(coro()) # 这是3.7版本新添加的
方法二:task = asyncio.ensure_future(coro())
,也可以使用loop.create_future()
,loop.create_task(coro)
也是可以的。
(2)获取某一个任务的方法:
方法一:task=asyncio.current_task(loop=None);返回在某一个指定的loop中,当前正在运行的任务,如果没有任务正在运行,则返回None;如果loop为None,则默认为在当前的事件循环中获取,
方法二:asyncio.all_tasks(loop=None);返回某一个loop中还没有结束的任务;
5 什么是future?
Future是一个较低层的可等待(awaitable)对象,他表示的是异步操作的最终结果,当一个Future对象被等待的时候,协程会一直等待,直到Future已经运算完毕。 Future是Task的父类,一般情况下,已不用去管它们两者的详细区别,也没有必要去用Future,用Task就可以了,返回 future 对象的低级函数的一个很好的例子是 loop.run_in_executor().
2 asyncio的基本架构
asyncio分为高层API和低层API。我们前面所讲的Coroutine和Tasks属于高层API,而Event Loop 和Future属于低层API。所谓的高层API主要是指那些asyncio.xxx()的方法。
High-level APIs
●Coroutines and Tasks(本文要写的) ●Streams ●Synchronization Primitives ●Subprocesses ●Queues ●Exceptions
Low-level APIs
●Event Loop(下一篇要写的) ●Futures ●Transports and Protocols ●Policies ●Platform Support
1)运行异步协程 asyncio.run(coro, *, debug=False) #运行一个一步程序,参见上面 2)创建任务 task=asyncio.create_task(coro) #python3.7 ,参见上面 task = asyncio.ensure_future(coro()) 3)睡眠 await asyncio.sleep(delay, result=None, *, loop=None) 这个函数表示的是:当前的那个任务(协程函数)睡眠多长时间,而允许其他任务执行。这是它与time.sleep()的区别,time.sleep()是当前线程休息 4)并发运行多个任务 await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) 它本身也是awaitable的。当所有的任务都完成之后,返回的结果是一个列表的形式、 5)防止任务取消 await asyncio.shield(*arg, *, loop=None) 6)设置timeout await asyncio.wait_for(aw, timeout, *, loop=None) 当异步操作需要执行的时间超过waitfor设置的timeout,就会触发异常,所以在编写程序的时候,如果要给异步操作设置timeout,一定要选择合适,
如果异步操作本身的耗时较长,而你设置的timeout太短,会涉及到她还没做完,就抛出异常了。 7)多个协程函数时候的等候 await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 与上面的区别是,第一个参数aws是一个集合,要写成集合set的形式,比如: {func(),func(),func3()} 表示的是一系列的协程函数或者是任务,其中协程会自动包装成任务。事实上,写成列表的形式也是可以的。 该函数的返回值是两个Tasks/Futures的集合: (done, pending) 其中done是一个集合,表示已经完成的任务tasks;pending也是一个集合,表示还没有完成的任务。 常见的使用方法为:done, pending = await asyncio.wait(aws)
2 Task 类详解
(1)他是作为一个python协程对象,和Future对象很像的这么一个对象,但不是线程安全的;他继承了Future所有的API,,除了Future.set_result()和Future.set_Exception();
(2)使用高层API asyncio.create_task()创建任务,或者是使用低层API loop.create_task()或者是loop.ensure_future()创建任务对象;
(3)相比于协程函数,任务时有状态的,可以使用Task.cancel()进行取消,这会触发CancelledError异常,使用cancelled()检查是否取消。
cancel函数:
import asyncio async def cancel_me(): print('cancel_me(): before sleep') try: await asyncio.sleep(3600) #模拟一个耗时任务 except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): #通过协程创建一个任务,需要注意的是,在创建任务的时候,就会跳入到异步开始执行 #因为是3.7版本,创建一个任务就相当于是运行了异步函数cancel_me task = asyncio.create_task(cancel_me()) #等待一秒钟 await asyncio.sleep(1) print('main函数休息完了') #发出取消任务的请求 task.cancel() try: await task #因为任务被取消,触发了异常 except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) '''运行结果为: cancel_me(): before sleep main函数休息完了 cancel_me(): cancel sleep cancel_me(): after sleep main(): cancel_me is cancelled now '''
3 异步函数的结果获取
两种方法:第一种是直接通过Task.result()来获取;第二种是绑定一个回调函数来获取,即函数执行完毕后调用一个函数来获取异步函数的返回值。
1,通过result函数
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模拟耗时任务3秒 print("Hello again 01 end") return a+b coroutine=hello1(10,5) loop = asyncio.get_event_loop() #第一步:创建事件循环 task=asyncio.ensure_future(coroutine) #第二步:将多个协程函数包装成任务列表 loop.run_until_complete(task) #第三步:通过事件循环运行 print('-------------------------------------') print(task.result()) loop.close() '''运行结果为 Hello world 01 begin Hello again 01 end ------------------------------------- 15 '''
2, 通过定义回调函数
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模拟耗时任务3秒 print("Hello again 01 end") return a+b def callback(future): #定义的回调函数 print(future.result()) loop = asyncio.get_event_loop() #第一步:创建事件循环 task=asyncio.ensure_future(hello1(10,5)) #第二步:将多个协程函数包装成任务 task.add_done_callback(callback) #并被任务绑定一个回调函数 loop.run_until_complete(task) #第三步:通过事件循环运行 loop.close() #第四步:关闭事件循环 '''运行结果为: Hello world 01 begin Hello again 01 end 15 '''
所谓的回调函数,就是指协程函数coroutine执行结束时候会调用回调函数。并通过参数future获取协程执行的结果。我们创建的task和回调里的future对象,实际上是同一个对象,因为task是future的子类。
import asyncio import time from functools import partial async def get_url(): print('start get url') await asyncio.sleep(2) # await 后面跟的必须是一个 await 对象 print('end get url') return 'stack' def test(url,future): print(url,'hello, stack') if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() # loop.run_until_complete(get_url()) # 只是提交了一个请求,时间2s tasks = [get_url() for i in range(10)] # get_future = asyncio.ensure_future(get_url()) # 获得返回值用法1,源码上依然是先判断loop,然后调用create_task # get_future = loop.create_task(get_url()) # 方法2,还可以继续添加函数,执行逻辑 # get_future.add_done_callback(partial(test, 'Stack')) # 函数本身在获得调用时需要一个任意形数,参数即是 get_future 本身,否则报错 # 如果函数需要传递参数,需要通过 偏函数 partial 模块来解决,以及函数的形参需要放在前面 loop.run_until_complete(asyncio.wait(tasks)) # 提交了10次,时间也是2s # loop.run_until_complete(asyncio.gather(*tasks)) 效果同上 # gather 和 wait 的区别 # gather是更高一级的抽象,且使用更加灵活,可以使用分组,以及取消任务 print(time.time() - start) # print(get_future.result()) # 接收返回值
针对3.7的版本
import asyncio import time async def hello1(a,b): print("Hello world 01 begin") await asyncio.sleep(3) #模拟耗时任务3秒 print("Hello again 01 end") return a+b async def hello2(a,b): print("Hello world 02 begin") await asyncio.sleep(2) #模拟耗时任务2秒 print("Hello again 02 end") return a-b async def hello3(a,b): print("Hello world 03 begin") await asyncio.sleep(4) #模拟耗时任务4秒 print("Hello again 03 end") return a*b async def main(): results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5)) for result in results: print(result) asyncio.run(main()) '''运行结果为: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50 '''
总结:
第一步:构建一个入口函数main 它也是一个异步协程函数,即通过async定义,并且要在main函数里面await一个或者是多个协程,同前面一样,我可以通过gather或者是wait进行组合,对于有返回值的协程函数,一般就在main里面进行结果的获取。
第二步:启动主函数main 这是python3.7新添加的函数,就一句话,即 asyncio.run(main())