zoukankan      html  css  js  c++  java
  • 【异步】:asyncio

    异步asyncio

    asyncio是一个使用async / await语法编写并发代码的库

    asyncio用作多个Python异步框架的基础,这些框架提供高性能的网络和Web服务器,数据库连接库,分布式任务队列等。

    asyncio通常非常适合IO绑定和高级 结构化网络代码。

    asyncio提供了一组高级 API:

    此外,还有一些用于库和框架开发人员的低级 API 

    Conroutines

    使用async / await语法声明的协同程序是编写asyncio应用程序的首选方法。例如,以下代码片段(需要Python 3.7+)打印“hello”,等待1秒,然后打印“world”:

    import asyncio
    
    async def main():
         print('hello')
         await asyncio.sleep(1)
         print('world')
    
    asyncio.run(main())
    
    # hello
    # world

    上面代码等同于下面(不推荐使用基于生成器的协同程序的支持,并计划在Python 3.10中删除。

    import asyncio
    
    @asyncio.coroutine
    def main():
         print('hello')
         yield  from asyncio.sleep(1)
         print('world')
    
    asyncio.run(main())

    asyncio实际等同于下面的工作(参数为An asyncio.Future, a coroutine or an awaitable is required)

    import asyncio
    
    @asyncio.coroutine
    def main():
         print('hello')
         yield  from asyncio.sleep(1)
         print('world')
    
    # asyncio.run(main())
    loop = asyncio.events.new_event_loop()
    asyncio.events.set_event_loop(loop)
    loop.run_until_complete(main())
    
    # hello
    # world
    
     1     This function runs the passed coroutine, taking care of
     2     managing the asyncio event loop and finalizing asynchronous
     3     generators.
     4 
     5     This function cannot be called when another asyncio event loop is
     6     running in the same thread.
     7 
     8     If debug is True, the event loop will be run in debug mode.
     9 
    10     This function always creates a new event loop and closes it at the end.
    11     It should be used as a main entry point for asyncio programs, and should
    12     ideally only be called once.
    asyncio.run功能介绍

    实际运行协程asyncio提供了三种主要机制:

    1、The asyncio.run()函数来运行顶层入口点“main()”函数(见上面的例子)

    2、Awaiting on a coroutine 以下代码片段将在等待1秒后打印“hello”,然后在等待另外 2秒后打印“world” 

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    
    # started at 11:54:48
    # hello
    # world
    # finished at 11:54:51
    

    3、asyncio.create_task()与asyncio同时运行协同程序功能Tasks让我们修改上面的例子并同时运行两个say_after协同程序 :

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(f"{what} at {time.strftime('%X')}")
    
    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
        task2 = asyncio.create_task(
            say_after(2, 'world'))
        print(f"started at {time.strftime('%X')}")
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
    
        await task1
        await task2
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    
    # started at 14:27:22
    # hello at 14:27:23
    # world at 14:27:24
    # finished at 14:27:24

     稍微改变一下形式,可以理解的更多

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(f"{what} at {time.strftime('%X')}")
    
    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
        task2 = asyncio.create_task(
            say_after(2, 'world'))
        print(f"started at {time.strftime('%X')}")
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await asyncio.sleep(3)
    
        # await task1
        # await task2
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    
    # started at 14:29:41
    # hello at 14:29:42
    # world at 14:29:43
    # finished at 14:29:44
    

      

     Awaitables

    我们说如果一个对象可以在表达式中使用,那么它就是一个等待对象await许多asyncio API旨在接受等待。

    有三种主要类型的等待对象:coroutinesTasks, and Futures.

    Coroutines

    Python coroutines are awaitables and therefore can be awaited from other coroutines:

    import asyncio
    
    async def nested():
        return 42
    
    async def main():
        # Nothing happens if we just call "nested()".
        # A coroutine object is created but not awaited,
        # so it *won't run at all*.
        nested()
    
        # Let's do it differently now and await it:
        print(await nested())  # will print "42".
    
    asyncio.run(main())
    
    # 42
    

    重要

    在本文档中,术语“coroutine”可用于两个密切相关的概念:

    • 一个协程功能:一个功能;async def
    • 一个协程对象:通过调用协同程序函数返回的对象 

    Tasks

    任务用于调度协同程序并发。

    当一个协程被包装到一个Task中时,会像asyncio.create_task()一样  conroutine自动安排很快运行:

    import asyncio
    
    async def nested():
        return 42
    
    async def main():
        # Schedule nested() to run soon concurrently
        # with "main()".
        task = asyncio.create_task(nested())
    
        # "task" can now be used to cancel "nested()", or
        # can simply be awaited to wait until it is complete:
        await task
    
    asyncio.run(main())

    Futures

    Future是一个特殊的低级别等待对象,它表示异步操作最终结果

    等待 Future对象时,它意味着协程将等到Future在其他地方解析。

    需要asyncio中的未来对象以允许基于回调的代码与async / await一起使用。

    通常,不需要在应用程序级代码中创建Future对象。

    可以等待有时通过库和一些asyncio API公开的未来对象:

    async def main():
        await function_that_returns_a_future_object()
    
        # this is also valid:
        await asyncio.gather(
            function_that_returns_a_future_object(),
            some_python_coroutine()
        )
    

    返回Future对象的低级函数的一个很好的例子是loop.run_in_executor()

    Asyncio方法

    1、运行asyncio程序 

     asyncio.runcoro*debug = False 

      此函数运行传递的协同程序,负责管理asyncio事件循环并最终确定异步生成器

      当另一个asyncio事件循环在同一个线程中运行时,无法调用此函数。

      如果debugTrue,则事件循环将以调试模式运行。

      此函数始终创建一个新的事件循环并在结束时将其关闭。它应该用作asyncio程序的主要入口点,理想情况下应该只调用一次。

      版本3.7中的新功能重要:此功能已临时添加到Python 3.7中的asyncio中

    2、创建任务 

    asyncio.create_task(coro)

      将coro coroutine包装成a Task 并安排执行。返回Task对象。

      任务在返回的循环中执行,如果当前线程中没有运行循环get_running_loop(), RuntimeError则引发该任务

      Python 3.7中添加了此功能在Python 3.7之前,asyncio.ensure_future()可以使用低级函数:

    async def coro():
        ...
    
    # In Python 3.7+
    task = asyncio.create_task(coro())
    ...
    
    # This works in all Python versions but is less readable
    task = asyncio.ensure_future(coro())
    

     

    3、sleeping 

    coroutine asyncio.sleep(delayresult=None*loop=None)

      阻止 delay seconds.

      如果提供了result ,则在协程完成时将其返回给调用者。

      leep() 始终挂起当前任务,允许其他任务运行。

      该loop 参数已被弃用,并定于去除在Python 3.10。

      协程示例每秒显示当前日期5秒:

    import asyncio
    import datetime
    
    async def display_date():
        loop = asyncio.get_running_loop()
        end_time = loop.time() + 5.0
        while True:
            print(datetime.datetime.now())
            if (loop.time() + 1.0) >= end_time:
                break
            await asyncio.sleep(1)
    
    asyncio.run(display_date())
    

      

    4、同时运行任务

    awaitable asyncio.gather(*awsloop=Nonereturn_exceptions=False)

      同时aws 序列中运行 awaitable objects

      如果在aws中任何awaitable 是协程,它将自动安排为任务

      如果所有等待成功完成,则结果是返回值的汇总列表。结果值的顺序对应于aws中的等待顺序

      如果return_exceptionsFalse(默认),则第一个引发的异常会立即传播到等待的任务gather()

      如果return_exceptionsTrue,异常的处理方式一样成功的结果,并在结果列表汇总。

      如果gather()取消,所有提交的awaitables(尚未完成)也被取消

      如果aws序列中的Task or Future取消,则将其视为已引发CancelledError在这种情况下不会取消gather() 呼叫这是为了防止取消一个提交的Tasks/Futures 以导致其他任务/期货被取消。

    import asyncio
    
    async def factorial(name, number):
        f = 1
        for i in range(2, number + 1):
            print(f"Task {name}: Compute factorial({i})...")
            await asyncio.sleep(1)
            f *= i
        print(f"Task {name}: factorial({number}) = {f}")
    
    async def main():
        # Schedule three calls *concurrently*:
        await asyncio.gather(
            factorial("A", 2),
            factorial("B", 3),
            factorial("C", 4),
        )
    
    asyncio.run(main())
    
    # Expected output:
    #
    #     Task A: Compute factorial(2)...
    #     Task B: Compute factorial(2)...
    #     Task C: Compute factorial(2)...
    #     Task A: factorial(2) = 2
    #     Task B: Compute factorial(3)...
    #     Task C: Compute factorial(3)...
    #     Task B: factorial(3) = 6
    #     Task C: Compute factorial(4)...
    #     Task C: factorial(4) = 24
    

     获取返回结果,异常情况

    import asyncio
    
    async def factorial(name, number):
    
        print(name)
        if name == 'A':
            return name
        elif name == 'B':
            raise SyntaxError(name)
        await asyncio.sleep(number)
    
    
    async def main():
        # Schedule three calls *concurrently*:
        result = await asyncio.gather(
            factorial("A", 2),
            factorial("B", 3),
            factorial("C", 4),
            return_exceptions=True
        )
        print(result)
    
    asyncio.run(main())
    
    # A
    # B
    # C
    # ['A', SyntaxError('B'), None]
    

    版本3.7中已更改:如果取消聚集本身,则无论return_exceptions如何,都会传播取消

    5、Shielding From Cancellation

    awaitable asyncio.shield(aw*loop=None)

      Protect an awaitable object from being cancelled.

      If aw is a coroutine it is automatically scheduled as a Task.

      The statement:

    res = await shield(something())
    

      等同于

    res = await something()

      除非取消包含它的协程,否则something()不会取消运行的任务从观点来看something(),取消没有发生。虽然它的来电者仍然被取消,所以“等待”表达仍然提出了一个CancelledError

      如果something()通过其他方式取消(即从内部取消)也会取消shield()

      如果希望完全忽略取消(不推荐),则该shield()函数应与try / except子句结合使用,如下所示:

    try:
        res = await shield(something())
    except CancelledError:
        res = None
    

    6、超时

    coroutine asyncio.wait_for(aw, timeout, *, loop=None)

      Wait for the aw awaitable to complete with a timeout.

      If aw is a coroutine it is automatically scheduled as a Task.

      timeout可以是None或等待的float或int秒数。如果超时None,将等到完成

      如果发生超时,它将取消任务并加注 asyncio.TimeoutError

      要避免该任务cancellation,请将其包装shield()

      该函数将一直等到将来实际取消,因此总等待时间可能会超过超时

      如果等待被取消,则未来的aw也会被取消。

      该循环参数已被弃用,并定于去除在Python 3.10。

    async def eternity():
        # Sleep for one hour
        await asyncio.sleep(3600)
        print('yay!')
    
    async def main():
        # Wait for at most 1 second
        try:
            await asyncio.wait_for(eternity(), timeout=1.0)
        except asyncio.TimeoutError:
            print('timeout!')
    
    asyncio.run(main())
    
    # Expected output:
    #
    #     timeout!
    

    改变在3.7版本:AW被取消,由于超时,wait_for等待AW被取消。以前,它asyncio.TimeoutError立即提出 

    7、超时原语

    coroutine asyncio.wait(aws*loop=Nonetimeout=Nonereturn_when=ALL_COMPLETED)

      同时运行aws中的等待对象 并阻塞,直到return_when指定的条件为止

      如果在aws中任何等待的是协程,它将自动安排为任务。wait()直接传递协同程序对象 已被弃用,因为它会导致 混乱的行为

      返回两组任务/期货:(done, pending)

      用法:

    done, pending = await asyncio.wait(aws)

      该循环参数已被弃用,并定于去除在Python 3.10。

      timeout(浮点数或整数),如果指定,可用于控制返回前等待的最大秒数。

      请注意,此功能不会引发asyncio.TimeoutError超时发生时未完成的期货或任务仅在第二组中返回。

      return_when表示此函数何时返回。它必须是以下常量之一:

    不变描述
    FIRST_COMPLETED 当任何未来完成或取消时,该函数将返回。
    FIRST_EXCEPTION 当任何未来通过引发异常完成时,函数将返回。如果没有未来引发异常则等同于 ALL_COMPLETED
    ALL_COMPLETED 所有期货结束或取消时,该功能将返回。

      不像wait_for()wait()当发生超时不会取消期货。   

      注意 wait()将协同程序自动调度为任务,然后在 集合中返回隐式创建的任务对象。因此,以下代码将无法按预期方式工作:(done, pending)

    async def foo():
        return 42
    
    coro = foo()
    done, pending = await asyncio.wait({coro})
    
    if coro in done:
        # This branch will never be run!
    

      以下是如何修复上述代码段:

    async def foo():
        return 42
    
    task = asyncio.create_task(foo())
    done, pending = await asyncio.wait({task})
    
    if task in done:
        # Everything will work as expected now.
    

      wait()不推荐直接传递协程对象。

    8、 Scheduling From Other Threads

    asyncio.run_coroutine_threadsafe(coroloop)

      将协程提交给给定的事件循环。线程安全的。

      返回a concurrent.futures.Future以等待另一个OS线程的结果。

      此函数旨在从与运行事件循环的OS线程不同的OS线程调用。例:

    # Create a coroutine
    coro = asyncio.sleep(1, result=3)
    
    # Submit the coroutine to a given loop
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    
    # Wait for the result with an optional timeout argument
    assert future.result(timeout) == 3
    

      如果在协程中引发异常,则会通知返回的Future。它还可以用于取消事件循环中的任务:

    try:
        result = future.result(timeout)
    except asyncio.TimeoutError:
        print('The coroutine took too long, cancelling the task...')
        future.cancel()
    except Exception as exc:
        print(f'The coroutine raised an exception: {exc!r}')
    else:
        print(f'The coroutine returned: {result!r}')

      请参阅 文档并发和多线程部分。

      与其他asyncio函数不同,此函数需要 显式传递循环参数。

      3.5.1版中的新功能。

    9、自省

    asyncio.current_taskloop = None 

      返回当前正在运行的Task实例,或者None没有正在运行的任务。

      如果loopNone get_running_loop()用来获取loop。

      版本3.7中的新功能。

    asyncio.all_tasksloop = None 

      返回Task循环运行的一组尚未完成的对象。

      如果loopNoneget_running_loop()则用于获取当前循环。

      版本3.7中的新功能。

    任务对象

    class asyncio.Task(coro,*,loop = None 

    Future-like object that runs a Python coroutine. Not thread-safe.

    任务用于在事件循环中运行协同程序。如果一个协程在Future上等待,则Task暂停执行协程并等待Future的完成。当Future 完成后,包装协程的执行将恢复。

    事件循环使用协作调度:事件循环一次运行一个任务。当一个Task等待完成Future时,事件循环运行其他任务,回调或执行IO操作。

    使用高级asyncio.create_task()功能创建任务,或低级别loop.create_task()或 ensure_future()功能。不鼓励手动实例化任务。

    要取消正在运行的任务,请使用该cancel()方法。调用它将导致Task将CancelledError异常抛出到包装的协同程序中。如果在取消期间协程正在等待Future对象,则Future对象将被取消。

    cancelled()可用于检查任务是否被取消。True如果包装的协程没有抑制CancelledError异常并且实际上被取消,则该方法返回

    asyncio.Task继承自Future其所有API,除了Future.set_result()和 Future.set_exception()

    任务支持该contextvars模块。创建任务时,它会复制当前上下文,然后在复制的上下文中运行其协程。

    版本3.7中已更改:添加了对contextvars模块的支持

    cancel

      请求取消任务。

      这会安排CancelledError在事件循环的下一个循环中将异常抛入包装的协程。

      协程则有机会通过抑制异常与清理,甚至拒绝请求try... ... ... ... ... ... 块。因此,不同于不保证任务将被取消,尽管完全抑制取消并不常见,并且积极地不鼓励。exceptCancelledErrorfinallyFuture.cancel()Task.cancel()

      以下示例说明了协同程序如何拦截取消请求:

    async def cancel_me():
        print('cancel_me(): before sleep')
    
        try:
            # Wait for 1 hour
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            print('cancel_me(): cancel sleep')
            raise
        finally:
            print('cancel_me(): after sleep')
    
    async def main():
        # Create a "cancel_me" Task
        task = asyncio.create_task(cancel_me())
    
        # Wait for 1 second
        await asyncio.sleep(1)
    
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            print("main(): cancel_me is cancelled now")
    
    asyncio.run(main())
    
    # Expected output:
    #
    #     cancel_me(): before sleep
    #     cancel_me(): cancel sleep
    #     cancel_me(): after sleep
    #     main(): cancel_me is cancelled now
    
    cancelled
      True如果任务被取消,则返回。
      请求取消时取消任务, cancel()并且包装的协同程序将CancelledError异常传播 到其中。
    done
      True如果任务完成则返回。
      一个任务完成时,包裹协程要么返回的值,引发异常,或者任务被取消。
    result
      返回任务的结果。
      如果任务完成,则返回包装协程的结果(或者如果协程引发异常,则重新引发该异常。)
      如果已取消任务,则此方法会引发CancelledError异常。
      如果Task的结果尚不可用,则此方法会引发InvalidStateError异常。
    exception
      返回Task的例外。
      如果包装的协同程序引发异常,则返回异常。如果包装的协程正常返回,则此方法返回None
      如果已取消任务,则此方法会引发 CancelledError异常。
      如果尚未完成任务,则此方法会引发 InvalidStateError异常。
    add_done_callback回调*上下文=无
      添加要在任务完成时运行的回调。
      此方法仅应在基于低级回调的代码中使用。 
      有关Future.add_done_callback() 详细信息,请参阅文档。
    remove_done_callback回调
      从回调列表中删除回调
      此方法仅应在基于低级回调的代码中使用。
      有关Future.remove_done_callback() 详细信息,请参阅文档。
    get_stack*limit = None 
      返回此任务的堆栈帧列表。
      如果未完成包装的协同程序,则会返回挂起它的堆栈。如果协程已成功完成或被取消,则返回一个空列表。如果协程被异常终止,则返回回溯帧列表。
      帧始终从最旧到最新排序。
      对于挂起的协程,只返回一个堆栈帧。
      可选的limit参数设置要返回的最大帧数; 默认情况下,返回所有可用的帧。返回列表的排序取决于是返回堆栈还是返回:返回堆栈的最新帧,但返回最旧的回溯帧。(这与回溯模块的行为相匹配。)
    print_stack*limit = Nonefile = None 
      打印此任务的堆栈或回溯。
      这会为检索到的帧生成类似于回溯模块的输出get_stack()
      该极限参数传递给get_stack()直接。
      的文件参数是其中输出被写入的I / O流; 默认输出写入sys.stderr
    classmethod all_tasks(loop = None 
      返回一组事件循环的所有任务。
      默认情况下,返回当前事件循环的所有任务。如果是loopNone,则该get_event_loop()函数用于获取当前循环。
      不推荐使用此方法,将在Python 3.9中删除。请改用此asyncio.all_tasks()功能。
    classmethod current_task(loop = None 
      返回当前正在运行的任务或None
      如果是loopNone,则该get_event_loop()函数用于获取当前循环。
      不推荐使用此方法,将在Python 3.9中删除。请改用此asyncio.current_task()功能。

     其他

    1、async for 运用

    import asyncio
    
    
    class AsyncIter:
        def __init__(self, items):
            self.items = items
    
        async def __aiter__(self):
            for item in self.items:
                await asyncio.sleep(1)
                yield item
    
    
    async def print_iter(things):
        async for item in things:
            print(item)
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        things = AsyncIter([1, 2, 3])
        loop.run_until_complete(print_iter(things))
        loop.close()
    

      

    资料

    Python异步IO实现全过程1 https://mp.weixin.qq.com/s/fJaXmfHfYEk6XL2y8NmKmQ

    Python异步IO实现全过程2 https://mp.weixin.qq.com/s/RjDh7AITty92jxC8jIOiPA

    Python异步IO实现全过程3 https://mp.weixin.qq.com/s/vlH_2S2JIJpf3N0WRNcIJQ

  • 相关阅读:
    BufferedGraphics 性能测试
    ManualResetEvent 与 AutoResetEvent 区别
    管道式编程(收藏)
    C# 中扩展方法应用
    WinForm Invoke 调用 传入 out 类型参数
    断路器选型的一些理解
    为什么通了PE线,现场设备外壳还需要接地?
    RS485终端电阻解释
    驱动器的“安全转矩关断(Safe Torque Off,STO)”
    TCP和UDP的优缺点及区别
  • 原文地址:https://www.cnblogs.com/lianzhilei/p/9885270.html
Copyright © 2011-2022 走看看