协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行 , 那么在我们的代码中如何写呢? |
|
以下代码段(需要 Python 3.7+)会打印 "1",等待 2 秒,再打印 "2" |
|
```python |
import asyncio |
|
async def main(): |
print(1) |
await asyncio.sleep(2) |
print(2) |
|
asyncio.run(main()) |
``` |
|
注意:当在函数定义前面加上async 关键字 , 此时的函数不在是一个普通函数 , 而是一个协程函数 , 当函数名+ |
|
括号不再是直接调用函数 , 执行函数体代码 , 而是返回一个协程对象 , 执行协程函数创建协程对象,函数内部代 |
|
码不会执行。简单地说调用一个协程并不会使其被调度执行 |
|
```python |
print(main()) # <coroutine object main at 0x0000027A2A118440> |
``` |
|
那么要真正运行一个协程函数 ,asyncio 提供了三种主要机制 : |
|
- asyncio.run() 函数用来运行最高层级的入口点 "main()" 函数 |
|
- 等待一个协程 以下代码段会在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world": |
|
```python |
import asyncio |
import time |
|
async def say_after(delay, what): |
await asyncio.sleep(delay) |
print(what) |
|
async def main(): |
print(f"started at {time.strftime('%X')}") |
|
await say_after(1, 'hello') |
await say_after(2, 'world') |
|
print(f"finished at {time.strftime('%X')}") |
|
asyncio.run(main()) |
""" |
比如我想运行 say_after() 协程函数 , 我们可以在另一个协程函数里面 await 它 , 然后在外面 |
asyncio.run(main()) |
""" |
``` |
|
- asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程函数 |
|
```python |
async def main(): |
task1 = asyncio.create_task( |
say_after(1, 'hello')) |
|
task2 = asyncio.create_task( |
say_after(2, 'world')) |
|
print("started ") |
|
await task1 |
await task2 |
|
print("finished") |
|
asyncio.run(main()) |
""" |
更上面相比是把协程对象(协程函数+())创建成了task对象 , 然后await 它 |
""" |
``` |
|
## 2.可等待对象 |
|
什么是可等待对象 ? |
|
如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象 |
|
可等待对象有三种主要类型: 协程对象, task对象 和 Future对象 --> io操作 |
|
如果你对可等待对象还是不好理解 , 或者说你不知道在什么情况下要在前面加await , 记住编写这类代码 , 刚开始 |
|
一定要先写对象 , 然后再考虑这个对象会不会有io操作 , 要不要加await |
|
### 2.1协程对象 |
|
协程对象属于可等待对象,因此可以在其他协程中被等待: |
|
```python |
import asyncio |
|
async def nested(): |
return 42 |
|
async def main(): |
# Nothing happens if we just call "nested()". |
# A coroutine object is created but not awaited, |
# so it won't run at all. |
nested() |
|
# Let's do it differently now and await it: |
res = await nested() # res就是协程对象等待到的返回值 |
print(res) # will print "42". |
|
asyncio.run(main()) |
``` |
|
注意 : |
|
``` |
协程函数 : 定义形式为 async def 的函数; |
协程对象 : 调用 协程函数 所返回的对象。 |
``` |
|
### 2.2task对象 |
|
task翻译过来是任务 , 任务是用来并行的调度协程的 |
|
当一个协程函数通过 asyncio.create_task() 等函数被封装为一个 任务,该协程函数会被自动调度执行 |
|
```python |
import asyncio |
|
async def nested(): |
return 42 |
|
async def main(): |
# 简单的理解为把协程对象打包成task对象 |
task = asyncio.create_task(nested()) |
|
# 等待任务调度执行协程函数 |
await task |
|
asyncio.run(main()) |
``` |
|
### 2.3future对象 |
|
Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。 |
|
当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。 |
|
Task继承Future,Task对象内部await结果的处理基于Future对象来的。 |
|
示例1: |
|
```python |
async def main(): |
# 获取当前事件循环 |
loop = asyncio.get_running_loop() |
|
# 创建一个任务(Future对象),这个任务什么都不干。 |
fut = loop.create_future() |
|
# 等待任务最终结果(Future对象),没有结果则会一直等下去。 |
await fut |
|
asyncio.run( main() ) |
``` |
|
示例2: |
|
```python |
import asyncioasync def set_after(fut): await asyncio.sleep(2) fut.set_result("666")async def main(): # 获取当前事件循环 loop = asyncio.get_running_loop() # 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束。 fut = loop.create_future() # 创建一个任务(Task对象),绑定了set_after函数,函数内部在2s之后,会给fut赋值。 # 即手动设置future任务的最终结果,那么fut就可以结束了。 await loop.create_task( set_after(fut) ) # 等待 Future对象获取 最终结果,否则一直等下去 data = await fut print(data)asyncio.run( main() ) |
``` |
|
## 3.asyncio.run方法 |
|
参数 |
|
```python |
asyncio.run(coro, *, debug=False) # 执行 coro 并返回结果。coro指的就是协程对象run函数会运行传入的协程函数,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用如果 debug 为 True,事件循环将以调试模式运行。此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。 |
``` |
|
示例 |
|
```python |
async def main(): await asyncio.sleep(1) print('hello')asyncio.run(main()) |
``` |
|
## 4.asyncio.create_task方法 |
|
参数 |
|
```python |
asyncio.create_task(coro, *, name=None) # 将 coro 协程对象封装为一个Task并调度其执行。返回 Task 对象coro是协程对象 , name是task名称# task.set_name() 设置任务名称# task.get_name() 获取任务名称 |
``` |
|
注意:asyncio.create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。 |
|
```python |
import asyncioasync def coro(): return 123async def main(): task = asyncio.create_task(coro()) await task task.set_name('哈哈哈') print(task.get_name())asyncio.run(main()) |
``` |
|
## 5.并发运行任务 |
|
在asyncio模块下 , 有两种方法可以实现并发运行任务 , 你只需要掌握如何编写代码 , 内部的调度模块已经全部给你搞好了 |
|
### 5.1gather方法 |
|
```python |
asyncio.gather(*aws, return_exceptions=False) # 并发运行 aws 序列中的 可等待对象。如果 aws 中的某个可等待对象为协程对象,它将自动被作为一个任务调度。如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消并将继续运行 , 知道运行完毕 |
``` |
|
示例代码 |
|
```python |
import asyncioasync def func(name, x): print(f"我是协程: {name}") await asyncio.sleep(x) print(f"我是协程: {name} 等待结束") res = x*x return resasync def main(): L = await asyncio.gather( func("A", 2), func("B", 1), func("C", 4), ) print(L)asyncio.run(main())"""我是协程: A我是协程: B我是协程: C我是协程: B 等待结束我是协程: A 等待结束我是协程: C 等待结束[4, 1, 16]""" |
``` |
|
### 5.2wait方法 |
|
参数 |
|
```python |
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)# 并发地运行 aws 可迭代对象中的 可等待对象 并进入阻塞状态直到满足 return_when 所指定的条件aws 可迭代对象必须不为空。返回两个集合: (done(完成的任务), pending(未完成的任务))done, pending = await asyncio.wait(aws) |
``` |
|
return_when 指定此函数(wait函数)应在何时返回。它必须为以下常数之一: |
|
|
|
|
|
|
|
注意 : python3.8之前允许将协程对象直接传进awit函数里面 , 但是在python3.8之后就废弃了,python3.11将会彻底移除 , 必须创建成task对象才可以 |
|
```python |
import asyncioasync def func(): return 123async def main(): task1= asyncio.create_task(func()) task2 = asyncio.create_task(func()) task3= asyncio.create_task(func()) done, pending = await asyncio.wait([task1,task2,task3]) print(done, pending)asyncio.run(main()) |
``` |
|
区别 : |
|
asyncio.wait 比asyncio.gather 更低级。 |
|
asyncio.gather 主要集中在收集结果,它等待并按给定的顺序返回其结果。 |
|
asyncio.wait 只是等待。它不是直接给出结果,而是给出已完成和挂起的任务。 |
|
所以就是看你的需求 , 以及在设计代码的时候需不需要收集每个任务的结果 |
|
## 6.爬虫任务实现并发运行案例 |
|
```python |
import aiohttpimport asyncioimport timeasync def fetch( url): print("发送请求:", url) async with aiohttp.ClientSession() as session: async with session.get(url) as response: text = await response.text() print("得到结果:", url, len(text)) return textasync def main(): url_list = [ 'https://python.org', 'https://www.baidu.com', 'https://www.pythonav.com' ] tasks = [asyncio.create_task(fetch(url)) for url in url_list] l = await asyncio.gather(*tasks) for el in l: print(len(el))if name == 'main': start_time = time.time() asyncio.run(main()) print(time.time() - start_time) |
``` |
|
如果遇到win平台报错 : RuntimeError: Event loop is closed , linux和mac不会报错 , 解决方案 : |
|
```python |
import selectors.....# main 使用loop.create_task(job(session))创建task对象selector = selectors.SelectSelector() loop = asyncio.SelectorEventLoop(selector) try: loop.run_until_complete(main(loop)) # 完成事件循环,直到最后一个任务结束finally: loop.close() # 结束事件循环.print("Async total time:", time.time() - t2) |
``` |
|
为协程对象添加回调函数 |
|
```python |
import aiohttpimport asyncioimport timeasync def fetch(url): print("发送请求:", url) async with aiohttp.ClientSession() as session: async with session.get(url, timeout=3) as response: text = await response.text() return textdef call_back(obj): # obj是协程对象,result()是返回值 text = obj.result() print(len(text))async def main(): url_list = [ 'https://www.jd.com', 'https://www.baidu.com', 'https://www.pythonav.com' ] tasks = [] for url in url_list: task = asyncio.create_task(fetch(url)) task.add_done_callback(call_back) tasks.append(task) await asyncio.wait(tasks)if name == 'main': start_time = time.time() asyncio.run(main()) print(time.time() - start_time) |
``` |