协程&异步编程&asyncio
1.项目示例
1.1在python3.7 之前的版本
主要为
asyncio.ensure_future() 实例化task
使用
loop = asyncio.get_event_loop()
done, deping = loop.run_until_complete(asyncio.wait(tasks, timeout=4))
来运行
import asyncio
async def run():
print("hello")
await asyncio.sleep(5)
print("world")
async def run1():
print("hello")
await asyncio.sleep(3)
print("world")
# 创建并发
tasks = [asyncio.ensure_future(run()), asyncio.ensure_future(run1())]
# 创建应用池
loop = asyncio.get_event_loop()
# done 完成的
# deping 未完成的
# timeout 最长等带时间
done, deping = loop.run_until_complete(asyncio.wait(tasks, timeout=4))
print("done:", done)
print("deping:", deping)
hello hello world done: {<Task finished name='Task-2' coro=<run1() done, defined at c:UsersAdministratorDesktopexam协程旧的版本.py:8> result=None>} deping: {<Task pending name='Task-1' coro=<run() running at c:UsersAdministratorDesktopexam协程旧的版本.py:5> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001EF104A8040>()]>>}
1.2.在python3.7 之后
将 手动创建 task 和 运行方式进行了修改
使用: async.create_task() 来创建task 实例
使用 : asyncio.run() 来运行
示例:
import asyncio
async def run():
print("hello")
await asyncio.sleep(2)
print("world")
return 1
async def run1():
print("hello")
await asyncio.sleep(5)
print("world")
# 主函数
async def main():
# 创建task, name 任务名称
tasks = [ asyncio.create_task(run(), name="n1"),
asyncio.create_task(run(), name="n2")]
# done 已完成的
# pending 未完成的
# 这一步主要是为了执行 tasks 里的---> 等待 task里的任务
done, pending = await asyncio.wait(tasks, timeout=None)
return done, pending
# 运行
done, pending = asyncio.run( main() )
print("done:", done)
print("pending:", pending)
C:UsersAdministratorDesktopexam>C:/Python39/python.exe "c:/Users/Administrator/Desktop/exam/协程/1. 简单示例3.4后.py"
hello
hello
world
world
done: {<Task finished name='n1' coro=<run() done, defined at c:UsersAdministratorDesktopexam协程1. 简单示例3.4后.py:3> result=1>, <Task finished name='n2' coro=<run() done, defined at c:UsersAdministratorDesktopexam协程1.
简单示例3.4后.py:3> result=1>}
pending: set()
2.await ----->等待
await + 可等待对象 (协程对象, Future 对象, Task 对象) ---> IO 等待
携程对象: ----> async 修饰的函数 + ()
流程:
如果有其他任务, 在遇到IO等待(await) 的时候去执行其他任务。
示例1:
import asyncio
async def other():
print("hello")
await asyncio.sleep(3)
print("world")
return "返回值"
asyncio.run(other())
hello
world
示例2:
import asyncio
async def other():
print("hello")
await asyncio.sleep(3)
print("world")
return "返回值"
async def func():
print("执行协程函数内部代码")
#respone = await func()
task = asyncio.create_task(other())
print("执行结束")
asyncio.run(func())
结果:
执行协程函数内部代码
执行结束
hello
在这里我们发现, 我们开始执行协程内部代码遇到内部的await 并没有等,而是直接结束了,
示例3:
import asyncio
async def other():
print("hello")
await asyncio.sleep(3)
print("world")
return "返回值"
async def func():
print("执行协程函数内部代码")
respone = await other()
print("执行结束")
asyncio.run(func())
执行协程函数内部代码
hello
world
执行结束
我们发现这样只有等内部结束后才会执行,外部接下来的代码
示例4:
import asyncio
async def other():
print("hello")
await asyncio.sleep(3)
print("world")
return "返回值"
async def other2():
print("other---->开始")
await asyncio.sleep(5)
print("other ----> 结束")
async def func():
print("执行协程函数内部代码")
respone = await other()
respone2 = await other2()
print("执行结束")
asyncio.run(func())
执行协程函数内部代码
hello
world
other---->开始
other ----> 结束
执行结束
同样, await 结束后才执行下一个
3.task
在事件循环中并发的添加多个任务的。可以说解决了上面的 await 的问题。
1. 使用asyncio.create_task() 创建 3.7 之后, 建议
2. loop.create_task() 或者 asyncio.ensure_future() 来创建
async def run():
print("hello")
await asyncio.sleep(2)
print("world")
return 1
async def run1():
print("hello")
await asyncio.sleep(5)
print("world")
tasks = [run(), run1()]
done, pending = asyncio.run( asyncio.wait(tasks) )
print("done:", done)
print("pending:", pending)
import asyncio
async def run():
print("hello")
await asyncio.sleep(2)
print("world")
return 1
async def run1():
print("hello")
await asyncio.sleep(5)
print("world")
# 主函数
async def main():
# 创建task, name 任务名称
tasks = [ asyncio.create_task(run(), name="n1"),
asyncio.create_task(run1(), name="n2")]
# 或者
# tasks = [
# run(),
# run1()
# ]
# done 已完成的
# pending 未完成的
# 这一步主要是为了执行 tasks 里的---> 等待 task里的任务
done, pending = await asyncio.wait(tasks, timeout=None)
return done, pending
# 运行
done, pending = asyncio.run( main() )
print("done:", done)
print("pending:", pending)
4. asyncio.Future 对象
他是 task 的基类,一个更低级的接口帮助我们等待task的结果
Task 继承 Future, Task 对象内部 await 结果的处理基于Future 对象来的。
示例1:
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象), 这个任务什么都不干
fut = loop.create_future()
# 这个测试中这里始终等不到对象
await fut
asyncio.run(main())
示例2:
import asyncio
async def set_after(fut):
await asyncio.sleep(2)
# 给fut 设置结果
fut.set_resukt("666")
async def main():
# 获取当前事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象), 这个任务什么都不干
fut = loop.create_future()
await loop.create_task(set_after(fut))
# 这里会等到set_after 为fut设置结果, 等待结束
data = await fut
print(data)
asyncio.run(main())
5. concurrent.futures.Future 对象
使用线程池、进程池实现异步操作时用到的对象。
帮助我们来获取结果
https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
示例1:
import asyncio
import time
import concurrent.futures
def blocking_io():
# File operations (such as logging) can block the
# event loop: run them in a thread pool.
print("hello io")
time.sleep(5) # 这时候这里可以是任何需要等待结果的操作
print("world io")
return "1111"
def cpu_bound():
# CPU-bound operations will block the event loop:
# in general it is preferable to run them in a
# process pool.
print("hello cpu")
time.sleep(5)
print("world cpu")
return "2222"
async def f(fun):
# 重点, 以线程池或进程池的方式启动
loop = asyncio.get_event_loop()
# run_in_executor 的方式启动,则可变成可等待对象。
result = loop.run_in_executor(
None, fun)
# 等待结果
response = await result
print(response)
async def main():
task = [ f(blocking_io), f(cpu_bound)]
await asyncio.wait(task)
asyncio.run( main())
# 注
# get_event_loop 只会在主线程帮您创建新的 event loop
# get_running_loop 用于获取当前正在运行的loop,如果当前主线程中没有正在运行的loop,如果没有就会报RuntimeError 错误。
本来这种方式是不支持 async, 目前的实现结合了async 实际上是通过 线程池来实现的
loop.run_in_executor( None, fun)
第一个参数传None 使用默认, 在上边连接官网中有介绍
同时也可以把 asyncio.run(main()) 改成
task = [ f(blocking_io), f(cpu_bound)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task))
6.异步上下文管理器
此种对象通过定义 __aenter__()
和 __aexit__()
方法来对 async_with 语句中的环境进行控制。
import asyncio
class AsyncContentManager:
def __init__(self):
pass
async def do_something(self):
return 666
async def __aenter__(self):
# 连接数据库
return self
async def __aexit__(self, exc_type, exc, tb):
# 异步关闭数据库
await asyncio.sleep(1)
async def func():
async with AsyncContentManager() as f:
result = await f.do_something()
asyncio.run( func() )
7. uvloop
是asyncio的事件循环的替代方案 > 默认asyncio的事件循环。
pip install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 编写asyncio的代码,与之前写的代码一致
# 内部的事件循环自动化会变成uvloop
asyncio.run()
8. 实操
8.1 异步redis
在python 操作redis, 连接/操作/断开都是网络IO
pip install aioredis
示例1:
import asyncio
import aioredis
async def execute(address, passwd, test):
"""
@address: redis 连接地址
@passwd: redis 密码
"""
print("开始执行", test)
redis = await aioredis.create_redis_pool(address=address, password=passwd)
# 网络io操作
await redis.hmset_dict("car", key1=1, key2=2, key3=3)
# 网络io操作、遇到io自动切换任务
result = await redis.hgetall("car", encoding="utf-8")
print(result)
redis.close()
# 网络io操作, 遇到io会自动切换任务
await redis.wait_closed()
print("结束", test)
tasks = [
execute("redis://127.0.0.1:6379", "295213", "测试1"),
execute("redis://127.0.0.1:6379", "295213", "测试2")
]
asyncio.run(asyncio.wait(tasks))
结果
开始执行 测试2
开始执行 测试1
{'key1': '1', 'key2': '2', 'key3': '3'}
{'key1': '1', 'key2': '2', 'key3': '3'}
结束 测试2
结束 测试1
8.2 异步mysql
pip install aiomysql
示例1:
import asyncio
import aiomysql
async def execute(host, password, test):
print("开始", test)
# 建立连接
conn = await aiomysql.connect(host=host, port=3306, user="root", password=password, db="work")
# 创建指针
cur = await conn.cursor()
# 查询
await cur.execute("select * from users limit 1")
data = await cur.fetchall()
print(data)
await cur.close()
conn.close()
print("结束", test)
tasks = [
execute("127.0.0.1", "295213", "测试1"),
execute("xxx.xx.xx.xx", "295213", "测试2")
]
asyncio.run(asyncio.wait(tasks))