zoukankan      html  css  js  c++  java
  • [译]Python中的异步IO:一个完整的演练

    原文:Async IO in Python: A Complete Walkthrough
    原文作者: Brad Solomon
    原文发布时间:2019年1月16日
    翻译:Tacey Wong
    翻译时间:2019年7月22日

    翻译仅便于个人学习,熟悉英语的请阅读原文


    目录


    Async IO是一种并发编程设计,Python中已经有了独立的支持,并且从Python3.4到Python3.7得到了快速发展。

    你可能疑惑,“并发、并行、线程、多处理”。MMP这已经很多了,异步IO是哪根葱?”

    本教程旨在帮助你回答这个问题,让你更牢固地掌握Python的异步IO。

    以下是要介绍的内容:

    • 异步IO:一种与语言无关的范例(模型),它具有许多跨编程语言的实现
    • async/await:两个 用于定义协程的新Python关键字
    • asyncio:为运行和管理协程提供基础和API的Python包/库

    协程(专用生成器函数)是Python中异步IO的核心,稍后我们将深入研究它们。

    注意:在本文中,使用术语异步IO来表示与语言无关的异步IO设计,而asyncio指的是Python包。

    开始之前,你需要确保已经配置搭建了可以使用asyncio及其他库的实验环境。

    搭建自己的实验环境

    你需要安装Python 3.7+以及aiohttpaiofiles包才能完整地跟随本文进行实验。

    $ python3.7 -m venv ./py37async
    $ source ./py37async/bin/activate  # Windows: .py37asyncScriptsactivate.bat
    $ pip install --upgrade pip aiohttp aiofiles  # 可选项: aiodns
    

    有关安装Python 3.7和设置虚拟环境的帮助,请查看Python 3安装和设置指南虚拟环境基础

    ok,let's go!

    异步IO鸟瞰图

    相较于它久经考验的表亲(多进程和多线程)来说,异步IO不太为人所知。本节将从高层全面地介绍异步IO是什么,以及哪些场景适合用它。

    哪些场景适合异步IO?

    并发和并行是个非常广泛的主题。因为本文重点介绍异步IO及其在Python中的实现,现在值得花一点时间将异步IO与其对应物进行比较,以了解异步IO如何适应更大、有时令人眼花缭乱的难题。

    并行:同时执行多个操作。
    多进程:是一种实现并行的方法,它需要将任务分散到计算机的中央处理单元(cpu或核心)上。多进程非常适合cpu密集的任务:密集for循环和密集数学计算通常属于这一类。
    并发:并发是一个比并行更广泛的术语。 它表明多个任务能够以重叠方式运行。 (有一种说法是并发并不意味着并行。)
    线程:是一种并发执行模型,多个线程轮流执行任务。 一个进程可以包含多个线程。 由于GIL(全局解释器锁)的存在,Python与线程有着复杂的关系,但这超出了本文的范围。

    了解线程的重要之处是它更适合于io密集的任务。cpu密集型任务的特点是计算机核心从开始到结束都在不断地工作,而一个IO密集型任务更多的是等待IO的完成。

    综上所述,并发既包括多进程(对于CPU密集任务来说是理想的),也包括线程(对于IO密集型任务来说是理想的)。多进程是并行的一种形式,并行是并发的一种特定类型(子集)。Python通过multiprocessing, threading, 和concurrent.futures标准库为这两者提供了长期支持。

    现在是时候召集一名新成员了!在过去的几年里,一个独立的设计被更全面地嵌入到了CPython中:通过标准库的asyncio包和新的async/await语言关键字实现异步IO。需要说明的是,异步IO不是一个新发明的概念,它已经存在或正在构建到其他语言和运行时环境中,比如Golang、C#或者Scala。

    Python文档将asyncio包称为用于编写并发代码的库。然而,异步IO既不是多线程也不是多进程,它不是建立在其中任何一个之上。事实上异步IO是一种单进程单线程设计:它使用协作式多任务操作方式,在本教程结束时你将理解这个术语。换句话说,尽管在单个进程中使用单个线程,但异步IO给人一种并发的感觉。协程(异步IO的一个核心特性)可以并发地调度,但它们本质上不是并发的。

    重申一下,异步输入输出是并发编程的一种风格,但不是并行的。与多进程相比,它与线程更紧密地结合在一起,但与这两者截然不同,并且是并发技术包中的独立成员。

    现在还留下了一个词没有解释。 异步是什么意思?这不是一个严格的定义,但是对于我们这里的目的,我可以想到/考虑到两个属性:

    • 异步例程能够在等待其最终结果时“暂停”,并允许其他例程同时运行。
    • 通过上面的机制,异步代码便于并发执行。 换句话说,异步代码提供了并发的外观和感觉

    下面是一个一个将所有内容组合在一起的图表。 白色术语代表概念,绿色术语代表实现或实现它们的方式:

    (Concurrencey并发、Threading线程、Async IO异步IO、Parallelism并行、Multiprocessing多进程)

    我将在这里停止对并发编程模型的比较。本教程重点介绍异步IO的子组件,如何使用它、以及围绕它创建的API。要深入研究线程、多处理和异步IO,请暂停这里并查看Jim Anderson对(Python中并发性的概述)[https://realpython.com/python-concurrency/]。Jim比我有趣得多,而且参加的会议也比我多。

    译者注:要了解多种并发模型的比较,可以参考(《七周七并发模型》

    异步IO释义

    异步IO乍一看似乎违反直觉,自相矛盾。如何使用一个线程和一个CPU内核来简化并发代码?我从来都不擅长编造例子,所以我想借用 Miguel Grinberg2017年PyCon演讲中的一个例子,这个例子很好地解释了一切:

    国际象棋大师JuditPolgár举办了一个国际象棋比赛,在那里她扮演多个业余选手。 她有两种方式进行比赛:同步和异步。
    假设:

    • 24个对手
    • Judit在5秒钟内完成一个棋子的移动
    • 每个对手移动一个棋子需要55秒
    • 游戏平均30对移动(总计60次移动)
      同步版本:Judit一次只玩一场游戏,从不同时玩两场,直到游戏结束。每场比赛需要(55 + 5)* 30 == 1800秒,或30分钟。 整个比赛需要24 * 30 == 720分钟,或12小时。
      异步版本:Judit从一张桌子走到另一张桌子,每张桌子走一步。她离开了牌桌,让对手在等待的时间里采取下一步行动。在所有24场比赛中,一个动作需要Judit 24 * 5 == 120秒,即2分钟。整个比赛现在被缩减到120 * 30 == 3600秒,也就是1小时。

    只有一个JuditPolgár,她只有两只手,一次只做一次动作。但是,异步进行将展览时间从12小时减少到1小时。因此,协同多任务处理是一种奇特的方式,可以说一个程序的事件循环(稍后会有更多)与多个任务通信,让每个任务在最佳时间轮流运行。

    异步IO需要很长的等待时间,否则函数将被阻塞,并允许其他函数在停机期间运行。

    异步IO使用起来不容易

    我听人说过“当你能够的时候使用异步IO;必要时使用线程”。事实是,构建持久的多线程代码可能很难,并且容易出错。异步IO避免了一些线程设计可能遇到的潜在速度障碍。

    但这并不是说Python中的异步IO很容易。警告:当你稍微深入其中时,异步编程也会很困难!Python的异步模型是围绕诸如回调,事件,传输,协议和future等概念构建的 -——术语可能令人生畏。事实上,它的API一直在不断变化,这使得它变得比较难。

    幸运的是,asyncio已经相对成熟,其大部分功能不再处于临时性状态,而其文档也有了大规模的改善,并且该主题的一些优质资源也开始出现。

    asyncio 包和 async/await

    现在你已经对异步输IO作为一种设计有了一定的了解,让我们来探讨一下Python的实现。Python的asyncio包(在Python 3.4中引入)和它的两个关键字async和wait服务于不同的目的,但是它们会一起帮助你声明、构建、执行和管理异步代码。

    async/await 语法和原生协程

    警告:小心你在网上读到的东西。Python的异步IO API已经从Python 3.4迅速发展到Python 3.7。一些旧的模式不再被使用,一些最初不被允许的东西现在通过新的引入被允许。据我所知,本教程也将很快加入过时的行列。

    异步IO的核心是协程。协程是Python生成器函数的一个专门版本。让我们从一个基线定义开始,然后随着你在此处的进展,以此为基础进行构建:协程是一个函数,它可以在到达返回之前暂停执行,并且可以在一段时间内间接将控制权传递给另一个协程。

    稍后,你将更深入地研究如何将传统生成器重新用于协程。目前,了解协程如何工作的最简单方法是开始编写一些协程代码。

    让我们采用沉浸式方法,编写一些异步输入输出代码。这个简短的程序是异步IO的Hello World,但它对展示其核心功能大有帮助:

    #!/usr/bin/env python3
    # countasync.py
    
    import asyncio
    
    async def count():
        print("One")
        await asyncio.sleep(1)
        print("Two")
    
    async def main():
        await asyncio.gather(count(), count(), count())
    
    if __name__ == "__main__":
        import time
        s = time.perf_counter()
        asyncio.run(main())
        elapsed = time.perf_counter() - s
        print(f"{__file__} executed in {elapsed:0.2f} seconds.")
    

    当你执行此文件时,请注意与仅用def和time.sleep()定义函数相比,看起来有什么不同:

    $ python3 countasync.py
    One
    One
    One
    Two
    Two
    Two
    countasync.py executed in 1.01 seconds.
    

    该输出的顺序是异步IO的核心。与count()的每个调用通信是一个事件循环或协调器。当每个任务到达asyncio.sleep(1)时,函数会向事件循环发出呼叫,并将控制权交还给它,例如,“我将休眠1秒。在这段时间里,做一些有意义的事情吧”。

    将此与同步版本进行对比::

    #!/usr/bin/env python3
    # countsync.py
    
    import time
    
    def count():
        print("One")
        time.sleep(1)
        print("Two")
    
    def main():
        for _ in range(3):
            count()
    
    if __name__ == "__main__":
        s = time.perf_counter()
        main()
        elapsed = time.perf_counter() - s
        print(f"{__file__} executed in {elapsed:0.2f} seconds.")
    

    执行时,顺序和执行时间会有轻微但严重的变化:

    $ python3 countsync.py
    One
    Two
    One
    Two
    One
    Two
    countsync.py executed in 3.01 seconds.
    

    虽然使用time.sleep()asyncio.sleep()看起来很普通,但是它们可以替代任何涉及等待时间的时间密集型进程。(您可以等待的最普通的事情是一个sleep()调用,它基本上什么也不做。)也就是说,time.sleep()可以表示任何耗时的阻塞函数调用,而asyncio.sleep()用于代替非阻塞调用(但也需要一些时间来完成)。

    你将在下一节中看到,等待某些东西(包括asyncio.sleep()的好处是,周围的函数可以暂时将控制权交给另一个更容易立即执行某些操作的函数。相比之下,time.sleep()或任何其他阻塞调用与异步Python代码不兼容,因为它会在睡眠时间内停止所有工作。

    异步IO规则

    此时,异步、wait和它们创建的协程函数的更正式定义已经就绪。这一节有点密集,但是掌握async/await是很有帮助的,所以如果需要的话,可以回到这里:

    • 语法async def引入了原生协程或异步生成器。async withasync for表达式也是有效的,稍后你将看到它们。
    • 关键词await将函数控制传递回事件循环(它暂停执行周围的协程)。如果Python在g()的范围内遇到await f()表达式,这就是await告诉事件循环,“暂停执行g()直到我等待的f()的结果 返回 。 与此同时,让其他东西运行。“

    在代码中,第二个要点大致是这样的:

    async def g():
        # 在这里暂停 ,f()执行完之后再返回到这里。
        return r
    

    关于何时以及能否使用async / await,还有一套严格的规则。无论您是在学习语法还是已经使用async / await,这些都非常方便:

    • 使用async def引入的函数是协程。它可以使用waitreturnyield,但所有这些都是可选的。声明async def noop(): pass 是合法的:
      • 使用wait和/或return创建一个coroutine函数。要调用coroutine函数,你必须等待它得到结果。
      • 在异步def块中使用yield不太常见(并且最近才在Python中合法)。这将创建一个异步生成器,您可以使用异步生成器进行迭代。 暂时忘掉异步生成器,重点关注使用await和/或return的协程函数的语法。
      • 任何使用async def定义的东西都不能使用yield from,这会引发SyntaxError(语法错误)。
    • 就像在def函数之外使用yield是一个SyntaxError一样,在async def协程之外使用wait也是一个SyntaxError

    以下是一些简洁的示例,旨在总结以上几条规则:

    async def f(x):
        y = await z(x)  # OK - `await` and `return` allowed in coroutines
        return y
    
    async def g(x):
        yield x  # OK - this is an async generator
    
    async def m(x):
        yield from gen(x)  # No - SyntaxError
    
    def m(x):
        y = await z(x)  # Still no - SyntaxError (no `async def` here)
        return y
    

    最后,当您使用await f()时,它要求f()是一个awaitable对象。嗯,这不是很有帮助,是吗? 现在,只要知道一个等待对象是(1)另一个协程或(2)定义返回一个迭代器.__ await __()dunder方法的对象。如果你正在编写一个程序,在大多数情况下,你只需要担心第一种情况。

    这又给我们带来了一个你可能会看到的技术上的区别:将函数标记为coroutine的一个老方法是用@asyncio.coroutine来修饰一个普通的def函数。结果是基于生成器的协同程序。自从在Python 3.5中引入async/await语法以来,这种结构已经过时了。

    这两个协程本质上是等价的(都是可 awaitable的),但是第一个协程是基于生成器的,而第二个协程是一个原生协程:

    import asyncio
    
    @asyncio.coroutine
    def py34_coro():
        """Generator-based coroutine, older syntax"""
        yield from stuff()
    
    async def py35_coro():
        """Native coroutine, modern syntax"""
        await stuff()
    

    如果你自己编写任何代码,为了显式最好使用本机协程。基于生成器的协程将在Python 3.10中删除

    在本教程的后半部分,我们将仅出于解释的目的来讨论基于生成器的协同程序。引入async / await的原因是使协同程序成为Python的独立功能,可以很容易地与正常的生成器函数区分开来,从而减少歧义。

    不要陷入基于生成器的协程中,这些协同程序已随着async / await的出现而过时了。如果你坚持async/await语法,它们有自己的小规则集(例如,await不能在基于生成器的协同程序中使用),这些规则在很大程度上是不相关的。

    废话不多说,让我们来看几个更复杂的例子。

    下面是异步IO如何减少等待时间的一个例子:给定一个协程makerandom(),它一直在[0,10]范围内产生随机整数,直到其中一个超过阈值,你想让这个协程的多次调用不需要等待彼此连续完成。你可以在很大程度上遵循上面两个脚本的模式,只需稍作修改:

    #!/usr/bin/env python3
    # rand.py
    
    import asyncio
    import random
    
    # ANSI colors
    c = (
        "33[0m",   # End of color
        "33[36m",  # Cyan
        "33[91m",  # Red
        "33[35m",  # Magenta
    )
    
    async def makerandom(idx: int, threshold: int = 6) -> int:
        print(c[idx + 1] + f"Initiated makerandom({idx}).")
        i = random.randint(0, 10)
        while i <= threshold:
            print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
            await asyncio.sleep(idx + 1)
            i = random.randint(0, 10)
        print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
        return i
    
    async def main():
        res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
        return res
    
    if __name__ == "__main__":
        random.seed(444)
        r1, r2, r3 = asyncio.run(main())
        print()
        print(f"r1: {r1}, r2: {r2}, r3: {r3}")
    

    彩色输出比我能说的多得多,并让你了解这个脚本是如何执行的:

    该程序使用一个主协程makerandom(),并在3个不同的输入上同时运行它。大多数程序将包含小型、模块化的协程和一个包装器函数,用于将每个较小的协程链接在一起。然后,main()用中央协程映射到某个可迭代的池中收集任务(future)。

    在这个小例子中,池是range(3)。在稍后介绍的更全面的示例中,它是一组需要同时请求,解析和处理的URL,main()封装了每个URL的整个例程。

    虽然“制作随机整数”(CPU密集比这更复杂)可能不是作为asyncio候选者的最佳选择,但是在示例中存在asyncio.sleep(),旨在模仿不确定等待时间的IO密集进程 。例如,asyncio.sleep()调用可能表示在消息应用程序中的两个客户端之间发送和接收不那么随机的整数。

    异步IO设计模式

    Async IO附带了它自己的一组脚本设计,您将在本节中介绍这些脚本设计。

    链式协程

    协程的一个关键特性是它们可以链接在一起(记住,一个协成对象是awaitable的,所以另外一个协成可以await它)。这允许你将程序分成更小的、可管理的、可回收的协同程序:

    #!/usr/bin/env python3
    # chained.py
    
    import asyncio
    import random
    import time
    
    async def part1(n: int) -> str:
        i = random.randint(0, 10)
        print(f"part1({n}) sleeping for {i} seconds.")
        await asyncio.sleep(i)
        result = f"result{n}-1"
        print(f"Returning part1({n}) == {result}.")
        return result
    
    async def part2(n: int, arg: str) -> str:
        i = random.randint(0, 10)
        print(f"part2{n, arg} sleeping for {i} seconds.")
        await asyncio.sleep(i)
        result = f"result{n}-2 derived from {arg}"
        print(f"Returning part2{n, arg} == {result}.")
        return result
    
    async def chain(n: int) -> None:
        start = time.perf_counter()
        p1 = await part1(n)
        p2 = await part2(n, p1)
        end = time.perf_counter() - start
        print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")
    
    async def main(*args):
        await asyncio.gather(*(chain(n) for n in args))
    
    if __name__ == "__main__":
        import sys
        random.seed(444)
        args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
        start = time.perf_counter()
        asyncio.run(main(*args))
        end = time.perf_counter() - start
        print(f"Program finished in {end:0.2f} seconds.")
    

    请仔细注意输出,其中part1()睡眠时间可变,part2()在结果可用时开始处理结果:

    $ python3 chained.py 9 6 3
    part1(9) sleeping for 4 seconds.
    part1(6) sleeping for 4 seconds.
    part1(3) sleeping for 0 seconds.
    Returning part1(3) == result3-1.
    part2(3, 'result3-1') sleeping for 4 seconds.
    Returning part1(9) == result9-1.
    part2(9, 'result9-1') sleeping for 7 seconds.
    Returning part1(6) == result6-1.
    part2(6, 'result6-1') sleeping for 4 seconds.
    Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
    -->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
    Returning part2(6, 'result6-1') == result6-2 derived from result6-1.
    -->Chained result6 => result6-2 derived from result6-1 (took 8.01 seconds).
    Returning part2(9, 'result9-1') == result9-2 derived from result9-1.
    -->Chained result9 => result9-2 derived from result9-1 (took 11.01 seconds).
    Program finished in 11.01 seconds.
    

    在此设置中,main()的运行时间将等于它收集和调度的任务的最大运行时间。

    使用队列

    asyncio包提供了与queue模块的类类似的queue classes类。在我们到目前为止的示例中,我们并不真正需要队列结构。在 chained.py中,每个任务(future)都由一组协同程序组成,这些协同程序显式地相互等待,并在每个链上传递一个输入。

    还有一种替代结构也可以用于异步IO:许多生产者,彼此没有关联,将项目添加到队列中。每个生产者可以在交错、随机、未宣布的时间向队列添加多个项。当商品出现时,一组消费者贪婪地从队列中取出商品,不等待任何其他信号。

    在这种设计中,没有任何个体消费者与生产者的链接。消费者事先不知道生产者的数量,甚至不知道将添加到队列中的累计项目数。

    单个生产者或消费者分别从队列中放置和提取项所需的时间是可变的。队列充当一个吞吐量,它可以与生产者和消费者通信,而不需要它们彼此直接通信。

    注意:虽然队列通常用于线程程序,因为queue.Queue()的线程安全性。在涉及异步IO时,您不需要关心线程安全性(例外情况是当你将两者结合时,但在本教程中没有这样做。)【译者注:这里的两者结合说的是异步IO和多线程结合】。队列的一个用例(如这里的例子)是队列充当生产者和消费者的发送器,否则它们不会直接链接或关联在一起。

    这个程序的同步版本看起来相当糟糕:一组阻塞生成器按顺序将项添加到队列中,一次一个生产者。只有在所有生产者完成之后,队列才可以由一个消费者一次处理一个项一个项地处理。这种设计有大量的延迟。物品可能会闲置在队列中,而不是立即拿起并处理。

    下面是异步版本asyncq.py
    这个工作流程的挑战在于需要向消费者发出生产完成的信号。否则,await q.get()将无限期挂起,因为队列已经被完全处理,但是消费者并不知道生产已经完成。

    (非常感谢StackOverflow用户帮助理顺main():关键是await q.join(),它将一直阻塞到队列中的所有项都被接收和处理,然后取消消费者任务,否则这些任务会挂起并无休止地等待其他队列项出现)

    下面是完整的脚本:

    #!/usr/bin/env python3
    # asyncq.py
    
    import asyncio
    import itertools as it
    import os
    import random
    import time
    
    async def makeitem(size: int = 5) -> str:
        return os.urandom(size).hex()
    
    async def randsleep(a: int = 1, b: int = 5, caller=None) -> None:
        i = random.randint(0, 10)
        if caller:
            print(f"{caller} sleeping for {i} seconds.")
        await asyncio.sleep(i)
    
    async def produce(name: int, q: asyncio.Queue) -> None:
        n = random.randint(0, 10)
        for _ in it.repeat(None, n):  # Synchronous loop for each single producer
            await randsleep(caller=f"Producer {name}")
            i = await makeitem()
            t = time.perf_counter()
            await q.put((i, t))
            print(f"Producer {name} added <{i}> to queue.")
    
    async def consume(name: int, q: asyncio.Queue) -> None:
        while True:
            await randsleep(caller=f"Consumer {name}")
            i, t = await q.get()
            now = time.perf_counter()
            print(f"Consumer {name} got element <{i}>"
                  f" in {now-t:0.5f} seconds.")
            q.task_done()
    
    async def main(nprod: int, ncon: int):
        q = asyncio.Queue()
        producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
        consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
        await asyncio.gather(*producers)
        await q.join()  # Implicitly awaits consumers, too
        for c in consumers:
            c.cancel()
    
    if __name__ == "__main__":
        import argparse
        random.seed(444)
        parser = argparse.ArgumentParser()
        parser.add_argument("-p", "--nprod", type=int, default=5)
        parser.add_argument("-c", "--ncon", type=int, default=10)
        ns = parser.parse_args()
        start = time.perf_counter()
        asyncio.run(main(**ns.__dict__))
        elapsed = time.perf_counter() - start
        print(f"Program completed in {elapsed:0.5f} seconds.")
    

    前几个协同程序是辅助函数,它返回随机字符串,小数秒性能计数器和随机整数。生产者将1到5个项目放入队列中。 每个项目是(i,t)的元组,其中i是随机字符串,t是生产者尝试将元组放入队列的时间。

    当消费者将项目拉出时,它只使用项目所在的时间戳计算项目在队列中所用的时间。

    请记住,asyncio.sleep()是用来模拟其他一些更复杂的协同程序的,如果它是一个常规的阻塞函数,会消耗时间并阻塞所有其他的执行。
    .

    下面是一个有两个生产者和五个消费者的测试:

    $ python3 asyncq.py -p 2 -c 5
    Producer 0 sleeping for 3 seconds.
    Producer 1 sleeping for 3 seconds.
    Consumer 0 sleeping for 4 seconds.
    Consumer 1 sleeping for 3 seconds.
    Consumer 2 sleeping for 3 seconds.
    Consumer 3 sleeping for 5 seconds.
    Consumer 4 sleeping for 4 seconds.
    Producer 0 added <377b1e8f82> to queue.
    Producer 0 sleeping for 5 seconds.
    Producer 1 added <413b8802f8> to queue.
    Consumer 1 got element <377b1e8f82> in 0.00013 seconds.
    Consumer 1 sleeping for 3 seconds.
    Consumer 2 got element <413b8802f8> in 0.00009 seconds.
    Consumer 2 sleeping for 4 seconds.
    Producer 0 added <06c055b3ab> to queue.
    Producer 0 sleeping for 1 seconds.
    Consumer 0 got element <06c055b3ab> in 0.00021 seconds.
    Consumer 0 sleeping for 4 seconds.
    Producer 0 added <17a8613276> to queue.
    Consumer 4 got element <17a8613276> in 0.00022 seconds.
    Consumer 4 sleeping for 5 seconds.
    Program completed in 9.00954 seconds.
    
    

    在这种情况下,项目在几分之一秒内处理。 延迟可能有两个原因:

    • 标准的,在很大程度上不可避免的开销
    • 当一个项出现在队列中时,所有消费者都在睡觉的情况

    关于第二个原因,幸运的是,扩展到成百上千的消费者是完全正常的。用python3 asyncq.py -p 5 - c100应该没有问题。这里的要点是,理论上,您可以让不同系统上的不同用户控制生产者和消费者的管理,队列充当中央吞吐量。

    到目前为止,您已经跳进了火坑。了解了三个asyncio调用async和await定义的协程并等待的示例。如果你没有完全关注或者只是想深入了解Python中现代协同程序的机制,下一节我们将开始讨论这个。

    生成器中异步IO的Roots

    之前,您看到了一个基于生成器的旧式协同程序的例子,它已经被更显式的原生协同程序所淘汰。这个例子值得重新展示一下:

    import asyncio
    
    @asyncio.coroutine
    def py34_coro():
        """Generator-based coroutine"""
        # No need to build these yourself, but be aware of what they are
        s = yield from stuff()
        return s
    
    async def py35_coro():
        """Native coroutine, modern syntax"""
        s = await stuff()
        return s
    
    async def stuff():
        return 0x10, 0x20, 0x30
    

    作一个实验,如果py34_coro()或py35_coro()调用自身,而不await或不调用asyncio.run()或其他asyncio函数,会发生什么?独调用一个协同程序会返回一个协同程序对象:

    >>> py35_coro()
    <coroutine object py35_coro at 0x10126dcc8>
    

    这表面上并不是很有趣。 调用协同程序的结果是一个awaitable的协程对象。

    测验时间:Python的其他什么功能跟这一样?(Python的哪些特性在单独调用时实际上没有多大作用?)

    希望你将生成器作为这个问题的答案,因为协同程序是增强型生成器。 在这方面的行为类似:

    >>> def gen():
    ...     yield 0x10, 0x20, 0x30
    ...
    >>> g = gen()
    >>> g  # Nothing much happens - need to iterate with `.__next__()`
    <generator object gen at 0x1012705e8>
    >>> next(g)
    (16, 32, 48)
    

    正如它所发生的那样,生成器函数是异步IO的基础(无论是否使用async def声明协程而不是旧的@asyncio.coroutine包装器)。从技术上讲,await更接近于yield from而非yield。(但请记住,yield from x()只是替换for i in x():yield i的语法糖)

    生成器与异步IO相关的一个关键特性是可以有效地随意停止和重新启动生成器。例如,你可以在生成器对象上进行迭代,然后在剩余的值上继续迭代。当一个生成器函数达到yield时,它会产生该值,但随后它会处于空闲状态,直到它被告知产生其后续值。

    这可以通过一个例子来充实:

    >>> from itertools import cycle
    >>> def endless():
    ...     """Yields 9, 8, 7, 6, 9, 8, 7, 6, ... forever"""
    ...     yield from cycle((9, 8, 7, 6))
    
    >>> e = endless()
    >>> total = 0
    >>> for i in e:
    ...     if total < 30:
    ...         print(i, end=" ")
    ...         total += i
    ...     else:
    ...         print()
    ...         # Pause execution. We can resume later.
    ...         break
    9 8 7 6 9 8 7 6 9 8 7 6 9 8
    
    >>> # Resume
    >>> next(e), next(e), next(e)
    (6, 9, 8)
    

    await关键字的行为类似,标记了一个断点,协程挂起自己并允许其他协程工作。在这种情况下,“挂起”是指暂时放弃控制但未完全退出或结束协程。请记住,yield,以及由此产生的yield fromawait是发生器执行过程中的一个断点。

    这是函数和生成器之间的根本区别。一个函数要么全有要么全无。一旦它开始,它就不会停止,直到它到达一个return,然后将该值推给调用者(调用它的函数)。另一方面,生成器每次达到yield时都会暂停,不再继续。它不仅可以将这个值推入调用堆栈,而且当您通过对它调用next()恢复它时,它还可以保留它的局部变量。

    生成器的第二个特征虽然鲜为人知,却也也很重要。也可以通过其.send()方法将值发送到生成器。这允许生成器(和协同程序)相互调用(await)而不会阻塞。我不会再深入了解这个功能的细节,因为它主要是为了在幕后实现协同程序,但你不应该真的需要自己直接使用它。

    如果你有兴趣了解更多内容,可以从PEP 342/正式引入协同程序开始。 Brett Cannon的Python中异步等待(Async-Await)是如何工作的也是一个很好的读物,asyncio上的PYMOTW文章也是如此。还有David Beazley的[关于协程和并发的有趣课程] 深入探讨了协同程序运行的机制。

    让我们尝试将上述所有文章压缩成几句话:

    这些协同程序实际上是通过一种非常规的机制运行的。它们的结果是在调用其.send()方法时抛出异常对象的属性。所有这些都有一些不可靠的细节,但是它可能不会帮助您在实践中使用这部分语言,所以现在让我们继续。

    为了联系在一起,以下是关于协同作为生成器这个主题的一些关键点:

    • 协同程序是利用生成器方法的特性的再利用生成器
    • 旧式基于生成器的协同程序使用yield from来等待协程结果。原生协同程序中的现代Python语法只是将yield from等价替换为await作为等待协程结果的方法。await类似于yield,这样想通常是有帮助的。
    • await的使用是标志着断点的信号。它允许协程暂时暂停执行并允许程序稍后返回它。

    其他特点: async for and Async Generators + Comprehensions

    与纯async/await一起,Python还允许通过async for异步迭代异步迭代器。异步迭代器的目的是让它能够在迭代时在每个阶段调用异步代码。

    这个概念的自然延伸是异步发生器。回想一下,你可以在原生协程中使用await,return或yield。在Python 3.6中可以使用协程中的yield(通过PEP 525),它引入了异步生成器,目的是允许await和yield在同一个协程函数体中使用:

    >>> async def mygen(u: int = 10):
    ...     """Yield powers of 2."""
    ...     i = 0
    ...     while i < u:
    ...         yield 2 ** i
    ...         i += 1
    ...         await asyncio.sleep(0.1)
    

    最后但同样重要的是,Python通过async for来实现异步理解。就像它的同步表兄弟一样,这主要是语法糖:

    >>> async def main():
    ...     # This does *not* introduce concurrent execution
    ...     # It is meant to show syntax only
    ...     g = [i async for i in mygen()]
    ...     f = [j async for j in mygen() if not (j // 3 % 5)]
    ...     return g, f
    ...
    >>> g, f = asyncio.run(main())
    >>> g
    [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
    >>> f
    [1, 2, 16, 32, 256, 512]
    

    这是一个关键的区别:异步生成器和理解都不会使迭代并发。它们所做的就是提供同步对等程序的外观和感觉,但是有能力让循环放弃对事件循环的控制,让其他协同程序运行。

    换句话说,异步迭代器和异步生成器不是为了在序列或迭代器上同时映射某些函数而设计的。它们仅仅是为了让封闭的协程允许其他任务轮流使用。async for和async with语句仅在使用纯for或with会“破坏”协程中await的性质的情况下才需要。异步性和并发之间的区别是一个需要掌握的关键因素。

    事件循环和asyncio.run()

    您可以将事件循环视为一段时间的while True循环,它监视协同程序,获取有关闲置内容的反馈,并查找可在此期间执行的内容。当协同程序等待的任何内容变得可用时,它能够唤醒空闲协程。

    到目前为止,事件循环的整个管理已由一个函数调用隐式处理:

    asyncio.run(main())  # Python 3.7+
    

    Python 3.7中引入的asyncio.run()负责获取事件循环,运行任务直到它们被标记为完成,然后关闭事件循环。

    使用get_event_loop()管理asyncio事件循环有一种更加冗长的方式。典型的模式如下所示:

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
    

    你可能会在较旧的示例中看到loop.get_event_loop(),但除非你需要对事件循环管理控制进行特别的微调,否则asyncio.run()应该足以满足大多数程序的需要。

    如果确实需要在Python程序中与事件循环交互,loop是一个老式的Python对象,它支持使用loop.is_running()和loop.is_closed()进行内省/introspection 。如果需要获得更精细的控制,可以对其进行操作,例如通过将循环作为参数传递来调度回调

    更重要的是要深入了解事件循环的机制。关于事件循环,这里有几点值得强调。

    1:协同程序在与事件循环绑定之前不会自行做很多事情。

    你之前在生成器的解释中看到了这一点,但值得重申。如果您有一个主协程在等待其他协程,那么单独调用它几乎没有什么效果:

    >>> import asyncio
    
    >>> async def main():
    ...     print("Hello ...")
    ...     await asyncio.sleep(1)
    ...     print("World!")
    
    >>> routine = main()
    >>> routine
    <coroutine object main at 0x1027a6150>
    

    请记住使用asyncio.run()通过调度main()协程(将来的对象)来实际强制执行,以便在事件循环上执行:

    >>> asyncio.run(routine)
    Hello ...
    World!
    

    (其他协同程序可以通过await执行。通常在asyncio.run()中封装main(),然后从那里调用带有await的链式协程。)

    2:默认情况下,异步IO事件循环在单个线程和单个CPU内核上运行。通常,在一个CPU内核中运行一个单线程事件循环是绰绰有余的。还可以跨多个核心运行事件循环。请查看John Reese谈话获取更多内容,顺便提个醒,你的笔记本电脑可能会自发燃烧。

    3:事件循环是可插入的。也就是说,如果你真的需要,你可以编写自己的事件循环实现,并让它以相同的方式运行任务。这在uvloop包中得到了很好的演示,这是Cython中事件循环的一个实现。

    这就是"可插入事件循环"这个术语的含义:你可以使用事件循环的任何工作实现,与协同程序本身的结构无关。asyncio包本身附带两个不同的事件循环实现,默认情况下基于选择器模块。(第二个实现仅适用于Windows。)

    一个完整的程序:异步请求

    你已经走了这么远,现在是时候享受快乐和无痛的部分了。在本节中,您将使用aiohttp(一种速度极快的异步http 客户端/服务端 框架)构建一个抓取网页的网址收集器areq.py。(我们只需要客户端部分。)这种工具可以用来映射一组站点之间的连接,这些链接形成一个有向图

    :您可能想知道为什么Python的requests包与异步IO不兼容。requests构建在urllib3之上,而urllib3又使用Python的http和socket模块。默认情况下,socket操作是阻塞的。这意味着Python不会想await requests.get(url)这样,因为.get()不是awaitable的。相比之下,aiohttp中几乎所有东西都是一个awaitable的协程,比如,session.request() response.text(). 它是一个很棒的库,但是在异步代码中使用requests是有害的。

    高层程序结构如下:

    1. 从本地文件url .txt中读取url序列。
    2. 发送对URL的GET请求并解码生成的内容。 如果这失败了,在那里停下来找一个网址。
    3. 在响应的HTML中搜索href标记内的URL
    4. 将结果写入foundurls.txt。
    5. 尽可能异步和并发地执行上述所有操作。(对请求使用aiohttp,对文件附件使用aiofiles。这是IO的两个主要示例,非常适合异步IO模型。)

    下是urls.txt的内容。 它并不庞大,并且主要包含高流量的网站:

    $ cat urls.txt
    https://regex101.com/
    https://docs.python.org/3/this-url-will-404.html
    https://www.nytimes.com/guides/
    https://www.mediamatters.org/
    https://1.1.1.1/
    https://www.politico.com/tipsheets/morning-money
    https://www.bloomberg.com/markets/economics
    https://www.ietf.org/rfc/rfc2616.txt
    

    列表中的第二个网址应该返回一个404响应,你需要优雅地处理这个响应。如果你正在运行此程序的扩展版本,你可能需要处理比这更多的问题,例如服务器断开连接和无限重定向。

    求本身应该使用单个会话进行,以充分利用会话的内部连接池。

    让我们来看看完整的程序。之后,我们将一步一步地介绍这些内容:

    #!/usr/bin/env python3
    # areq.py
    
    """Asynchronously get links embedded in multiple pages' HMTL."""
    
    import asyncio
    import logging
    import re
    import sys
    from typing import IO
    import urllib.error
    import urllib.parse
    
    import aiofiles
    import aiohttp
    from aiohttp import ClientSession
    
    logging.basicConfig(
        format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
        level=logging.DEBUG,
        datefmt="%H:%M:%S",
        stream=sys.stderr,
    )
    logger = logging.getLogger("areq")
    logging.getLogger("chardet.charsetprober").disabled = True
    
    HREF_RE = re.compile(r'href="(.*?)"')
    
    async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
        """GET request wrapper to fetch page HTML.
    
        kwargs are passed to `session.request()`.
        """
    
        resp = await session.request(method="GET", url=url, **kwargs)
        resp.raise_for_status()
        logger.info("Got response [%s] for URL: %s", resp.status, url)
        html = await resp.text()
        return html
    
    async def parse(url: str, session: ClientSession, **kwargs) -> set:
        """Find HREFs in the HTML of `url`."""
        found = set()
        try:
            html = await fetch_html(url=url, session=session, **kwargs)
        except (
            aiohttp.ClientError,
            aiohttp.http_exceptions.HttpProcessingError,
        ) as e:
            logger.error(
                "aiohttp exception for %s [%s]: %s",
                url,
                getattr(e, "status", None),
                getattr(e, "message", None),
            )
            return found
        except Exception as e:
            logger.exception(
                "Non-aiohttp exception occured:  %s", getattr(e, "__dict__", {})
            )
            return found
        else:
            for link in HREF_RE.findall(html):
                try:
                    abslink = urllib.parse.urljoin(url, link)
                except (urllib.error.URLError, ValueError):
                    logger.exception("Error parsing URL: %s", link)
                    pass
                else:
                    found.add(abslink)
            logger.info("Found %d links for %s", len(found), url)
            return found
    
    async def write_one(file: IO, url: str, **kwargs) -> None:
        """Write the found HREFs from `url` to `file`."""
        res = await parse(url=url, **kwargs)
        if not res:
            return None
        async with aiofiles.open(file, "a") as f:
            for p in res:
                await f.write(f"{url}	{p}
    ")
            logger.info("Wrote results for source URL: %s", url)
    
    async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
        """Crawl & write concurrently to `file` for multiple `urls`."""
        async with ClientSession() as session:
            tasks = []
            for url in urls:
                tasks.append(
                    write_one(file=file, url=url, session=session, **kwargs)
                )
            await asyncio.gather(*tasks)
    
    if __name__ == "__main__":
        import pathlib
        import sys
    
        assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
        here = pathlib.Path(__file__).parent
    
        with open(here.joinpath("urls.txt")) as infile:
            urls = set(map(str.strip, infile))
    
        outpath = here.joinpath("foundurls.txt")
        with open(outpath, "w") as outfile:
            outfile.write("source_url	parsed_url
    ")
    
        asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))
    

    这个脚本比我们最初的玩具程序要长,所以让我们把它分解一下。

    常量HREF RE是一个正则表达式,用于提取我们最终要搜索的HTML中的HREF标记:

    >>> HREF_RE.search('Go to <a href="https://realpython.com/">Real Python</a>')
    <re.Match object; span=(15, 45), match='href="https://realpython.com/"'>
    

    协程 fetch html()是一个GET请求的包装器,用于发出请求并解码结果页面html。它发出请求,等待响应,并在非200状态的情况下立即提出:

    resp = await session.request(method="GET", url=url, **kwargs)
    resp.raise_for_status()
    

    如果状态正常,则fetch_html()返回页面HTML(str)。值得注意的是,这个函数中没有执行异常处理。逻辑是将该异常传播给调用者并让它在那里处理:

    html = await resp.text()
    

    我们等待session.request()resp.text(),因为它们是awaitable的协程。否则,请求/响应周期将是应用程序的长尾、占用时间的部分,但是对于异步输入输出,fetch_html()允许事件循环处理其他可用的作业,例如解析和写入已经获取的URLs。

    协程链中的下一个是parse(),它等待fetch html()获取给定的URL,然后从该页面的s html中提取所有的href标记,确保每个标记都是有效的,并将其格式化为绝对路径。

    诚然,parse()的第二部分是阻塞的,但它包括快速正则表达式匹配,并确保发现的链接成为绝对路径。

    在这种特殊情况下,这个同步代码应该是快速和不明显的。但是请记住,在给定的协程内的任何一行都会阻塞其他协程,除非该行使用yield、await或return。如果解析是一个更密集的过程,您可能需要考虑使用executor()中的loop.run_in_executor()在自己的进程中运行这部分。

    接下来,协程 write()接受一个文件对象和一个URL,并等待parse()返回一组已解析的URL,通过使用aiofiles(一个用于异步文件IO的包)将每个URL及其源URL异步地写入文件。

    最后,bulk_crawl_and_write()作为脚本的协程链的主要入口点。 它使用单个会话,并为最终从urls.txt读取的每个URL创建任务。

    这里还有几点值得一提:

    • 默认的客户机会话有一个最多有100个打开连接的适配器。要更改这一点,请将asyncio.connector.TCPConnector的实例传递给ClientSession。您也可以按主机指定限制。
    • 可以为整个会话和单个请求指定最大超时
    • 此脚本还使用async with,它与异步上下文管理器一起使用。 我没有专门讨论这个概念,因为从同步到异步上下文管理器的转换相当简单。后者必须定义.__ aenter __().__ aexit __()而不是.__ exit __().__enter__()。正如您所料,async with只能在使用async def声明的协程函数中使用。

    如果您想进一步了解,GitHub上本教程附带的文件有详细的注释。

    下面是执行的全部荣耀,因为areq.py可以在一秒钟内获取、解析和保存9个url的结果:

    $ python3 areq.py
    21:33:22 DEBUG:asyncio: Using selector: KqueueSelector
    21:33:22 INFO:areq: Got response [200] for URL: https://www.mediamatters.org/
    21:33:22 INFO:areq: Found 115 links for https://www.mediamatters.org/
    21:33:22 INFO:areq: Got response [200] for URL: https://www.nytimes.com/guides/
    21:33:22 INFO:areq: Got response [200] for URL: https://www.politico.com/tipsheets/morning-money
    21:33:22 INFO:areq: Got response [200] for URL: https://www.ietf.org/rfc/rfc2616.txt
    21:33:22 ERROR:areq: aiohttp exception for https://docs.python.org/3/this-url-will-404.html [404]: Not Found
    21:33:22 INFO:areq: Found 120 links for https://www.nytimes.com/guides/
    21:33:22 INFO:areq: Found 143 links for https://www.politico.com/tipsheets/morning-money
    21:33:22 INFO:areq: Wrote results for source URL: https://www.mediamatters.org/
    21:33:22 INFO:areq: Found 0 links for https://www.ietf.org/rfc/rfc2616.txt
    21:33:22 INFO:areq: Got response [200] for URL: https://1.1.1.1/
    21:33:22 INFO:areq: Wrote results for source URL: https://www.nytimes.com/guides/
    21:33:22 INFO:areq: Wrote results for source URL: https://www.politico.com/tipsheets/morning-money
    21:33:22 INFO:areq: Got response [200] for URL: https://www.bloomberg.com/markets/economics
    21:33:22 INFO:areq: Found 3 links for https://www.bloomberg.com/markets/economics
    21:33:22 INFO:areq: Wrote results for source URL: https://www.bloomberg.com/markets/economics
    21:33:23 INFO:areq: Found 36 links for https://1.1.1.1/
    21:33:23 INFO:areq: Got response [200] for URL: https://regex101.com/
    21:33:23 INFO:areq: Found 23 links for https://regex101.com/
    21:33:23 INFO:areq: Wrote results for source URL: https://regex101.com/
    21:33:23 INFO:areq: Wrote results for source URL: https://1.1.1.1/
    

    还不算太寒酸! 作为完整性检查,你可以检查输出的行数。 在我做这个实验的时候,它是626,但请记住,这可能会发生变动:

    $ wc -l foundurls.txt
         626 foundurls.txt
    
    $ head -n 3 foundurls.txt
    source_url  parsed_url
    https://www.bloomberg.com/markets/economics https://www.bloomberg.com/feedback
    https://www.bloomberg.com/markets/economics https://www.bloomberg.com/notices/tos
    

    下一步:如果你想增加难度,可以让这个网络爬虫进行递归。您可以使用aio-redis跟踪树中已爬网的URL,以避免请求它们两次,并使用Python的networkx库进行链接。 记住要友好一点。将1000个并发请求发送到一个小的、毫无防备的网站是非常糟糕的。有一些方法可以限制您在一个批处理中进行的并发请求数,例如使用asyncio的sempahore对象或使用类似这样的模式。

    上下文中的异步IO

    既然您已经看到了相当多的代码,让我们回过头来考虑一下什么时候异步IO是一个理想的选择,以及如何进行比较来得出这个结论,或者选择其他不同的并发模型。

    何时以及为何异步IO是正确的选择?

    本教程不适用于异步IO与线程、多处理的扩展论述。然而,了解异步IO何时可能是三者中最好的候选是很有用的。

    关于异步IO与多处理之间的斗争实际上根本不是一场战争。事实上,它们可以一起使用。如果你有多个相当统一的CPU密集型任务(一个很好的例子是scikit-learn或keras等库中的网格搜索),多进程应该是一个明显的选择。

    如果所有函数都使用阻塞调用,那么将async放在每个函数之前不是一个好主意。(这实际上会降低你的代码速度。)是正如前面提到的,异步IO和多处理可以在一些地方和谐共存

    线程的伸缩性也比异步IO要差,因为线程是具有有限可用性的系统资源.在许多机器上创建数千个线程都会失败,我不建议您首先尝试它。创建数千个异步IO任务是完全可行的。

    当您有多个IO绑定任务时,异步IO会闪烁,否则任务将通过阻止IO密集等待时间来控制,例如:

    • 网络IO,无论您的程序是服务器端还是客户端
    • 无服务器设计,例如点对点,多用户网络,如组聊天室
    • 读/写操作,在这种操作中,您想要模仿“发射后不管”的风格,但不必担心锁定正在读写的内容

    不使用await的最大原因是await只支持定义特定方法集的特定对象集。如果要对某个DBMS执行异步读取操作,则不仅需要查找该DBMS的Python包,这个包还必须支持python的async / await语法。包含同步调用的协程会阻止其他协程和任务运行。关使用async / await的库的列表,请参阅本教程末尾的列表

    Async IO It Is, but Which One?

    本教程重点介绍异步IO,async / await语法,以及使用asyncio进行事件循环管理和指定任务。

    asyncio当然不是唯一的异步IO库。 Nathaniel J. Smith的观察说了很多:

    [在]几年后,asyncio可能会发现自己沦落为精明的开发人员避免使用的stdlib库之一,比如urllib2。……实际上,我所说的是,asyncio是其自身成功的牺牲品:在设计时,它采用了可能的最好方法; 但从那以后,受asyncio启发的工作 - 比如async / await的加入 - 已经改变了局面,让我们可以做得更好,现在asyncio受到其早期承诺的束缚。via:(来源)【https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/】

    尽管使用不同的api和方法,大名鼎鼎的curiotrio能做asyncio做的事情。就个人而言,我认为如果你正在构建一个中等规模,简单的程序,只需使用asyncio就足够了,而且易于理解,可以避免在Python的标准库之外添加另一个大的依赖项。

    但无论如何,看看curio和trio,你可能会发现他们用一种更直观的方式完成了同样的事情。此处介绍的许多与包不相关的概念也应该渗透到备用异步IO包中。

    其他零碎

    在接下来的几节中,您将看到asyncio和async/wait的一些杂项部分,这部分到目前为止还没有完全融入教程,但是对于构建和理解一个完整的程序仍然很重要。

    其他顶级asyncio 函数

    除了asyncio.run()之外,您还看到了一些其他的包级函数,如asyncio.create_task()和asyncio.gather()

    您可以使用create task()来调度协调程序对象的执行,后面跟着asyncio.run()

       >>> import asyncio
    
    >>> async def coro(seq) -> list:
    ...     """'IO' wait time is proportional to the max element."""
    ...     await asyncio.sleep(max(seq))
    ...     return list(reversed(seq))
    ...
    >>> async def main():
    ...     # This is a bit redundant in the case of one task
    ...     # We could use `await coro([3, 2, 1])` on its own
    ...     t = asyncio.create_task(coro([3, 2, 1]))  # Python 3.7+
    ...     await t
    ...     print(f't: type {type(t)}')
    ...     print(f't done: {t.done()}')
    ...
    >>> t = asyncio.run(main())
    t: type <class '_asyncio.Task'>
    t done: True
    

    这种模式有一个微妙之处:如果你没有在main()中await t,它可能在main()本身发出信号表明它已完成之前完成。因为在没有await t 的情况下asynio.run(main())调用loop.run_until_complete(main()),事件循环只关心main()是否完成了,而不是main()中创建的任务是否已经完成。没有await t,循环的其他事件可能在它们完成之前会被取消。如果需要获取当前待处理任务的列表,可以使用asyncio.Task.all_tasks()。

    注意:asyncio.create_task()是在Python 3.7中引入的。在Python 3.6或更低版本中,使用asyncio.ensure_future()代替create_task()。

    另外,还有asyncio.gather()。虽然它没有做任何非常特殊的事情,但是gather()的目的是将一组协程(future)整齐地放到一个单一的future。因此,它返回一个单独的future对象,如果await asyncio.gather()并指定多个任务或协同程序,则表示您正在等待这些对象全部完成。(这与前面示例中的queue.join()有些相似。)gather()的结果将是跨输入的结果列表:

    >>> import time
    >>> async def main():
    ...     t = asyncio.create_task(coro([3, 2, 1]))
    ...     t2 = asyncio.create_task(coro([10, 5, 0]))  # Python 3.7+
    ...     print('Start:', time.strftime('%X'))
    ...     a = await asyncio.gather(t, t2)
    ...     print('End:', time.strftime('%X'))  # Should be 10 seconds
    ...     print(f'Both tasks done: {all((t.done(), t2.done()))}')
    ...     return a
    ...
    >>> a = asyncio.run(main())
    Start: 16:20:11
    End: 16:20:21
    Both tasks done: True
    >>> a
    [[1, 2, 3], [0, 5, 10]]
    

    你可能已经注意到gather()等待您传递它的Futures或协程的整个结果集。或者,您可以按完成顺序循环遍历asyncio.as_completed()以完成任务。该函数返回一个迭代器,在完成任务时生成任务。下面coro([3,2,1])的结果将在coro([10,5,0])完成之前可用,而gather()的情况并非如此:

    >>> async def main():
    ...     t = asyncio.create_task(coro([3, 2, 1]))
    ...     t2 = asyncio.create_task(coro([10, 5, 0]))
    ...     print('Start:', time.strftime('%X'))
    ...     for res in asyncio.as_completed((t, t2)):
    ...         compl = await res
    ...         print(f'res: {compl} completed at {time.strftime("%X")}')
    ...     print('End:', time.strftime('%X'))
    ...     print(f'Both tasks done: {all((t.done(), t2.done()))}')
    ...
    >>> a = asyncio.run(main())
    Start: 09:49:07
    res: [1, 2, 3] completed at 09:49:10
    res: [0, 5, 10] completed at 09:49:17
    End: 09:49:17
    Both tasks done: True
    

    最后,你可能还可以看到asyncio.ensure_future()。你应该很少需要它,因为它是一个较低级别的管道API,并且很大程度上被后来引入的create_task()取代。

    await的优先级

    虽然它们的行为有些相似,但await关键字的优先级明显高于yield。这意味着,由于它的绑定更紧密,在很多情况下,您需要在yield from语句中使用括号,而在类似的await语句中则不需要。有关更多信息,请参见PEP 492中的await表达式示例

    总结

    你现在已经准备好使用async / await和它构建的库了。 以下是你已经学到的的内容概述:

    • 异步IO作为一种与语言无关的模型,通过让协程彼此间进行间接通信来实现并发
    • Python中用于标记和定义协程的新关键字async、await的一些细节。
    • 提供用于运行和管理协程的API的Python包asyncio

    附加资源

    Python版本细节

    Python中的异步IO发展迅速,很难跟踪什么时候发生了什么。下面列出了与asyncio相关的Python小版本更改和介绍:

    • 3.3: yield from表达式允许生成器委派
    • 3.4:asyncio以临时API状态引入Python标准库
    • 3.5:async和await成为Python语法的一部分,用于表示和等待协程。它们还没有成为保留关键字(您仍然可以定义名为async和await的函数或变量)。
    • 3.6:引入异步生成器和异步理解/链、推导。asyncio的API被声明为稳定的,而不是临时的。
    • 3.7:async和await成为保留关键字(它们不能用作标识符。)。它们用于替换asyncio.coroutine()装饰器。asyncio.run()被引入asyncio包,其中包括许多其他功能

    如果您想要安全(并且能够使用asyncio.run()),请使用Python 3.7或更高版本来获取完整的功能集。

    相关文章

    以下是其他资源的精选列表:

    Python文档的 What’s New 部分更详细地解释了语言变化背后的动机:

    来自David Beazley的:

    YouTube 视频:

    相关PEPs

    PEP 创建时间
    PEP 342 – 通过增强型生成器的协程 2005-05
    PEP 380 – 委托给子生成器的语法 2009-02
    PEP 3153 – 异步IO支持 2011-05
    PEP 3156 – 异步IO支持重新启动:“asyncio”模块 2012-12
    PEP 492 – async和await语法的协程 2015-04
    PEP 525 – 异步生成器 2016-07
    PEP 530 – Asynchronous Comprehensions 2016-09

    使用async/await的库

    来自 aio-libs:

    • aiohttp: 异步HTTP客户端/服务器框架
    • aioredis: 异步IO Redis支持
    • aiopg: 异步IO PostgreSQL 支持
    • aiomcache: 异步IO memcached 客户端
    • aiokafka: 异步IO Kafka 客户端
    • aiozmq: 异步IO ZeroMQ 支持
    • aiojobs:用于管理后台任务的作业调度程序
    • async_lru: 用于异步IO的简单LRU缓存

    来自 magicstack:

    • uvloop:超快的异步IO事件循环
    • asyncpg: (也非常快)异步IO PostgreSQL支持

    来自其他:

    • trio: 更友好的“asyncio”,旨在展示一个更加简单的设计
    • aiofiles: 异步 文件 IO
    • asks: 异步类requests的http 库
    • asyncio-redis: 异步IO Redis 支持
    • aioprocessing: 将multiprocessing模块与asyncio集成在一起
    • umongo: 异步IO MongoDB 客户端
    • unsync: Unsynchronize asyncio
    • aiostream:类似'itertools',但异步

    我的感想:

    其实读完这篇文章,我相信有很多人仍旧会有困惑——异步IO底层到底是怎么实现的?早些时候我也很困惑,要说多线程多进程我们很好理解,因为我们知道常用的现代计算机是根据时间片分时运行程序的。到了异步IO或者协程这里竟然会出现一段没有CPU参与的时间。在我学习Javascript/nodejs的时候就更困惑了,web-base的javascript和backend nodejs都是单线程设计的,它的定时器操作怎么实现的?它的界面异步操作怎么实现的?后来读了《UNIX环境高级编程》才有种“恍然大悟”的感觉。在学习编程语言的时候,往往认为语言本身是图灵完备的,编程语言设定的规则就是整个世界。但实际上,编程语言的图灵完备仅体现在逻辑和运算上,其他的一些设施底层不是语言本身就能够完全解释的。我们,至少是我自己,在学习一个语言工具的时候往往忽略了一个早就知道的现实——现代常规的编程,都是面向操作系统的编程!无论是多线程、多进程还是异步IO本身都是操作系统提供的功能。多余web-base的javascript更是面向浏览器编程。浏览器不提供异步IO相关的功能,Web-base 的javascript本身是没办法实现的,操作系统不支持异步IO,什么语言也不行~golang的go程也不过是从系统手中接管了生成线程之后的再分配管理。正像Linux/Unix编程标准是两个的合体——ANSI C + POSIX,我们学习的语言正对应ANSI C,但多线程、多进程、信号这些东西本身不是语言规范里面的,他们是POSIX里的,是操作系统的规范,是操作系统提供的!再进一步,为什么操作系统能实现?因为硬件支持这样的实现!

  • 相关阅读:
    Mybatis(4) 映射文件-参数处理
    Mybatis(3) 映射文件-增删改查
    Mabatis(2) 全局配置文件
    Mybatis(1) 创建Mybatis HelloWorld
    过滤器和拦截器之间的区别
    Redis(3) 配置文件 redis.conf
    Redis(2) 数据类型
    Redis(1) 初识Redis
    ActiveMQ(4) ActiveMQ JDBC 持久化 Mysql 数据库
    8.字典
  • 原文地址:https://www.cnblogs.com/taceywong/p/11224731.html
Copyright © 2011-2022 走看看