asynchronous(异步)/concurrent(并发)/multiprocess(多进程) in Python
相关模块有:
_thread
threading (对_thread的高级封装)
multiprocessing
concurrent.futures(多threading/multiprocessing的高级封装)
asyncio
asyncio
参考资料:
Official Document
廖雪峰的官方网站
Guide to Concurrency in Python with Asyncio
Python黑魔法 — 异步IO( asyncio) 协程
且听风吟
小明
理解 Python asyncio Python Generator
asyncio is a library to write concurrent(并发) code using the async/await syntax.
asyncio is used as a foundation for multiple Python asynchronous(异步) frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.
asyncio模块有四个基本概念:
eventloop
coroutine
future
task
Python3.4 增加了asyncio模块支持异步IO,起初使用@asyncio.coroutine || yield from语法,Python3.5后使用async || await语法替代。
异步IO asynchronous|| 并发 concurrency || 并行 parallel1
所谓异步IO,就是当发起一个IO操作,却不等它结束,直接去做其他事情,当它结束时,会通知你。
同步IO模型的代码无法实现异步IO模型。异步IO模型需要有一个eventloop,在eventloop中主线程不断重复“读取消息–> 处理消息”这一过程。
所谓并发(concurrency),是指同一时刻平等的运行不同事情。多进程、多线程都可以实现。是一种逻辑上的假相。不同任务逻辑上同时运行,但处理器只有一个,同一时刻处理器上只能处理一个任务。核心是交替,逻辑上。
所谓并行(parallel),是真正的同时运行,同一时刻多个任务在多个处理器上同时执行。核心是并列,物理上。
并发指的是程序的结构,并行指的是程序运行时的状态。
概念一:事件循环 event loop
功能类似于CPU(顺序执行协程代码)、操作系统(完成协程调度)。
作为中央总控,eventloop实例提供了注册/取消/执行任务(和回调)的方法。
把一些列异步函数注册到事件循环上,事件循环就会循环执行这些函数(同一时刻只能执行一个),当执行到某个函数正在等待I/O返回,事件循环就会暂停它的执行去执行其他函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。
eventloop机理上,遇到IO操作时,代码只发出IO请求,不等待IO结果,直接结束本轮消息处理,进入下一轮消息处理过程。
当IO操作完成后,收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。
在“发出IO请求”到收到“IO完成”这段时间内,主线程继续处理eventloop中其他消息,如此异步IO模型下,一个线程就可以同时处理多个IO请求,而不用产生线程切换的消耗。
The eventloop is what schedules and runs asynchronous tasks.
uvloop
According to the authors of uvloop, it is comparible in speed to that of Go programs.
If you want to utilize uvloop the first install it ,the add a call to uvloop.install()
import asyncio
import uvloop
async def foo():
print("Foo!")
async def hello_world():
await foo()
print("Hello World!")
uvloop.install() # 使用uvloop替换原生eventloop
asyncio.run(hello_world())
1
2
3
4
5
6
概念二:协程 coroutine
coroutine本质上是一个函数,在代码块中可以由用户将执行权交给其他协程。
协程可以做哪些事2:
等待一个future结束
等待另一个协程(产生一个结果,或引发一个异常) await asyncio.sleep(x)
产生一个结果给正在等它的协程
引发一个异常给正在等它的协程
function or object
coroutine function : an async def function;
coroutine object : an object returned by calling a coroutine function
协程的运行
调用协程函数,协程并不会开始运行,只是返回一个协程对象。
运行协程对象的两种方式:
在另一个已经运行的协程中用“await”等待它
通过“ensure_future”函数计划它的执行
本质上,只有loop运行了,协程才会运行。
import asyncio
async def do_some_work(x): # async 用于定义一个coroutine对象
print("waiting " + str(x))
await asyncio.sleep(x) # await调用另一个coroutine对象
print("again") # asycnio.sleep()相当于IO,这里并不是等待IO结束,而是直接中断do_some_work()这个coroutine,当IO完成后才重新启动本协程
loop = asyncio.get_event_loop() # 定义一个无协程的loop
loop.run_until_complete(do_some_work(3)) # 把协程对象加入到loop,此处参数应为future对象,只是内部可以自动检测封装
loop.run_until_complete(asyncio.ensure_future(do_some_work(3))) # 正规写法
1
2
run_until_complete只接收单个future,如果有多个协程需要用syncio.gather([coroutine1,c2,…]),asyncio.gather()本质是并发运行任务.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
# 等价于
asyncio.run(main())
1
2
3
4
5
The .run function always creates a new event loop and closes it at the end. It’s a high-level API.
协程的并发:asyncio.gather || asyncio.wait
return_a, return_b = await asyncio.gather(a(), b())
done, pending = await asyncio.wait([a(), b()])
# return_a 等价于list(done)[0].result()
1
2
3
两者都是让多个协程并发执行,asyncio.gather()会按照输入协程的顺序保存对应协程的执行结果。
asyncio.wait()返回封装的task(已完成的和被挂起的任务),如果需要协程的执行结果就要从对应task实例中采用result方法拿到。
asyncio.wait()接收一个参数return_when,表示返回的时机。可选有:
ALL_COMPLETED : 全部任务完成后返回
FIRST_COMPLETED : 第一个协程完成后返回
FIRST_EXCEPTION : 第一个异常出现后返回
协程与函数
在函数世界内,当一个函数中断去执行另一个函数采用调用另一个函数;协程中,函数A执行中断去执行函数B是由事件循环操作的,并不涉及函数之间的调用。
不同协程是在同一个线程内切换的。
概念三:Future
future是coroutine的封装,future对象提供了很多任务方法,可以向Future实例添加完成后的回调(add_done_callback)、取消任务(cancle)、设置最终结果(set_result)、设置一场(set_exception)等。
概念四:Task
task是future这种底层类的子类。实际使用中更过直接调用task而非future。
Task的创建有三种方法:
asyncio.create_task(coro) Python3.7后推荐用法,本质是对loop.create_task的封装
loop.create_task(coro)
asyncio,ensure_future(coro/future/awaitable)
Awaitables
Something is awaitable if it can be used in an await expression.
There are three main tyoes of awaitables:
Coroutine
Tasks
Futures(low-level type)
回调
可以给Task/Future添加回调函数,等task完成后就会自动调用这些回调函数。
task.add_done_callback(callback)
回调函数按其注册顺序被调用。
同步代码
如果有同步逻辑需要执行,可以放在loop.run_in_executor(concurrent.futures.Executor实例,同步函数)里执行。
References