zoukankan      html  css  js  c++  java
  • asyncio之异步上下文管理器

    异步上下文管理器

    前面文章我们提到了上下文管理器,但是这个上下文管理器只适用于同步代码,不能用于异步代码(async def形式),不过不用担心今天我们就来讨论在异步中如何使用上下文管理器。
    特别提醒本教程所使用的Python版本为Python3.7。

    async with

    异步上下文管理器。类似于同步上下文管理器,我们知道使用with可以实现一个上下文管理的器,而对于异步上下文管理器其根本表现形式为async with,下面的一段代码告诉你async with是如何运作的。

    import asyncio
    class AContext:
        def __init__(self):
            print("in init")
    
        async def __aenter__(self):
            print("in aenter")
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            print("in aexit")
    
    async def main():
        async with AContext() as ac:
            print("in with", ac)
    
    if __name__ == '__main__':
        print("start")
        asyncio.run(main())
    

    输出内容

    start
    in init
    in aenter
    in with None
    in aexit
    

    下面说下async with和with的不同地方
    语法上,with实现了enter和exit两个方法,async with实现了类似方法
    aenter和aexit在同步的基础上加个a,实际上就是代表asynchronous。
    实现上,使用普通的函数就可以实现with,但是async with需要通过异步函数的形式去实现,就像上面的例子一样。

    asynccontextmanager

    从Python 3.7开始,有两种方法可以编写异步上下文管理器。一种就是前面提到的魔法函数的实现,另外一种就是contextlib的另外一个模块asynccontextmanager。通过装饰器的方式实现一个异步上下文管理器

    import asyncio
    from contextlib import asynccontextmanager
    from concurrent.futures.thread import ThreadPoolExecutor
    
    
    class AsyncFile(object):
        def __init__(self, file, loop=None, executor=None):
            if not loop:
                loop = asyncio.get_running_loop()  # 获取当前运行事件循环
            if not executor:
                executor = ThreadPoolExecutor(10)  # 线程池数量10
            self.file = file
            self.loop = loop
            self.executor = executor
            self.pending = []
            self.result = []
    
        def write(self, string):
            """
            实现异步写操作
            :param string: 要写的内容
            :return:
            """
            self.pending.append(
                self.loop.run_in_executor(
                    self.executor, self.file.write, string,
                )
            )
    
        def read(self, i):
            """
            实现异步读操作
            :param i:
            :return:
            """
            self.pending.append(
                self.loop.run_in_executor(self.executor, self.file.read, i,)
            )
    
        def readlines(self):
            self.pending.append(
                self.loop.run_in_executor(self.executor, self.file.readlines, )
            )
    
    @asynccontextmanager
    async def async_open(path, mode="w"):
        with open(path, mode=mode) as f:
            loop = asyncio.get_running_loop()
            file = AsyncFile(f, loop=loop)
            try:
                yield file
            finally:
                file.result = await asyncio.gather(*file.pending, loop=loop)
    

    上面的代码通过使用asyncio中run_in_executor运行一个线程,来使得阻塞操作变为非阻塞操作,达到异步非阻塞的目的。
    AsyncFile类提供了一些方法,这些方法将用于将write、read和readlines的调用添加到pending列表中。这些任务通过finally块中的事件循环在ThreadPoolExecutor进行调度。
    yield 前半段用来表示_aenter_()
    yield 后半段用来表示_aexit_()
    使用finally以后可以保证链接资源等使用完之后能够关闭。

    运行异步上下文管理器

    如果调用前面示例中的异步上下文管理器,则需要使用关键字async with来进行调用。另外带有async with的语句只能在异步函数中使用。

    from asynciodemo.asyncwith import async_open
    import asyncio
    import tempfile
    import os
    
    
    async def main():
        tempdir = tempfile.gettempdir()
        path = os.path.join(tempdir, "run.txt")
        print(f"临时文件目录:{path}")
    
        async with async_open(path, mode='w') as f:
            f.write("公众号: ")
            f.write("Python")
            f.write("学习")
            f.write("开发")
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    

    使用方法和with类似可以通过使用as,然后使用其句柄,唯一需要注意的就是要在异步函数中使用。

    同步任务

    在之前的一些异步教程里和大家说了关于协程中的几个同步方法,asyncio.wait和asyncio.gather,这里我们可以配合这些方法通过异步上下文管理器来实现同步任务,请看如下代码

    import asyncio
    
    
    # 同步挂起协程
    
    class Sync():
        def __init__(self):
            self.pending = []
            self.finished = None
    
        def schedule_coro(self, coro, shield=True):
           #如果去掉asyncio.shield,在取消fut函数的时候,就会导致coro协程也出错。
            fut = asyncio.shield(coro) if shield else asyncio.ensure_future(coro)
            self.pending.append(fut)
            return fut
    
        async def __aenter__(self):
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            # 退出async with的时候,任务列表中的任务进行并发操作。
            self.finished = await asyncio.gather(*self.pending, return_exceptions=True)
    
    
    async def workload1():
        await asyncio.sleep(2)
        print("These coroutines will be executed return 41")
        return 41
    
    
    async def workload2():
        await asyncio.sleep(2)
        print("These coroutines will be executed return 42")
        return 42
    
    
    async def workload3():
        await asyncio.sleep(2)
        print("These coroutines will be executed return 43")
        return 43
    
    
    async def main():
        async with Sync() as sync:
            # 使用异步上下文可以创建同步协程程序
            sync.schedule_coro(workload1())
            sync.schedule_coro(workload2())
            sync.schedule_coro(workload3())
        print("All scheduled corotines have retuned or throw:", sync.finished)
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    
    

    输出

    These coroutines will be executed return 41
    These coroutines will be executed return 42
    These coroutines will be executed return 43
    All scheduled corotines have retuned or throw: [41, 42, 43]
    

    1.程序是同步的形式并发输出的。
    2. schedule_coro的作用是将协程workload1,workload2,workload3添加到任务列表pending,退出async with的时候,任务列表中的任务进行并发操作。

  • 相关阅读:
    用ASP+DLL实现WEB方式修改服务器时间
    参加了 湖南.NET俱乐部成立大会
    Asp.Net中文本换行
    一直在思考的问题
    GRIDVIEW排序 动态实现和静态实现
    在VS 2005中使用TREEVIEW控件
    GRIDVIEW 中当数据行数未满时,填充空白行
    为了自己的心身健康 合理安排生活 特做了张时间安排表
    在VS 2005后台代码中创建用户控件
    CSS IE7 IE6 Firefox多浏览器兼容(转&摘)
  • 原文地址:https://www.cnblogs.com/c-x-a/p/11198901.html
Copyright © 2011-2022 走看看