协程语法
在Python 3.5+发布之前,asyncio模块使用生成器模拟异步调用,因此具有与当前Python 3.5版本不同的语法 ;以下代码均基于python3.7
从Python 3.5开始引入了异步async及await关键字。注意,在await func()调用时无需带上括号,先感受一下如下代码:
import asyncio
async def main():
print(await func())
async def func():
# 这里可以写耗时较长的代码
return "Hello, world!"
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
事件循环EventLoop
事件循环是asyncio的核心,异步任务的运行、任务完成之后的回调、网络IO操作、子进程的运行,都是通过事件循环完成的。在python3.7中,我们甚至完全不用管事件循环,只需要使用高层API,即asyncio中的方法,我们很少直接与事件循环打交道,但是为了更加熟悉asyncio的运行原理,最好还是了解EventLoop的设计原理。
1、事件循环的创建、获取、设置
(1)asyncio.get_running_loop()。python3.7新添加的
(2)asyncio.get_event_loop()
(3)asyncio.set_event_loop(loop)
(4)asyncio.new_event_loop()
2、运行和停止事件循环
(1)loop.run_until_complete(future)。运行事件循环,直到future运行结束
(2)loop.run_forever()。在python3.7中已经取消了,表示事件循环会一直运行,直到遇到stop。
(3)loop.stop()。停止事件循环
(4)loop.is_running()。如果事件循环依然在运行,则返回True
(5)loop.is_closed()。如果事件循环已经close,则返回True
(6)loop.close()。关闭事件循环
3、创建Future和Task
(1)loop.create_future(coroutine) ,返回future对象
(2)loop.create_task(corootine) ,返回task对象
(3)loop.set_task_factory(factory)
(4)loop.get_task_factory()
4、事件循环的时钟
loop.time()。可以这么理解,事件循环内部也维护着一个时钟,可以查看事件循环现在运行的时间点是多少,就像普通的time.time()类似,它返回的是一个浮点数值,比如下面的代码。转自:https://blog.csdn.net/qq_27825451/article/details/86292513
接下来,在python云环境上运行下面的代码,实地体现一下异步程序调用:
import asyncio
async def cor1():
print("cor1 start")
for i in range(10):
await asyncio.sleep(1.5)
print("cor1", i)
async def cor2():
print("cor2 start")
for i in range(15):
await asyncio.sleep(1)
print("cor2", i)
loop = asyncio.get_event_loop()
cors = asyncio.wait([cor1(), cor2()])
loop.run_until_complete(cors)
#
cor2 start
cor1 start
cor2 0
cor1 0
cor2 1
cor1 1
cor2 2
cor2 3
cor1 2
cor2 4
cor1 3
cor2 5
cor2 6
cor1 4
cor2 7
cor1 5
cor2 8
cor2 9
cor1 6
cor2 10
cor1 7
cor2 11
cor2 12
cor1 8
cor2 13
cor1 9
cor2 14
一开始还是通过 asyncio.get_event_loop() 得到事件循环, 之后调用了 run_until_complete( 运行 loop ,等到 future 完成,run_until_complete 即返回 ) ,而调用的参数是一个 async 修饰过的函数的返回值
异步执行
asyncio 支持在异步调用任务中使用 Executor 对象 ,将Executor对象作为参数、可调用函数及其自身参数共同作为参数去调用run_in_executor() 函数来驱动异步执行事件
Executor 调度任务:
import asyncio
from concurrent.futures import ThreadPoolExecutor
def func(a, b):
# 做耗时的事情
return 'test '+a + b
async def main(loop):
executor = ThreadPoolExecutor() #初始化线程池执行器
result = await loop.run_in_executor(executor, func, "Hello", " world!")
print(result)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
# 输出: test Hello world!
每个事件循环还有一个“默认”执行程序槽,可以分配给执行程序。要分配执行程序并从循环中调度任务,可以使用set default Executor()方法
import asyncio
from concurrent.futures import ThreadPoolExecutor
def func(a, b):
# 做耗时的事情
return 'test,'+a + b
async def main(loop):
# 注意:使用“None”作为第一个参数指定“默认”执行器 .
result = await loop.run_in_executor(None, func, "Hello,", " world!")
print(result)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor())
loop.run_until_complete(main(loop))
#输出 test,Hello, world!
parallel.futures中有两种主要的Executor类型,即ThreadPoolExecutor和
ProcessPoolExecutor。 ThreadPoolExecutor包含一个线程池,可以手动将其设置为通过构造函数的特定线程数,或者默认为计算机上的内核数乘以5。
ThreadPoolExecutor使用线程池执行分配给它的任务,并且通常在CPU绑定方面更好操作,而不用受I / O约束。与此形成对比的是ProcessPoolExecutor:
ProcessPoolExecutor只能接受可修改的任务和参数。最常见的非picklable任务的对象的方法。如果必须将对象的方法调度为执行程序中的任务,则必须使用ThreadPoolExecutor
UVLoop
uvloop是异步的一种实现。基于libuv的AbstractEventLoop(由nodejs使用)。它兼容99%的异步特性,并且比传统的asyncio. eventloop快得多。uvloop目前在Windows上不可用,使用前先在python云环境上用命令 python -m pip install uvloop安装它
import asyncio
import uvloop
if __name__ == "__main__":
asyncio.set_event_loop(uvloop.new_event_loop())
# 你的业务代码...
也可以通过 EventLoopPolicy 在uvloop中设置事件循环工厂 :
import asyncio
import uvloop
if __name__ == "__main__":
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.new_event_loop()
Event事件异步驱动模型
概念
可以使用event事件同步多个协程的调度
简单地说,就像一场赛跑中,一场令下枪声响起,所有赛跑者根据枪声离开起跑线,开始并行的工作
举例:
import asyncio
import functools
# 事件触发函数
def trigger(event):
print('EVENT SET')
event.set()
# 唤醒协程等待
# 事件消费者
async def consumer_a(event):
consumer_name = 'Consumer A'
print('{} waiting'.format(consumer_name))
await event.wait()
print('{} triggered'.format(consumer_name))
async def consumer_b(event):
consumer_name = 'Consumer B'
print('{} waiting'.format(consumer_name))
await event.wait()
print('{} triggered'.format(consumer_name))
# 事件
event = asyncio.Event()
# 将 coroutines 放入 future
main_future = asyncio.wait([consumer_a(event),
consumer_b(event)])
# 事件循环
event_loop = asyncio.get_event_loop()
event_loop.call_later(3, functools.partial(trigger, event)) #使用偏函数封装(详见函数式编程https://blog.csdn.net/oSuiYing12/article/details/106211761)
# 3秒后触发事件
# 完成主future
done, pending = event_loop.run_until_complete(main_future)
#输出:
Consumer B waiting
Consumer A waiting
#等待3秒后继续输出
EVENT SET
Consumer B triggered
Consumer A triggered
实现简单的Websocket
在这里,我们使用asyncio创建一个简单的 websocket。我们定义了协程以连接到服务器,并发送/接收消息。网络套接字的通信在主协程中运行,该协程由
事件循环event loop驱动:
import asyncio
import aiohttp # 需要先在云环境中安装 python -m pip install aiohttp
session = aiohttp.ClientSession()
# 处理上下文管理器
class EchoWebsocket:
async def connect(self):
self.websocket = await session.ws_connect("wss://echo.websocket.org") #建立连接的websocket服务器地址,由于在国外可能访问不到
async def send(self, message):
self.websocket.send_str(message) #发送消息函数
async def receive(self):
result = (await self.websocket.receive()) #接收消息函数
return result.data
async def main():
echo = EchoWebsocket()
await echo.connect()
await echo.send("Hello World!")
print(await echo.receive()) # 正常情况下会返回打印出"Hello World!"
if __name__ == '__main__':
# The main loop
loop = asyncio.get_event_loop()
loop.run_until_complete(main())