zoukankan      html  css  js  c++  java
  • Python 进阶 异步async/await

    一,前言

      本文将会讲述Python 3.5之后出现的async/await的使用方法,我从上看到一篇不错的博客,自己对其进行了梳理。该文章原地址https://www.cnblogs.com/dhcn/p/9032461.html

    二,Python常见的函数形式

      2.1 普通函数

    def fun():
        return 1
    
    if __name__ == '__main__':
        fun()
     

      普通函数,没有什么特别的,直接函数名加括号调用即可。

      2.2 生成器函数

    def generator_fun():
        yield 1
    
    if __name__ == '__main__':
        print(generator_fun())  #<generator object generator_fun at 0x00000000005E6E08>
        print(generator_fun().send(None))  
        for i in generator_fun():
            print(i)

      与普通函数不同的是,生成器函数返回的是一个生成器对象。我们可以通过send()方法或者是for循环的方法来对其进行取值。

      2.3 (异步)协程函数

    async def async_fun():
        return 1
    
    if __name__ == '__main__':
        print(async_fun())  # <coroutine object fun at 0x0000000002156E08>
        print(async_fun().send(None))  # StopIteration: 1
        try:
            async_fun().send(None)
        except StopIteration as e:
            print(e.value)

      异步函数的调用返回的是一个协程对象,若改对象通过send()进行调用,会报一个StopIteration错,若要取值,就需要捕获该异常,e.value的形式进行获取。

      在协程函数中,可以通过await语法来挂起自身的协程,并等待另一个协程完成直到返回结果:

    async def async_function():
        return 1
    
    async def await_coroutine():
        result = await async_function()
        print(result)
        
    run(await_coroutine())
    # 1

      2.4 异步生成器

    async def async_fun():
        async for i in generator_async_fun():
            print(i)
    
    
    async def generator_async_fun():
        yield 1
    
    if __name__ == '__main__':
        async_fun().send(None)

      异步生成器的调用比较特殊,它需要依赖别的异步函数进行调用。

    三,await的使用

      await语法可以挂起自声协程,等待另一个协程完成直到返回结果。但是有一些地方需要注意:

      3.1 await注意事项

      await语法只能出现在通过async修饰的函数中,否则会报SyntaxError错误。

      而且await后面的对象需要是一个Awaitable,或者实现了相关的协议。

      3.2 关于await的源代码解析

      查看Awaitable抽象类的代码,表明了只要一个类实现了__await__方法,那么通过它构造出来的实例就是一个Awaitable:

    class Awaitable(metaclass=ABCMeta):
        __slots__ = ()
    
        @abstractmethod
        def __await__(self):
            yield
    
        @classmethod
        def __subclasshook__(cls, C):
            if cls is Awaitable:
                return _check_methods(C, "__await__")
            return NotImplemented

      而且可以看到,Coroutine类也继承了Awaitable,而且实现了send,throw和close方法。所以await一个调用异步函数返回的协程对象是合法的。

    class Coroutine(Awaitable):
        __slots__ = ()
    
        @abstractmethod
        def send(self, value):
            ...
    
        @abstractmethod
        def throw(self, typ, val=None, tb=None):
            ...
    
        def close(self):
            ...
            
        @classmethod
        def __subclasshook__(cls, C):
            if cls is Coroutine:
                return _check_methods(C, '__await__', 'send', 'throw', 'close')
            return NotImplemented

      3.3 关于异步生成器的实例

      案例:假如我要到一家超市去购买土豆,而超市货架上的土豆数量是有限的:

    class Potato:
        @classmethod
        def make(cls, num, *args, **kws):
            potatos = []
            for i in range(num):
                potatos.append(cls.__new__(cls, *args, **kws))
            return potatos
    
    all_potatos = Potato.make(5)

      现在我想要买50个土豆,每次从货架上拿走一个土豆放到篮子:

    def take_potatos(num):
        count = 0
        while True:
            if len(all_potatos) == 0:
                sleep(.1)
            else:
                potato = all_potatos.pop()
                yield potato
                count += 1
                if count == num:
                    break
    
    def buy_potatos():
        bucket = []
        for p in take_potatos(50):
            bucket.append(p)

      对应到代码中,就是迭代一个生成器的模型,显然,当货架上的土豆不够的时候,这时只能够死等,而且在上面例子中等多长时间都不会有结果(因为一切都是同步的),也许可以用多进程和多线程解决,而在现实生活中,更应该像是这样的:

    async def take_potatos(num):
        count = 0
        while True:
            if len(all_potatos) == 0:
                await ask_for_potato()
            potato = all_potatos.pop()
            yield potato
            count += 1
            if count == num:
                break

      当货架上的土豆没有了之后,我可以询问超市请求需要更多的土豆,这时候需要等待一段时间直到生产者完成生产的过程:

    async def ask_for_potato():
        await asyncio.sleep(random.random())
        all_potatos.extend(Potato.make(random.randint(1, 10)))

      当生产者完成和返回之后,这是便能从await挂起的地方继续往下跑,完成消费的过程。而这整一个过程,就是一个异步生成器迭代的流程:

    async def buy_potatos():
        bucket = []
        async for p in take_potatos(50):
            bucket.append(p)
            print(f'Got potato {id(p)}...')

      async for语法表示我们要后面迭代的是一个异步生成器。

    def main():
        import asyncio
        loop = asyncio.get_event_loop()
        res = loop.run_until_complete(buy_potatos())
        loop.close()

      用asyncio运行这段代码,结果是这样的:

    Got potato 4338641384...
    Got potato 4338641160...
    Got potato 4338614736...
    Got potato 4338614680...
    Got potato 4338614568...
    Got potato 4344861864...

      既然是异步的,在请求之后不一定要死等,而是可以做其他事情。比如除了土豆,我还想买番茄,这时只需要在事件循环中再添加一个过程:

    def main():
        import asyncio
        loop = asyncio.get_event_loop()
        res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))
        loop.close()

      再来运行这段代码:

    Got potato 4423119312...
    Got tomato 4423119368...
    Got potato 4429291024...
    Got potato 4421640768...
    Got tomato 4429331704...
    Got tomato 4429331760...
    Got tomato 4423119368...

      3.4 AsyncGenerator的定义

      它需要实现__aiter__和__anext__两个核心方法,以及asend,athrow,aclose方法。

    class AsyncGenerator(AsyncIterator):
        __slots__ = ()
    
        async def __anext__(self):
            ...
    
        @abstractmethod
        async def asend(self, value):
            ...
    
        @abstractmethod
        async def athrow(self, typ, val=None, tb=None):
            ...
    
        async def aclose(self):
            ...
    
        @classmethod
        def __subclasshook__(cls, C):
            if cls is AsyncGenerator:
                return _check_methods(C, '__aiter__', '__anext__',
                                      'asend', 'athrow', 'aclose')
            return NotImplemented

      异步生成器是在3.6之后才有的特性,同样的还有异步推导表达式,因此在上面的例子中,也可以写成这样:

    bucket = [p async for p in take_potatos(50)]

      类似的,还有await表达式:

    result = [await fun() for fun in funcs if await condition()]

    四,async的其他用法

      4.1 async修饰类普通方法

      除了函数之外,类实例的普通方法也能用async语法修饰:

    class ThreeTwoOne:
        async def begin(self):
            print(3)
            await asyncio.sleep(1)
            print(2)
            await asyncio.sleep(1)
            print(1)        
            await asyncio.sleep(1)
            return
    
    async def game():
        t = ThreeTwoOne()
        await t.begin()
        print('start')

      实例方法的调用同样是返回一个coroutine:

    function = ThreeTwoOne.begin
    method = function.__get__(ThreeTwoOne, ThreeTwoOne())
    import inspect
    assert inspect.isfunction(function)
    assert inspect.ismethod(method)
    assert inspect.iscoroutine(method())

      4.2 async 修饰类方法

    class ThreeTwoOne:
        @classmethod
        async def begin(cls):
            print(3)
            await asyncio.sleep(1)
            print(2)
            await asyncio.sleep(1)
            print(1)        
            await asyncio.sleep(1)
            return
    
    async def game():
        await ThreeTwoOne.begin()
        print('start')

      4.3 async的上下文管理器应用

      根据PEP 492中,async也可以应用到上下文管理器中,__aenter__和__aexit__需要返回一个Awaitable:

    class GameContext:
        async def __aenter__(self):
            print('game loading...')
            await asyncio.sleep(1)
    
        async def __aexit__(self, exc_type, exc, tb):
            print('game exit...')
            await asyncio.sleep(1)
    
    async def game():
        async with GameContext():
            print('game start...')
            await asyncio.sleep(2)

      在3.7版本,contextlib中会新增一个asynccontextmanager装饰器来包装一个实现异步协议的上下文管理器:

    from contextlib import asynccontextmanager
    
    @asynccontextmanager
    async def get_connection():
        conn = await acquire_db_connection()
        try:
            yield
        finally:
            await release_db_connection(conn)

    五,await和yield from

      Python3.3的yield from语法可以把生成器的操作委托给另一个生成器,生成器的调用方可以直接与子生成器进行通信:

    def sub_gen():
        yield 1
        yield 2
        yield 3
    
    def gen():
        return (yield from sub_gen())
    
    def main():
        for val in gen():
            print(val)
    # 1
    # 2
    # 3

      利用这一特性,使用yield from能够编写出类似协程效果的函数调用,在3.5之前,asyncio正是使用@asyncio.coroutine和yield from语法来创建协程:

    # https://docs.python.org/3.4/library/asyncio-task.html
    import asyncio
    
    @asyncio.coroutine
    def compute(x, y):
        print("Compute %s + %s ..." % (x, y))
        yield from asyncio.sleep(1.0)
        return x + y
    
    @asyncio.coroutine
    def print_sum(x, y):
        result = yield from compute(x, y)
        print("%s + %s = %s" % (x, y, result))
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1, 2))
    loop.close()

      然而,用yield from容易在表示协程和生成器中混淆,没有良好的语义性,所以在Python 3.5推出了更新的async/await表达式来作为协程的语法。因此类似以下的调用是等价的:

    async with lock:
        ...
        
    with (yield from lock):
        ...
    ######################
    def main():
        return (yield from coro())
    
    def main():
        return (await coro())

      那么,怎么把生成器包装为一个协程对象呢?这时候可以用到types包中的coroutine装饰器(如果使用asyncio做驱动的话,那么也可以使用asyncio的coroutine装饰器),@types.coroutine装饰器会将一个生成器函数包装为协程对象:

    import asyncio
    import types
    
    @types.coroutine
    def compute(x, y):
        print("Compute %s + %s ..." % (x, y))
        yield from asyncio.sleep(1.0)
        return x + y
    
    async def print_sum(x, y):
        result = await compute(x, y)
        print("%s + %s = %s" % (x, y, result))
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1, 2))
    loop.close()

      尽管两个函数分别使用了新旧语法,但他们都是协程对象,也分别称作native coroutine以及generator-based coroutine,因此不用担心语法问题。下面观察一个asyncio中Future的例子:

    import asyncio
    
    future = asyncio.Future()
    
    async def coro1():
        await asyncio.sleep(1)
        future.set_result('data')
    
    async def coro2():
        print(await future)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([
        coro1(), 
        coro2()
    ]))
    loop.close()

      两个协程在在事件循环中,协程coro1在执行第一句后挂起自身切到asyncio.sleep,而协程coro2一直等待future的结果,让出事件循环,计时器结束后coro1执行了第二句设置了future的值,被挂起的coro2恢复执行,打印出future的结果'data'。

      future可以被await证明了future对象是一个Awaitable,进入Future类的源码可以看到有一段代码显示了future实现了__await__协议:

    class Future:
        ...
        def __iter__(self):
            if not self.done():
                self._asyncio_future_blocking = True
                yield self  # This tells Task to wait for completion.
            assert self.done(), "yield from wasn't used with future"
            return self.result()  # May raise too.
    
        if compat.PY35:
            __await__ = __iter__ # make compatible with 'await' expression

      当执行await future这行代码时,future中的这段代码就会被执行,首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。

      当future执行set_result方法时,会触发以下的代码,设置结果,标记future已经完成:

    def set_result(self, result):
        ...
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

      最后future会调度自身的回调函数,触发Task._step()告知Task驱动future从之前挂起的点恢复执行,不难看出,future会执行下面的代码:

    class Future:
        ...
        def __iter__(self):
            ...
            assert self.done(), "yield from wasn't used with future"
            return self.result()  # May raise too.

      最终返回结果给调用方。

  • 相关阅读:
    什么是 bean 的自动装配?
    什么是 Spring 的内部 bean?
    什么是 Spring 的 MVC 框架?
    Spring AOP and AspectJ AOP 有什么区别?
    解释 JDBC 抽象和 DAO 模块?
    volatile 类型变量提供什么保证?
    一个 Spring Bean 定义 包含什么?
    什么是 Spring MVC 框架的控制器?
    使用 Spring 访问 Hibernate 的方法有哪些?
    什么是 Callable 和 Future?
  • 原文地址:https://www.cnblogs.com/tashanzhishi/p/10774515.html
Copyright © 2011-2022 走看看