zoukankan      html  css  js  c++  java
  • Python 进阶 异步async/await

    一,前言

      本文将会讲述Python 3.5之后出现的async/await的使用方法,我从上看到一篇不错的博客,自己对其进行了梳理。该文章原地址https://www.cnblogs.com/dhcn/p/9032461.html

    二,Python常见的函数形式

      2.1 普通函数

    def fun():
        return 1
    
    if __name__ == '__main__':
        fun()
     

      普通函数,没有什么特别的,直接函数名加括号调用即可。

      2.2 生成器函数

    def generator_fun():
        yield 1
    
    if __name__ == '__main__':
        print(generator_fun())  #<generator object generator_fun at 0x00000000005E6E08>
        print(generator_fun().send(None))  
        for i in generator_fun():
            print(i)

      与普通函数不同的是,生成器函数返回的是一个生成器对象。我们可以通过send()方法或者是for循环的方法来对其进行取值。

      2.3 (异步)协程函数

    async def async_fun():
        return 1
    
    if __name__ == '__main__':
        print(async_fun())  # <coroutine object fun at 0x0000000002156E08>
        print(async_fun().send(None))  # StopIteration: 1
        try:
            async_fun().send(None)
        except StopIteration as e:
            print(e.value)

      异步函数的调用返回的是一个协程对象,若改对象通过send()进行调用,会报一个StopIteration错,若要取值,就需要捕获该异常,e.value的形式进行获取。

      在协程函数中,可以通过await语法来挂起自身的协程,并等待另一个协程完成直到返回结果:

    async def async_function():
        return 1
    
    async def await_coroutine():
        result = await async_function()
        print(result)
        
    run(await_coroutine())
    # 1

      2.4 异步生成器

    async def async_fun():
        async for i in generator_async_fun():
            print(i)
    
    
    async def generator_async_fun():
        yield 1
    
    if __name__ == '__main__':
        async_fun().send(None)

      异步生成器的调用比较特殊,它需要依赖别的异步函数进行调用。

    三,await的使用

      await语法可以挂起自声协程,等待另一个协程完成直到返回结果。但是有一些地方需要注意:

      3.1 await注意事项

      await语法只能出现在通过async修饰的函数中,否则会报SyntaxError错误。

      而且await后面的对象需要是一个Awaitable,或者实现了相关的协议。

      3.2 关于await的源代码解析

      查看Awaitable抽象类的代码,表明了只要一个类实现了__await__方法,那么通过它构造出来的实例就是一个Awaitable:

    class Awaitable(metaclass=ABCMeta):
        __slots__ = ()
    
        @abstractmethod
        def __await__(self):
            yield
    
        @classmethod
        def __subclasshook__(cls, C):
            if cls is Awaitable:
                return _check_methods(C, "__await__")
            return NotImplemented

      而且可以看到,Coroutine类也继承了Awaitable,而且实现了send,throw和close方法。所以await一个调用异步函数返回的协程对象是合法的。

    class Coroutine(Awaitable):
        __slots__ = ()
    
        @abstractmethod
        def send(self, value):
            ...
    
        @abstractmethod
        def throw(self, typ, val=None, tb=None):
            ...
    
        def close(self):
            ...
            
        @classmethod
        def __subclasshook__(cls, C):
            if cls is Coroutine:
                return _check_methods(C, '__await__', 'send', 'throw', 'close')
            return NotImplemented

      3.3 关于异步生成器的实例

      案例:假如我要到一家超市去购买土豆,而超市货架上的土豆数量是有限的:

    class Potato:
        @classmethod
        def make(cls, num, *args, **kws):
            potatos = []
            for i in range(num):
                potatos.append(cls.__new__(cls, *args, **kws))
            return potatos
    
    all_potatos = Potato.make(5)

      现在我想要买50个土豆,每次从货架上拿走一个土豆放到篮子:

    def take_potatos(num):
        count = 0
        while True:
            if len(all_potatos) == 0:
                sleep(.1)
            else:
                potato = all_potatos.pop()
                yield potato
                count += 1
                if count == num:
                    break
    
    def buy_potatos():
        bucket = []
        for p in take_potatos(50):
            bucket.append(p)

      对应到代码中,就是迭代一个生成器的模型,显然,当货架上的土豆不够的时候,这时只能够死等,而且在上面例子中等多长时间都不会有结果(因为一切都是同步的),也许可以用多进程和多线程解决,而在现实生活中,更应该像是这样的:

    async def take_potatos(num):
        count = 0
        while True:
            if len(all_potatos) == 0:
                await ask_for_potato()
            potato = all_potatos.pop()
            yield potato
            count += 1
            if count == num:
                break

      当货架上的土豆没有了之后,我可以询问超市请求需要更多的土豆,这时候需要等待一段时间直到生产者完成生产的过程:

    async def ask_for_potato():
        await asyncio.sleep(random.random())
        all_potatos.extend(Potato.make(random.randint(1, 10)))

      当生产者完成和返回之后,这是便能从await挂起的地方继续往下跑,完成消费的过程。而这整一个过程,就是一个异步生成器迭代的流程:

    async def buy_potatos():
        bucket = []
        async for p in take_potatos(50):
            bucket.append(p)
            print(f'Got potato {id(p)}...')

      async for语法表示我们要后面迭代的是一个异步生成器。

    def main():
        import asyncio
        loop = asyncio.get_event_loop()
        res = loop.run_until_complete(buy_potatos())
        loop.close()

      用asyncio运行这段代码,结果是这样的:

    Got potato 4338641384...
    Got potato 4338641160...
    Got potato 4338614736...
    Got potato 4338614680...
    Got potato 4338614568...
    Got potato 4344861864...

      既然是异步的,在请求之后不一定要死等,而是可以做其他事情。比如除了土豆,我还想买番茄,这时只需要在事件循环中再添加一个过程:

    def main():
        import asyncio
        loop = asyncio.get_event_loop()
        res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))
        loop.close()

      再来运行这段代码:

    Got potato 4423119312...
    Got tomato 4423119368...
    Got potato 4429291024...
    Got potato 4421640768...
    Got tomato 4429331704...
    Got tomato 4429331760...
    Got tomato 4423119368...

      3.4 AsyncGenerator的定义

      它需要实现__aiter__和__anext__两个核心方法,以及asend,athrow,aclose方法。

    class AsyncGenerator(AsyncIterator):
        __slots__ = ()
    
        async def __anext__(self):
            ...
    
        @abstractmethod
        async def asend(self, value):
            ...
    
        @abstractmethod
        async def athrow(self, typ, val=None, tb=None):
            ...
    
        async def aclose(self):
            ...
    
        @classmethod
        def __subclasshook__(cls, C):
            if cls is AsyncGenerator:
                return _check_methods(C, '__aiter__', '__anext__',
                                      'asend', 'athrow', 'aclose')
            return NotImplemented

      异步生成器是在3.6之后才有的特性,同样的还有异步推导表达式,因此在上面的例子中,也可以写成这样:

    bucket = [p async for p in take_potatos(50)]

      类似的,还有await表达式:

    result = [await fun() for fun in funcs if await condition()]

    四,async的其他用法

      4.1 async修饰类普通方法

      除了函数之外,类实例的普通方法也能用async语法修饰:

    class ThreeTwoOne:
        async def begin(self):
            print(3)
            await asyncio.sleep(1)
            print(2)
            await asyncio.sleep(1)
            print(1)        
            await asyncio.sleep(1)
            return
    
    async def game():
        t = ThreeTwoOne()
        await t.begin()
        print('start')

      实例方法的调用同样是返回一个coroutine:

    function = ThreeTwoOne.begin
    method = function.__get__(ThreeTwoOne, ThreeTwoOne())
    import inspect
    assert inspect.isfunction(function)
    assert inspect.ismethod(method)
    assert inspect.iscoroutine(method())

      4.2 async 修饰类方法

    class ThreeTwoOne:
        @classmethod
        async def begin(cls):
            print(3)
            await asyncio.sleep(1)
            print(2)
            await asyncio.sleep(1)
            print(1)        
            await asyncio.sleep(1)
            return
    
    async def game():
        await ThreeTwoOne.begin()
        print('start')

      4.3 async的上下文管理器应用

      根据PEP 492中,async也可以应用到上下文管理器中,__aenter__和__aexit__需要返回一个Awaitable:

    class GameContext:
        async def __aenter__(self):
            print('game loading...')
            await asyncio.sleep(1)
    
        async def __aexit__(self, exc_type, exc, tb):
            print('game exit...')
            await asyncio.sleep(1)
    
    async def game():
        async with GameContext():
            print('game start...')
            await asyncio.sleep(2)

      在3.7版本,contextlib中会新增一个asynccontextmanager装饰器来包装一个实现异步协议的上下文管理器:

    from contextlib import asynccontextmanager
    
    @asynccontextmanager
    async def get_connection():
        conn = await acquire_db_connection()
        try:
            yield
        finally:
            await release_db_connection(conn)

    五,await和yield from

      Python3.3的yield from语法可以把生成器的操作委托给另一个生成器,生成器的调用方可以直接与子生成器进行通信:

    def sub_gen():
        yield 1
        yield 2
        yield 3
    
    def gen():
        return (yield from sub_gen())
    
    def main():
        for val in gen():
            print(val)
    # 1
    # 2
    # 3

      利用这一特性,使用yield from能够编写出类似协程效果的函数调用,在3.5之前,asyncio正是使用@asyncio.coroutine和yield from语法来创建协程:

    # https://docs.python.org/3.4/library/asyncio-task.html
    import asyncio
    
    @asyncio.coroutine
    def compute(x, y):
        print("Compute %s + %s ..." % (x, y))
        yield from asyncio.sleep(1.0)
        return x + y
    
    @asyncio.coroutine
    def print_sum(x, y):
        result = yield from compute(x, y)
        print("%s + %s = %s" % (x, y, result))
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1, 2))
    loop.close()

      然而,用yield from容易在表示协程和生成器中混淆,没有良好的语义性,所以在Python 3.5推出了更新的async/await表达式来作为协程的语法。因此类似以下的调用是等价的:

    async with lock:
        ...
        
    with (yield from lock):
        ...
    ######################
    def main():
        return (yield from coro())
    
    def main():
        return (await coro())

      那么,怎么把生成器包装为一个协程对象呢?这时候可以用到types包中的coroutine装饰器(如果使用asyncio做驱动的话,那么也可以使用asyncio的coroutine装饰器),@types.coroutine装饰器会将一个生成器函数包装为协程对象:

    import asyncio
    import types
    
    @types.coroutine
    def compute(x, y):
        print("Compute %s + %s ..." % (x, y))
        yield from asyncio.sleep(1.0)
        return x + y
    
    async def print_sum(x, y):
        result = await compute(x, y)
        print("%s + %s = %s" % (x, y, result))
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1, 2))
    loop.close()

      尽管两个函数分别使用了新旧语法,但他们都是协程对象,也分别称作native coroutine以及generator-based coroutine,因此不用担心语法问题。下面观察一个asyncio中Future的例子:

    import asyncio
    
    future = asyncio.Future()
    
    async def coro1():
        await asyncio.sleep(1)
        future.set_result('data')
    
    async def coro2():
        print(await future)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([
        coro1(), 
        coro2()
    ]))
    loop.close()

      两个协程在在事件循环中,协程coro1在执行第一句后挂起自身切到asyncio.sleep,而协程coro2一直等待future的结果,让出事件循环,计时器结束后coro1执行了第二句设置了future的值,被挂起的coro2恢复执行,打印出future的结果'data'。

      future可以被await证明了future对象是一个Awaitable,进入Future类的源码可以看到有一段代码显示了future实现了__await__协议:

    class Future:
        ...
        def __iter__(self):
            if not self.done():
                self._asyncio_future_blocking = True
                yield self  # This tells Task to wait for completion.
            assert self.done(), "yield from wasn't used with future"
            return self.result()  # May raise too.
    
        if compat.PY35:
            __await__ = __iter__ # make compatible with 'await' expression

      当执行await future这行代码时,future中的这段代码就会被执行,首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。

      当future执行set_result方法时,会触发以下的代码,设置结果,标记future已经完成:

    def set_result(self, result):
        ...
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

      最后future会调度自身的回调函数,触发Task._step()告知Task驱动future从之前挂起的点恢复执行,不难看出,future会执行下面的代码:

    class Future:
        ...
        def __iter__(self):
            ...
            assert self.done(), "yield from wasn't used with future"
            return self.result()  # May raise too.

      最终返回结果给调用方。

  • 相关阅读:
    【BZOJ 1185】 凸包+旋转卡壳
    【BZOJ 2829】 2829: 信用卡凸包 (凸包)
    【BZOJ 1045】 1045: [HAOI2008] 糖果传递
    【BZOJ 2453|bzoj 2120】 2453: 维护队列 (分块+二分)
    【BZOJ 3343 】 分块
    【BZOJ 1069】 凸包+旋转卡壳
    【NOIP 2016 总结】
    【无聊放个模板系列】洛谷 负环 模板
    【无聊放个模板系列】BZOJ 3172 (AC自动机)
    【无聊放个模板系列】HDU 3506 (四边形不等式优化DP-经典石子合并问题[环形])
  • 原文地址:https://www.cnblogs.com/tashanzhishi/p/10774515.html
Copyright © 2011-2022 走看看