from functools import partial import asyncio from threadpool_executor_shrink_able import ThreadPoolExecutorShrinkAble # 没有使用内置的 concurrent.futures里面的,这个是优化4点功能的。 async_executor = ThreadPoolExecutorShrinkAble(20) async def simple_run_in_executor(f, *args, async_loop=None, **kwargs): loopx = async_loop or asyncio.get_event_loop() # print(id(loopx)) result = await loopx.run_in_executor(async_executor, partial(f, *args, **kwargs)) return result if __name__ == '__main__': import time import requests def block_fun(x): time.sleep(5) print(x) return x * 10 async def enter_fun(xx): # 入口函数,盈利为一旦异步,必须处处异步。不能直接调用block_fun,否则阻塞其他任务。 await asyncio.sleep(1)
# r = block_fun(xx) # 如果你这么写那就完了个蛋,程序运行完成需要更长的时间。 r = await simple_run_in_executor(block_fun, xx) print(r) loopy = asyncio.get_event_loop() print(id(loopy)) tasks = [] tasks.append(simple_run_in_executor(requests.get, url='http://www.baidu.com')) tasks.append(simple_run_in_executor(block_fun, 1)) tasks.append(simple_run_in_executor(block_fun, 2)) tasks.append(simple_run_in_executor(block_fun, 3)) tasks.append(enter_fun(4)) tasks.append(enter_fun(5)) print('开始') loopy.run_until_complete(asyncio.wait(tasks)) print('结束')
使用asyncio时候,一个调用链流程包括了5个 阻塞io的方法或函数,如果其中一个函数现在没有对应的异步库,或者新的对应异步库很难学,快速的方式是让同步函数变成异步调用语法,可以被await,那么按上面这么封装就行了,例如假设还没有人发明aiohttp库,世上只有requests库,你的调用链流程里面不可以直接调用requests,因为一旦这么弄,只要一个任务阻塞了,真个循环的全部任务都被阻塞了,使用asyncio一旦异步需要处处异步,那么怎么样快速的非阻塞呢,按上面就行,使同步函数在线程池里面运行,这个run_in_executor本质是使concurrent.futures.Future对象变成了 asyncio 里面的可等待 Future对象,所以可以被await。
有人会有疑问,这么不是脱了裤子放屁吗,直接在异步流程里面 使用 threadpoolexecutor.submit来运行同步阻塞函数不香吗,这不就能绕开阻塞了吗?主要还是有个概念没搞懂,因为现在不仅是要非阻塞运行同步函数,最重要的是要在当前代码处拿到同步函数的执行结果马上使用, future = threadpoolexecutor.submit(block_fun,20) 然后为了得到结果需要执行future.result(),一旦使用了future.result(),那么当前调用链又回到老问题了,那就是被阻塞。解决这个问题唯有使用
r = await simple_run_in_executor(block_fun, 20),既能在线程池运行同步函数,而且为了得到结果不至于阻塞事件循环。
还有一点是为什么不使用官方的loop.run_in_executor,而是封装了一个simple_run_in_executor东西呢,主要是因为 内置的 run_in_executor 方法,不接受关键字参数,只能接受位置参数,例如requests.get函数,入参个数高达20个,如果严格的使用位置参数,那么requests.get函数的入参顺序必须按照定义的一样非常准确一个顺序都不能乱,必须要非常精确小心,这几乎不可能做到。
run_in_executor必须严格的按照顺序传参,例如你想设置request的timeout值,必须在前面写很多个None来占位置;还有例如不能把headers写在data前面,不支持关键字方式入参,很难用。使用偏函数来解决关键字入参是官方教程推荐的方式。
参考链接 https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
只有使用我这个 simple_run_in_executor ,立即让同步函数化身异步语法,使同步库的函数和调用链上的其他异步库的函数能够协同作战。
run_in_executor 到底是协程程运行的还是线程运行的阻塞函数呢,毫无疑问还是线程里面运行的,在block_fun函数里面打印一下 threding.ident的线程号就可以了。
他的主要作用不是把同步变成协程运行,而是让其拥有了异步await 的用法,既能不阻塞当前事件循环,又能在同步函数执行完成return结果时拿到结果接着用。
此文分别拿blcok_fun 函数和 requests.get 函数来演示同步函数在异步调用链里面非阻塞运行,说的是快速同步变异步的方式,归根结底还是线程运行。但是例如有人已经发明了 aiohttp 异步请求库,那就不需要使用 run_in_executor + requests 了,最好是使用aiohttp就行了。
这个主要是为了例如 调用链路上用了10个io操作的库,其中有9个有对应的异步库,但有1个没有对应的异步库,此时不能因为现存的没有人发明这个异步库就不继续写代码罢工了吧。
2.比较asyncio.run_coroutine_threadsafe 和 run_in_executor
asyncio.run_coroutine_threadsafe 和 run_in_executor 是一对反义词。
asyncio.run_coroutine_threadsafe 是在非异步的上下文环境(也就是正常的同步语法的函数里面)下调用异步函数对象(协程),
因为当前函数定义没有被async修饰,就不能在函数里面使用await,必须使用这。这个是将asyncio包的future对象转化返回一个concurrent.futures包的future对象。
run_in_executor 是在异步环境(被async修饰的异步函数)里面,调用同步函数,将函数放到线程池运行防止阻塞整个事件循环的其他任务。
这个是将 一个concurrent.futures包的future对象 转化为 asyncio包的future对象,
asyncio包的future对象是一个asyncio包的awaitable对象,所以可以被await,concurrent.futures.Future对象不能被await。