介绍几种Python异步执行的方式
参考:
通过 threading.Thread 实现
先将需要异步执行的函数用线程的方式包装为一个装饰器,然后拿去装饰需要异步执行的函数即可。
下面构造两个函数
from threading import Thread
from time import sleep
import numpy as np
import pandas as pd
def async_call(func):
def wrapper(*args, **kwargs):
thr = Thread(target=func, args=args, kwargs=kwargs)
thr.start()
return wrapper
@async_call
def A(x1, x2, i):
data = pd.DataFrame(np.random.randn(x1, x2))
sleep(2)
data.to_csv('data_{}.csv'.format(i))
print ('data_{} save done.'.format(i))
def B(i):
print ('B func ', i)
if __name__ == "__main__":
for i in range(10):
A(1000, 1000, i)
B(i)
A 函数用 async_call 进行装饰,根据输入的形状创建数组并保存。
B 函数不进行装饰,打印输入的值
起一个循环,连续顺序执行10次A和B。
可以看到程序首先吊起了10个线程去执行A函数,不必等待A函数执行完毕,先输出B函数的打印信息“B func i”,随后这10个线程的命令分别执行完毕,并打印出相关信息“data_i save done.”。在当前目录保存了10个文件 "data_1.csv", "data_2.csv"... 。
通过 yield 实现协程
yield 可以让程序暂停运行,等待主程序发送数据,下次继续再yield处暂停。下面看一个例子通过yield实现协程。
使用yeild实现的协程时需要先用next激活,才能使用send发送数据。
next时会产生yield右边的数据,也就是name。
send时接收值的是yield左边的数据,也就是x。
协程结束时会抛出StopIteration。
def coroutine_example(name):
print ('start croutine name:{}'.format(name))
x = yield name
print ('send value:{}'.format(x))
if __name__ == "__main__":
coro = coroutine_example('hi')
print (next(crou))
print (coro.send(1))
输出结果:
start croutine name:hi
hi
send value:1
Exception has occurred: StopIteration
File "C:UsersAdministratorDesktop estasync_test.py", line 43, in <module>
print (crou.send(1))
yield from 说明
yield from 和for 循环类似,yield from x 内部先调用iter(x),然后调用next()获取x中的value。此处x为任意可迭代对象。
下面看代码:
def for_test():
for i in range(3):
yield i
def yield_yield_test():
yield from range(3)
输出结果:
[0, 1, 2]
[0, 1, 2]
异步io (asyncio)
异步IO的asyncio库使用时间循环驱动的协程实现并发。用户可自主控制程序,在认为耗时处添加 yield from。在 asyncio 中,协程使用@asyncio.coroutine 来装饰,使用 yield from 来驱动。在Python 3.5版本做了如下更改:
- @asyncio.coroutine --> async def
- yield from --> await
asyncio 中的几个概念:
- 事件循环
管理所有的事件,在整个程序运行过程中不断循环执行并追踪事件发生的顺序将它们放在队列中,空闲时调用相应的事件处理者来处理这些事件。
- Fucture
Future对象表示尚未完成的计算,还未完成的结果
- Task
是Future的子类,作用是在运行某个任务的同时可以并发的运行多个任务。
asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数创建:
-
asyncio.async()
-
loop.create_task() 或 asyncio.ensure_future()
最简单的异步IO示例
- run_until_complete():
阻塞调用,直到协程运行结束才返回。 参数是future,传入协程对象时内部会自动变为future。
- asyncio.sleep():
模拟IO操作,这样的休眠不会阻塞事件循环,前面加上await后会把控制权交给主事件循环,在休眠(IO操作)结束后恢复这个协程。
注意: 若在协程中需要有延时操作,应该使用 await asyncio.sleep(),而不是使用time.sleep(),因为使用time.sleep()后会释放GIL,阻塞整个主线程,从而阻塞整个事件循环。
import asyncio
async def coroutine_example():
await asyncio.sleep(1)
print ('Fosen')
if __name__ == "__main__":
coro = coroutine_example()
loop = asyncio.get_event_loop()
loop.run_until_complete(coro)
loop.close()
输出:暂停一秒后,打印“Fosen”。
创建 Task
- loop.create_task():
接收一个协程,返回一个asyncio.Task的实例,也是asyncio.Future的实例,毕竟Task是Future的子类。返回值可直接传入run_until_complete()
返回的Task对象可以看到协程的运行情况, 可以通过task.result获取task协程的返回值,当协程未完成时,会出InvalidStateError。
import asyncio
async def coroutine_example():
await asyncio.sleep(1)
print ('Fosen')
if __name__ == "__main__":
coro = coroutine_example()
loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print ('运行情况:', task)
try:
print ('返回值:', task.result())
except asyncio.InvalidStateError:
print ('task 状态未完成,捕获 InvalidStateError')
loop.run_until_complete(task)
print ('再看下运行情况:', task)
print ('返回值:', task.result())
loop.close()
输出:
运行情况: <Task pending coro=<coroutine_example() running at c:UsersAdministratorDesktop estasync_test.py:57>>
task 状态未完成,捕获 InvalidStateError
Fosen
再看下运行情况: <Task finished coro=<coroutine_example() done, defined at c:UsersAdministratorDesktop estasync_test.py:57> result=None>
返回值: None
多任务控制并获取返回值
- asyncio.wait()
asyncio.wait()是一个协程,不会阻塞,立即返回,返回的是协程对象。传入的参数是future或协程构成的可迭代对象。最后将返回值传给run_until_complete()加入事件循环。
下面看代码示例:
import asyncio
async def coroutine_example(name):
print ('正在执行:', name)
await asyncio.sleep(2)
print ('执行完毕:', name)
return '返回值:' + name
if __name__ == "__main__":
loop = asyncio.get_event_loop()
tasks = [loop.create_task(coroutine_example('Fosen_{}'.format(i))) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)
for task in tasks:
print (task.result())
loop.close()
运行输出:
正在执行: Fosen_0
正在执行: Fosen_1
正在执行: Fosen_2
执行完毕: Fosen_0
执行完毕: Fosen_1
执行完毕: Fosen_2
返回值:Fosen_0
返回值:Fosen_1
返回值:Fosen_2
动态添加协程--同步方式
创建一个线程,使事件循环在该线程中永久运行,通过 new_loop.call_soon_threadsafe 来添加协程任务。跟直接以线程的方式封装一个异步装饰器的方法有点类似。 见代码:
import asyncio
from threading import Thread
def start_thread_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
def thread_example(name):
print ('正在执行:', name)
return '返回结果:' + name
if __name__ == "__main__":
new_loop = asyncio.new_event_loop()
t = Thread(target=start_thread_loop, args=(new_loop,))
t.start()
handle = new_loop.call_soon_threadsafe(thread_example, '1')
handle.cancel()
new_loop.call_soon_threadsafe(thread_example, '2')
print ('主线程不阻塞')
new_loop.call_soon_threadsafe(thread_example, '3')
print ('继续运行中...')
运行结果:
正在执行: 2
主线程不阻塞
继续运行中...
正在执行: 3
动态添加协程--异步方式
同样创建一个线程来永久运行事件循环。不同的是 thread_example为一个协程函数,通过 asyncio.run_coroutine_threadsafe 来添加协程任务。
t.setDaemon(True) 表示把子线程设为守护线程,防止主线程已经退出了子线程还没退出。
import asyncio
from threading import Thread
def start_thread_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def thread_example(name):
print ('正在执行:', name)
await asyncio.sleep(1)
return '返回结果:' + name
if __name__ == "__main__":
new_loop = asyncio.new_event_loop()
t = Thread(target=start_thread_loop, args=(new_loop,))
t.setDaemon(True)
t.start()
future = asyncio.run_coroutine_threadsafe(thread_example('1'), new_loop)
print (future.result())
asyncio.run_coroutine_threadsafe(thread_example('2'), new_loop)
print ('主线程不阻塞')
asyncio.run_coroutine_threadsafe(thread_example('3'), new_loop)
print ('继续运行中...')
运行结果
正在执行: 1
返回结果:1
主线程不阻塞
正在执行: 2
继续运行中...
正在执行: 3
协程中生产-消费模型设计
结合上面的动态异步添加协程的思想,我们设计两个生产-消费模型,分别基于Python内置队列和Redis队列。
基于Python 内置双向队列的生产-消费模型
import asyncio
from threading import Thread
from collections import deque
import random
import time
def start_thread_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
def consumer():
while True:
if dp:
msg = dp.pop()
if msg:
asyncio.run_coroutine_threadsafe(thread_example('Fosen_{}'.format(msg)), new_loop)
async def thread_example(name):
print ('正在执行:', name)
await asyncio.sleep(2)
return '返回结果:' + name
if __name__ == "__main__":
dp = deque()
new_loop = asyncio.new_event_loop()
loop_thread = Thread(target=start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()
consumer_thread = Thread(target=consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()
while True:
i = random.randint(1, 10)
dp.appendleft(str(i))
time.sleep(2)
运行输出:
正在执行: Fosen_6
正在执行: Fosen_2
正在执行: Fosen_8
正在执行: Fosen_2
正在执行: Fosen_1
正在执行: Fosen_3
正在执行: Fosen_1
基于 Redis 队列的生产-消费模型
这种写法与基于python队列的相似,只是操作队列、获取数据的方式不同而已。
import asyncio
from threading import Thread
import redis
# 生产者代码
def producer():
for i in range(4):
redis_conn.lpush('Fosen', str(i))
# 消费者代码
def get_redis():
conn_pool = redis.ConnectionPool(host='127.0.0.1', port=6379)
return redis.Redis(connection_pool=conn_pool)
def start_thread_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def thread_example(name):
print ('正在执行:', name)
await asyncio.sleep(2)
return '返回结果:' + name
if __name__ == "__main__":
redis_conn = get_redis()
producer()
new_loop = asyncio.new_event_loop()
loop_thread = Thread(target=start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()
while True:
msg = redis_conn.rpop('Fosen')
if msg:
asyncio.run_coroutine_threadsafe(thread_example('Fosen_{}'.format(msg)), new_loop)
运行结果:
正在执行: Fosen_b'0'
正在执行: Fosen_b'1'
正在执行: Fosen_b'2'
正在执行: Fosen_b'3'