zoukankan      html  css  js  c++  java
  • asyncio:python3未来并发编程主流、充满野心的模块

    介绍

    asyncio是Python在3.5中正式引入的标准库,这是Python未来的并发编程的主流,非常重要的一个模块。有一个web框架叫sanic,就是基于asyncio,语法和flask类似,使用sanic可以达到匹配go语言的并发量,但无奈第三方组件太少。

    asyncio模块提供了使用协程构建并发应用的工具。threading模块通过应用线程实现并发,multiprocessing使用系统进程实现并发,asyncio使用一种单线程、单进程模式实现并发,应用的各个部分会彼此合作,在最优的时刻显式的切换任务。大多数情况下,会在程序阻塞等待读写数据时发生这种上下文切换,不过asyncio也支持调度代码在将来的某个特定时间运行,从而支持一个协程等待另一个协程完成,以处理系统信号和识别其他一些事件(这些事件可能导致应用改变其工作内容)

    asyncio中,有几个非常重要的概念。

    • coroutine 协程协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
    • future 未来对象:在asyncio中,如何才能得知异步调用的结果呢?先设计一个对象,异步调用执行完的时候,就把结果放在它里面。这种对象称之为未来对象。未来对象有一个result属性,用于存放未来的执行结果。还有个set_result()方法,是用于设置result的,future可以看作为下面的task的容器。
    • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
    • event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
    • async/await 关键字python3.5 开始引入的用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

    特点

    使用其他并发模型的大多数程序都采用线性方式编写,而且依赖于语言运行时系统或操作系统的底层线程或进程管理来适当地改变上下文。基于asyncio的应用要求应用代码显式地处理上下文切换,要正确地使用相关技术,这取决于是否能正确理解一些相关联的概念。

    asyncio提供的框架以一个事件循环(event loop)为中心,这是一个首类对象,负责高效地处理I/O事件、系统事件、和应用上下文切换。目前已经提供了多个循环实现来高效地利用操作系统的功能。尽管通常会自动选择一个合理的默认实现,但也完全可以在应用中选择某个特定的事件循环实现。

    与事件循环交互的应用要显式地注册将运行的代码,让事件循环在资源可用时向应用代码发出必要的调用。

    例如:一个网络服务器打开套接字,然后注册为当这些套接字上出现输入事件时服务器要得到的通知。

    事件循环在建立一个新的进入链接或者在数据可读取时都会提醒服务器代码。当前上下文中没有更多工作可做时,应用代码要再次短时间地交出控制权。

    例如:如果一个套接字没有更多的数据可以接收,那么服务器会把控制权交给事件循环

    所以,就是把事件注册到事件循环中,不断地循环这些事件,可以处理了那么就去处理,如果卡住了,那么把控制权交给事件循环,继续执行其他可执行的任务。

    像传统的twisted、gevent、以及tornado,都是采用了事件循环的方式,这种模式只适用于高I/O,低CPU的场景,一旦出现了耗时的复杂运算,那么所有任务都会被卡住。

    将控制权交给事件循环的机制依赖于协程(coroutine),这是一些特殊的函数,可以将控制返回给调用者而不丢失其状态。

    协程与生成器非常类似,实际上,在python3.5版本之前还未对协程提供原生支持时,可以用生成器来实现协程。asyncio还为协议(protocol)和传输(transport)提供了一个基于类的抽象层,可以使用回调编写代码而不是直接编写协程。在基于类的模型和协程模型时,可以通过重新进入事件循环显式地改变上下文,以取代python多线程实现中隐式的上下文改变

    创建一个协程并执行

    import asyncio
    
    '''
    协程是一个专门设计用来实现并发操作的语言构造。
    在早期版本,是使用yield来模拟协程,但它本质上是一个生成器。
    但是从python3.5开始,python已经支持原生协程。
    调用协程会创建一个协程对象,协程可以使用await关键字(并提供另一个协程)暂停执行。
    暂停时,这个协程的状态会保留,使得下一次被唤醒时可以从暂停的地方恢复执行
    '''
    
    
    # 使用async def可以直接定义一个协程
    async def coroutine():
        print("in coroutine")
    
    
    # 创建事件循环
    loop = asyncio.get_event_loop()
    try:
        print("start coroutine")
        # 协程是无法直接运行的,必须要扔到事件循环里,让事件循环驱动运行
        coro = coroutine()
        print("entering event loop")
        # 必须扔到事件循环里,这个方法的含义从名字也能看出来,直到协程运行完成
        loop.run_until_complete(coro)
    finally:
        print("closing event loop")
        # 关闭事件循环
        loop.close()
    '''
    start coroutine
    entering event loop
    in coroutine
    closing event loop
    '''
    
    # 第一步是得到事件循环的引用。
    # 可以使用默认地循环类型,也可以实例化一个特定的循环类。
    # run_until_complete方法启动协程,协程退出时这个方法会停止循环
    

    值得一提的是,从python3.7开始,async和await已经是关键字了,我们之前说的关键字其实是保留关键字,意思是你可以使用async和await作为变量名,但是在python3.7之后,就不可以了。另外在python3.7中还提供了另一种运行协程的方法

    import asyncio
    
    
    async def coroutine():
        print("in coroutine")
    
    
    coro = coroutine()
    
    # 我们可以直接调用asyncio.run(coroutine)来启动一个协程
    # 使用这种方法运行必须确保python的版本不低于python3.7
    asyncio.run(coro)
    """
    in coroutine
    """
    

    如果协程有返回值呢?

    import asyncio
    
    '''
    我们也可以获取协程的返回值
    '''
    
    
    async def coroutine():
        print("in coroutine")
        return "result"
    
    
    loop = asyncio.get_event_loop()
    try:
        coro = coroutine()
        result = loop.run_until_complete(coro)
        print(result)
    finally:
        loop.close()
    '''
    in coroutine
    result
    '''
    # 在这里,run_until_complete还会返回它等待的协程的结果
    

    对于asyncio.run来说,也是一样的

    import asyncio
    
    '''
    我们也可以获取协程的返回值
    '''
    
    
    async def coroutine():
        print("in coroutine")
        return "result"
    
    
    coro = coroutine()
    result = asyncio.run(coro)
    print(result)
    """
    in coroutine
    result
    """
    

    多个协程合作

    如果有多个协程呢?

    import asyncio
    
    '''
    一个协程还可以驱动另一个协程并等待结果,从而可以更容易地将一个任务分解为可重用的部分。
    '''
    
    
    async def worker():
        print("worker....")
        # 使用await方法会驱动协程consumer执行,并得到其返回值
        # 这里类似于函数调用一样,但是呢,协程需要加上一个await
        res = await consumer()
        print(res)
    
    
    async def consumer():
        return "i am consumer"
    
    
    asyncio.run(worker())
    """
    worker....
    i am consumer
    """
    # 在这里,使用await关键字,而不是向循环中增加新的协程。因为控制流已经在循环管理的一个协程中,所以没必要告诉事件循环来管理这些协程。
    # 另外,协程可以并发运行,但前提是多个协程。这个协程卡住了,可以切换到另一个协程。但是就卡住的协程本身来说,该卡多长时间还是多长时间,不可能说跳过卡住的部分执行下面的代码。
    

    另外我们还可以通过装饰器来模拟协程

    import asyncio
    
    '''
    协程函数时asyncio设计中的关键部分。
    它们提供了一个语言构造,可以停止程序某一部分的执行,保留这个调用的状态,并在以后重新进入这个状态,这些动作都是并发框架很重要的功能。
    
    python3.5中引入了一些新的语言特性,可以使用async def以原生方式定义这些协程,以及使用await交出控制,asyncio的例子应用了这些新特性。
    但是早期版本,可以使用asyncio.coroutine装饰器将函数装饰成一个协程并使用yield from来达到同样的效果。
    '''
    
    
    @asyncio.coroutine
    def worker():
        print("worker....")
        res = yield from consumer()
        print(res)
    
    
    @asyncio.coroutine
    def consumer():
        return "i am consumer"
    
    
    asyncio.run(worker())
    """
    worker....
    i am consumer
    """
    
    # 我们看到使用生成器依旧可以达到这样的效果,然而尽管使用生成器可以达到同样的效果,但还是推荐使用async和await
    '''
    生成器既可以做生成器,又可以包装为协程,那么它到底是协程还是生成器呢?这会使得代码出现混乱
    生成器既然叫生成器,那么就应该做自己
    基于async的原生协程比使用yield装饰器的协程要快,大概快10-20%
    '''
    # 并且在python3.8中,已经警告了,不建议使用这种方法,定义一个协程应该使用async def
    

    调用常规函数

    call_soon

    可以使用这个函数给协程绑定一个回调,从名字也能看出来是立即执行,只不过是遇到阻塞立即执行。

    import asyncio
    from functools import partial
    
    '''
    除了管理协程和I/P回调,asyncio事件循环还可以根据循环中保存的一个定时器值来调度常规函数调用。
    '''
    # 如果回调的时间不重要,那么可以使用call_soon调度下一次循环迭代的调用
    
    
    def callback(*args, **kwargs):
        print("callback:", args, kwargs)
    
    
    async def main(loop):
        print("register callback")
        # 接收一个回调函数,和参数
        loop.call_soon(callback, "mashiro", 16)
        print("********")
        # 另外call_soon不支持使用关键字参数来向回调传递参数
        # 所以如果想使用关键字参数,需要使用偏函数转换一下
        # 其实不仅是这里的call_sonn,以及后面要介绍的call_later和call_at都不支持使用关键字参数来向回调传递参数
        # 因此如果不想使用偏函数来包装的话,就直接使用位置参数就可以了
        wrapped = partial(callback, kwargs={"name": "satori", "age": 16})
        loop.call_soon(wrapped, "mahsiro", 16)
        print("—————————")
    
        await asyncio.sleep(0.6)
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    """
    register callback
    ********
    —————————
    callback: ('mashiro', 16) {}
    callback: ('mahsiro', 16) {'kwargs': {'name': 'satori', 'age': 16}}
    """
    # 另外我们发现我们在调用了call_soon之后没有立刻执行,而是先进性了print
    # 这是因为只有在遇到阻塞才会立刻执行,所以当遇到await asyncio.sleep的时候会去执行
    # 另外这里的阻塞,不能是time.sleep,必须是可以awaitable的阻塞
    

    call_later

    同样是给一个协程绑定一个回调,但是从名字也能看出来这需要指定一个时间,表示多长时间之后调用。

    import asyncio
    
    '''
    要将回调推迟到将来的某个时间调用,可以使用call_later。这个方法的第一个参数是延迟时间(单位为秒),第二个参数是回调。
    '''
    
    
    def callback(cb, n):
        print(f"{cb} {n}")
    
    
    async def main(loop):
        print("register callback")
        loop.call_later(0.2, callback, "call_later", "0.2s")
        loop.call_later(0.1, callback, "call_later", "0.1s")
        loop.call_soon(callback, "call_soon", "None")
        print("-----------")
        await asyncio.sleep(0.6)
    
    
    event_loop = asyncio.get_event_loop()
    try:
        print("entering event loop")
        event_loop.run_until_complete(main(event_loop))
    finally:
        print("closing event loop")
        event_loop.close()
    
    '''
    entering event loop
    register callback
    -----------
    call_soon None
    call_later 0.1s
    call_later 0.2s
    closing event loop
    '''
    # 我们注意一下main里面的第二个print
    # 我们看到无论是call_soon还是call_later都是在第二个print结束之后才调用
    # 说明call_later和call_soon一样,都是在遇到异步io阻塞、比如asyncio.sleep之后才会执行
    
    # 但是一提的是,对于call_later来说,计时是从注册回调的那一刻就已经开始了
    # 比如call_later中注册了5s后调用,但是遇见了asyncio.sleep(4)的时候过去了两秒,那么执行call_later注册的回调只需要再过3s就可以了。
    # 如果回调瞬间执行完毕的话,那么asyncio.sleep了4秒,只需再过1秒,asyncio.sleep也执行完毕
    
    # 但是如果其他条件不变,而asyncio.sleep(2)怎么办?执行call_later注册的回调需要3s,但是asyncio.sleep异步阻塞只有2s
    # 那么不好意思,程序会继续往下走,因为asyncio.sleep结束之后,还需要1s才会执行call_later指定的回调。
    # 所以程序向下执行,直到出现下一个异步io阻塞,如果不是异步io阻塞的话,那么call_later指定的回调也是不会执行的
    # 因此:执行回调,是指在遇见异步io阻塞的时候才会执行
    # 而call_soon则是只要遇见异步io就会执行,即使遇见异步io,call_later已经等待完毕,执行的先后顺序依旧是call_soon先执行
    

    我们来验证一下

    import asyncio
    import time
    
    
    def callback(cb, n):
        print(f"{cb} {n}")
    
    
    async def main(loop):
        print("register callback")
        loop.call_later(0.2, callback, "call_later", "0.2s")
        loop.call_later(0.1, callback, "call_later", "0.1s")
        loop.call_soon(callback, "call_soon", "None")
        # time.sleep不是异步io,它是一个同步io
        time.sleep(1)
        # 当time.sleep(1)之后call_later和call_soon肯定都会执行,因为call_later里面指定的是0.2和0.1,比1小
        await asyncio.sleep(0.6)
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    '''
    register callback
    call_soon None
    call_later 0.1s
    call_later 0.2s
    '''
    

    再来看看call_later

    import asyncio
    import time
    
    
    def callback(cb, n):
        print(f"{cb} {n}")
    
    
    async def main(loop):
        print("register callback")
        loop.call_later(2, callback, "call_later", "0.2s")
        print("call_later注册完毕")
        # 这里执行完毕,call_later还没有开始
        await asyncio.sleep(1.5)
        # 1.5 + 1肯定比2大,所以time.sleep(1)之后call_later里面的指定的时间肯定已经过了
        time.sleep(1)
        print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
        print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
        print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
        print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
        await asyncio.sleep(0.1)
        print("完了,我上面出现了异步io阻塞,我要比call_later指定的回调后执行了")
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    '''
    register callback
    call_later注册完毕
    就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
    就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
    就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
    就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
    call_later 0.2s
    完了,我上面出现了异步io阻塞,我要比call_later指定的回调后执行了
    '''
    
    import asyncio
    
    
    def callback(cb, n):
        print(f"{cb} {n}")
    
    
    async def main(loop):
        print("register callback")
        loop.call_later(2, callback, "call_later", "0.2s")
        print("call_later注册完毕")
        await asyncio.sleep(1)
        print("call_later指定的回调能执行吗")
        print("call_later指定的回调能执行吗")
        print("call_later指定的回调能执行吗")
        print("不能")
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    '''
    register callback
    call_later注册完毕
    call_later指定的回调能执行吗
    call_later指定的回调能执行吗
    call_later指定的回调能执行吗
    不能
    '''
    # 我们看到call_later指定的回调没有执行程序就退出了
    # 这是因为main里面的代码全部执行完,之后call_later指定的时间还没有到
    # 所以直接退出了
    
    import asyncio
    import time
    
    
    def callback(cb, n):
        print(f"{cb} {n}")
    
    
    async def main(loop):
        print("register callback")
        loop.call_later(2, callback, "call_later", "0.2s")
        print("call_later注册完毕")
        await asyncio.sleep(1)
        time.sleep(1)
        print("call_later指定的回调能执行吗")
        print("call_later指定的回调能执行吗")
        print("call_later指定的回调能执行吗")
        print("能,因为时间已经到了")
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    '''
    register callback
    call_later注册完毕
    call_later指定的回调能执行吗
    call_later指定的回调能执行吗
    call_later指定的回调能执行吗
    能,因为时间已经到了
    call_later 0.2s
    '''
    # 尽管一致没有出现异步io,但是当代码全部执行完之后,call_later指定的时间已经到了
    # 所以会在最后执行它
    

    call_at

    call_at,在指定的时间执行

    import asyncio
    import time
    
    '''
    除了call_soon瞬间执行,和call_later延迟执行之外,还有一个call_at在指定之间内执行。
    实现这个目的的循环依赖于一个单调时钟,而不是墙上的时钟时间,以确保now时间绝对不会逆转。
    要为一个调度回调选择时间,必须使用循环的time方法从这个时钟的内部开始
    '''
    
    
    def callback(cb, loop):
        print(f"callback {cb} invoked at {loop.time()}")
    
    
    async def main(loop):
        now = loop.time()
        print("clock time:", time.time())
        print("loop time:", now)
        print("register callback")
        # 表示在当前时间(time = loop.time())之后的0.2s执行,个人觉得类似于call_later
        loop.call_at(now + 0.2, callback, "call_at", loop)
        time.sleep(1)
        print("是先打印我呢?还是先执行call_at或者call_sonn呢")
        await asyncio.sleep(1)
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    '''
    clock time: 1573291054.068545
    loop time: 160079.265
    register callback
    是先打印我呢?还是先执行call_at或者call_sonn呢
    callback call_at invoked at 160079.453
    '''
    # 所以这和call_later也是类似的,都是在遇到io阻塞之后才会执行
    

    以上三者的执行顺序

    首先在遇到异步io阻塞的时候,call_soon是立刻执行,call_later和call_at是需要等指定过了才会执行,如果时间没到,那么执行顺序肯定是call_soon最先,这没问题。但是,如果当遇到一个异步io阻塞的时候,call_later和call_at所指定的时间都过了,那么这个三者的执行顺序是怎么样的呢?

    import asyncio
    
    
    def callback(cb):
        print(f"callback {cb}")
    
    
    async def main(loop):
        now = loop.time()
        loop.call_at(now + 0.2, callback, "call_at")
        loop.call_later(0.2, callback, "call_later")
        loop.call_soon(callback, "call_soon")
    
        #await asyncio.sleep(1)
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    '''
    callback call_soon
    '''
    # 首先我们发现,如果没有异步io阻塞,那么最终只有call_soon会执行
    
    import asyncio
    import time
    
    
    def callback(cb):
        print(f"callback {cb}")
    
    
    async def main(loop):
        now = loop.time()
        loop.call_at(now + 0.3, callback, "call_at")
        loop.call_later(0.2, callback, "call_later")
        loop.call_soon(callback, "call_soon")
        time.sleep(1)
        await asyncio.sleep(1)
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    
    '''
    callback call_soon
    callback call_later
    callback call_at
    '''
    # 遇到异步io,那么call_soon仍然最先执行
    # 至于call_later和call_at,则是两者指定的时间哪个先到,先执行哪个
    

    task与future

    对于协程来说,是没有办法直接放到事件循环里面运行的,需要的是task。而我们刚才之所以直接将协程扔进去,是因为asyncio内部会有检测机制,如果是协程的话,会自动将协程包装成一个task

    import asyncio
    
    
    async def coroutine():
        print(123)
    
    
    loop = asyncio.get_event_loop()
    # 如何创建一个任务呢?
    task = loop.create_task(coroutine())
    loop.run_until_complete(task)
    """
    123
    """
    

    因此一个协程是一个可以原生挂起的函数,而一个task则是对协程的进一步封装,里面包含了任务的各种执行状态。

    future被称之为未来对象,我觉得可以把它看成是task的容器,每一个task运行之后都会返回一个future,这个future可以获取task的执行状态、返回值等等,并且还能够给task设置相应的返回值。所以执行的是任务,future是用来管理和查看的。

    import asyncio
    
    '''
    Future表示还未完成的工作的结果。事件循环可以通过监视一个Future对象的状态来指示它已经完成,从而允许应用的一部分等待另一部分完成一些工作。
    '''
    
    
    # Future的做法类似于协程,所以等待协程所用的技术同样可以用于等待Future。
    
    
    # 接收一个future和result
    def mark_done(future, result):
        print("setting result")
        future.set_result(result)
    
    
    event_loop = asyncio.get_event_loop()
    # 如何创建一个future,使用asyncio.Future
    future = asyncio.Future()
    
    # 为future注册回调,协程、task、future三者是递进关系,都可以设置回调,都可以扔进事件循环
    event_loop.call_later(2, mark_done, future, "我是返回值")
    # future什么时候才能执行完毕呢?当future执行了set_result函数的时候
    event_loop.run_until_complete(future)
    
    print(future.result())
    """
    setting result
    我是返回值
    """
    # 对于一个任务是否完成,就是看起对应的future有没有set_result,如果set_result之后,那么这个任务便执行完毕了
    # 通过future.result()便可以拿到任务的返回值,所以说future是task的容器,当任务执行完毕,future会调用set_result设置返回值
    
    future = asyncio.Future()
    # 设置只能设置一次,设置多次会报错
    future.set_result("xxx")
    # 但是取可以取多次
    print(future.result())  # xxx
    print(future.result())  # xxx
    print(future.result())  # xxx
    print(future.result())  # xxx
    print(future.result())  # xxx
    
    
    # 不过concurrent.futures里面的future可以设置多次,但是以最后一次为准
    from concurrent.futures import Future
    future = Future()
    future.set_result("aaa")
    future.set_result("bbb")
    print(future.result())  # bbb
    print(future.result())  # bbb
    print(future.result())  # bbb
    

    future还可以结合await使用

    import asyncio
    
    '''
    Future还可以结合await关键字使用
    '''
    
    
    def mark_done(future, result):
        print("setting result")
        future.set_result(result)
    
    
    async def main(loop):
        future = asyncio.Future()
        print("scheduling mark_done")
        loop.call_later(2, mark_done, future, "the result")
        # 会等到这个future完成,什么时候完成,当这个future执行set_result的时候,然后await future的值则是future.result()
        # 其实我们await一个协程也是一样,也是当协程对应的任务执行完毕、future将返回值进行set_result的时候
        # 然后我们知道await coroutine的得到的就是当前定义的coroutine的返回值,其实准确来说,应该是当前coroutine对应的future的result()
        # 只不过result()得到的就是set_result设置进去的,而set_result设置进去的正式当前定义的coroutine的返回值。尽管是一样的,但是这个逻辑还是要理清。
        res = await future
        print("res =", res)
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    """
    scheduling mark_done
    setting result
    res = the result
    """
    

    除了做法与协程类似,future还可以绑定回调

    import asyncio
    import functools
    
    '''
    除了做法与协程类似,Future也可以调用回调,回调的顺序按照其注册的顺序调用
    '''
    
    
    def callback(future, n):
        print(f"future result: {future.result()} n:{n}")
    
    
    async def register_callback(future):
        print("register callback on futures")
        # 设置一个回调,同样是只能传递函数名,触发回调的时候,会自动将future本身作为第一个参数传递给回调函数
        # 回调什么时候执行,还是那句话,当future执行set_result的时候执行
        future.add_done_callback(functools.partial(callback, n=1))
        future.add_done_callback(functools.partial(callback, n=2))
    
    
    async def main(future):
        # 等待回调注册完成
        await register_callback(future)
        print("setting result of future")
        future.set_result("the result")
    
    
    event_loop = asyncio.get_event_loop()
    future = asyncio.Future()
    event_loop.run_until_complete(main(future))
    """
    register callback on futures
    setting result of future
    future result: the result n:1
    future result: the result n:2
    """
    

    执行task

    任务是与事件循环交互的主要途径之一,任务可以包装协程,并跟踪协程何时完成。另外future被称之为未来对象,它是可以等待的,并且task的类对象在源码中实际上是future的类对象的一个子类。每个任务都有一个结果,是通过set_result设置的,并且可以通过result()获取这些结果,当然这些上面说过了,这里不再赘述了

    import asyncio
    
    '''
    任务是与事件循环交互的主要途径之一。
    任务可以包装协程,并跟踪协程何时完成。
    由于任务是Future的子类,所以其他协程可以等待任务,而且每个任务可以有一个结果,在它完成时可以获取这些结果
    '''
    
    
    # 启动一个任务,可以使用create_task函数创建一个Task实例。
    # 只要循环还在运行而且协程没有返回,create_task得到的任务便会作为事件循环管理的并发操作的一部分运行
    async def task_func():
        print("in task func")
        return "the result"
    
    
    async def main(loop):
        print("creating task")
        # 除了使用loop.create_task,我们还可以使用asyncio.ensure_future
        # 对于传入一个协程的话,asyncio.ensure_future还是调用了loop.create_task
        task = loop.create_task(task_func())
        print(f"wait for {task}")
        return_value = await task
        print(f"task completed {task}")
        print(f"return value {return_value}")
    
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    """
    creating task
    wait for <Task pending coro=<task_func() running at D:/satori/1.py:12>>
    in task func
    task completed <Task finished coro=<task_func() done, defined at D:/satori/1.py:12> result='the result'>
    return value the result
    """
    # 在我们还没有await驱动任务执行的时候,是Task pending
    # 当await之后,已处于finished状态
    
    import asyncio
    
    '''
    通过create_task可以创建对象,那么也可以在任务完成前取消操作
    '''
    
    
    async def task_func():
        print("in task func")
        return "the result"
    
    
    async def main(loop):
        print("creating task")
        task = loop.create_task(task_func())
    
        print("canceling task")
        # 任务创建之后,可以调用cancel函数取消
        task.cancel()
        print(f"canceled task: {task}")
    
        try:
            # 任务取消之后再await则会引发CancelledError
            await task
        except asyncio.CancelledError:
            print("caught error from canceled task")
        else:
            print(f"task result: {task.result()}")
    
    
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(main(event_loop))
    finally:
        event_loop.close()
    """
    creating task
    canceling task
    canceled task: <Task cancelling coro=<task_func() running at D:/satori/1.py:8>>
    caught error from canceled task
    """
    

    多个task并发执行

    首先我们来看一个例子。

    import asyncio
    import time
    
    
    async def task_func():
        await asyncio.sleep(1)
    
    
    async def main():
        # 两者是一样的
        await task_func()
        await asyncio.ensure_future(task_func())
    
    
    start = time.perf_counter()
    asyncio.run(main())
    print(f"总用时:{time.perf_counter() - start}")  # 总用时:2.0023553
    

    我们发现总用时为2s,这是为什么?不是说遇见异步io会自动切换么?那么整体用时应该为1s才对啊。确实理论上是这样的,但是观察我们的代码是怎么写的,我们两个await是分开写的,而且await coroutine是能得到当前coroutine的返回值的,如果这个coroutine对应的task都还没执行完毕、future还没有set_result,我们又怎么能拿到呢?还是那句话,异步是在多个协程之间进行切换,至于当前的协程阻塞了只会切换到另一个协程里面去执行,但是对于当前协程来说,该阻塞多长还是阻塞多长,不可能说这一步阻塞还没过去,就直接调到下一行代码去执行,这是不可能的。

    因此两个await,必须等第一个await完毕之后,才会执行下一个await。至于我们刚才的call_soon、call_later、call_at,可以看做是另一个协程,在遇到了asyncio.sleep之后就切换过去了,但是对于协程本身来说,该asyncio.sleep多少秒还是多少秒, 只有sleep结束了,才会执行await asyncio.sleep下面的代码。还是那句话,切换是指多个协程之间切换,而我们上面代码是两个await,这两个await本身来说相当于还是串行,就是main协程里面的两行代码,只有第一个await结束了,才会执行第二个await。

    感觉说的有点啰嗦了,到后面的async for会再提一遍

    那么问题来了,我们如何才能让这两个协程并发的执行呢?

    asyncio.wait

    import asyncio
    import time
    
    
    async def task_func():
        await asyncio.sleep(1)
    
    
    async def main():
        # 将多个协程或者任务放在一个列表里面,传给asyncio.wait
        # 里面还可以再传其他参数:
        # timeout:超时时间,如果在这个时间段内任务没有执行完毕,那么任务直接取消
        # return_when:FIRST_COMPLETED,第一个任务完成时结束;FIRST_EXCEPTION,第一次出现异常时结束;ALL_COMPLETED,所有任务都完成是结束。默认是ALL_COMPLETED
        await asyncio.wait([task_func(), task_func()])
    
    
    start = time.perf_counter()
    asyncio.run(main())
    print(f"总用时:{time.perf_counter() - start}")  # 总用时:1.0012839
    """
    此时总共就只用了1s
    """
    

    获取任务的返回值

    import asyncio
    import time
    
    
    async def task_func(n):
        await asyncio.sleep(1)
        return f"task{n}"
    
    
    async def main():
        # 这个wait函数有两个返回值,一个是内部任务已完成的future,一个是未完成的future
        finished, pending = await asyncio.wait([task_func(_) for _ in range(1, 5)])
        # 我们说过一旦任务完成,future就会通过set_result方法设置返回值
        # 然后我们通过future.result()就能拿到返回值
        print(f"results: {[future.result() for future in finished]}")
    
    
    start = time.perf_counter()
    asyncio.run(main())
    print(f"总用时:{time.perf_counter() - start}")  
    """
    results: ['task2', 'task3', 'task1', 'task4']
    总用时:1.0015823
    """
    

    但是我们发现执行的顺序貌似不是我们添加的顺序,因此wait返回的future的顺序是无序的,如果希望有序,那么需要使用另一个函数

    asyncio.gather

    import asyncio
    import time
    
    
    async def task_func(n):
        await asyncio.sleep(1)
        return f"task{n}"
    
    
    async def main():
    
        # gather只有一个返回值,直接返回已完成的任务,注意是返回值,不是future,也就是说返回的是future.result()
        # 但是传递的时候就不要传递列表,而是需要传递一个个的task,因此我们这里要将列表打散
        finished = await asyncio.gather(*[task_func(_) for _ in range(1, 5)])
        print(f"results: {[res for res in finished]}")
    
    
    start = time.perf_counter()
    asyncio.run(main())
    print(f"总用时:{time.perf_counter() - start}")
    """
    results: ['task1', 'task2', 'task3', 'task4']
    总用时:1.0012363
    """
    

    使用gather是可以保证顺序的,顺序就是我们添加任务的顺序

    asyncio.as_completed

    我们看到这个wait类似于concurrent.futures里面的submit,gather类似于map,而concurrent.futures里面还有一个as_completed,那么同理asyncio里面也有一个as_completed。另外个人觉得asyncio借鉴了concurrent.futures里的不少理念,而且wait里面还有一个return_when,这个里面的参数,官网就是从concurrent.futures包里面导入的。

    那这个函数是用来干什么的呢?从名字也能看出来,是哪个先完成哪个就先返回

    import asyncio
    import time
    
    '''
    as_completed函数是一个生成器,会管理指定的一个协程列表,并生成它们的结果,每个协程结束运行时一次生成一个结果。
    与wait类似,as_completed不能保证顺序,从名字也能看出来,哪个先完成哪个先返回
    '''
    
    
    async def task_func(n):
        await asyncio.sleep(n)
        return f"task{n}"
    
    
    async def main():
    
        # 同样需要传递一个列表
        completed = asyncio.as_completed([task_func(2), task_func(1), task_func(3), task_func(4)])
        # 遍历每一个task,进行await,哪个先完成,就先返回
        for task in completed:
            res = await task
            print(res)
    
    
    start = time.perf_counter()
    asyncio.run(main())
    print(f"总用时:{time.perf_counter() - start}")
    """
    task1
    task2
    task3
    task4
    总用时:4.0048034999999995
    """
    

    同步原语

    尽管asyncio应用通常作为单线程的进程运行,不过仍被构建为并发应用。

    由于I/O以及其他外部事件的延迟和中断,每个协程或任务可能按照一种不可预知的顺序执行。

    为了支持安全的并发执行,asyncio包含了threading和multiprocessing模块中一些底层原语的实现

    import asyncio
    
    '''
    Lock可以用来保护对一个共享资源的访问,只有锁的持有者可以使用这个资源。
    如果有多个请求要得到这个锁,那么其将会阻塞,以保证一次只有一个持有者
    '''
    
    
    def unlock(lock):
        print("回调释放锁,不然其他协程获取不到。")
        print("但我是1秒后被调用,锁又在只能通过调用我才能释放,所以很遗憾,其他协程要想执行,至少要1秒后了")
        lock.release()
    
    
    async def coro1(lock):
        print("coro1在等待锁")
        # 使用async with语句很方便,是一个上下文。
        # 我们知道在多线程中,也可以使用with,相当于开始的lock.acquire和结尾lock.release
        # 那么在协程中,也有await lock.acquire和lock.release,以及专业写法async with
        async with lock:
            print("coro1获得了锁")
            print("coro1释放了锁")
    
    
    async def coro2(lock):
        print("coro2在等待锁")
        # 使用await lock.acquire()和lock.release()这种方式也是一样的
        await lock.acquire()
        print("coro2获得了锁")
        print("coro2释放了锁")
        # 注意release是不需要await的
        lock.release()
    
    
    async def main(loop):
        # 创建共享锁
        lock = asyncio.Lock()
    
        print("在开始协程之前创建一把锁")
        await lock.acquire()  # 这里先把锁给锁上
        print("锁是否被获取:", lock.locked())
    
        # 执行回调将锁释放,不然协程无法获取锁
        loop.call_later(1, unlock, lock)
    
        # 运行想要使用锁的协程
        print("等待所有协程")
        await asyncio.wait([coro1(lock), coro2(lock)])
    
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    """
    在开始协程之前创建一把锁
    锁是否被获取: True
    等待所有协程
    coro2在等待锁
    coro1在等待锁
    回调释放锁,不然其他协程获取不到。
    但我是1秒后被调用,锁又在只能通过调用我才能释放,所以很遗憾,其他协程要想执行,至少要1秒后了
    coro2获得了锁
    coro2释放了锁
    coro1获得了锁
    coro1释放了锁
    """
    

    事件

    和线程一样,协程里面也有事件的概念。asyncio.Event基于threading.Event,它允许多个消费者等待某个事件发生。

    Event对象可以使用set、wait、clear

    • set:设置标志位,调用is_set可以查看标志位是否被设置。一个刚创建的Event对象默认是没有设置的
    • wait:等待,在没有调用set的情况下,会阻塞。如果设置了set,wait则不会阻塞
    • clear:清空标志位
    import asyncio
    
    
    def set_event(event):
        print("设置标志位,因为协程会卡住,只有设置了标志位才会往下走")
        print("但我是一秒后才被调用,所以协程想往下走起码也要等到1秒后了")
        event.set()
    
    
    async def coro1(event):
        print("coro1在这里卡住了,快设置标志位啊")
        await event.wait()
        print(f"coro1飞起来了,不信你看现在标志位,是否设置标志位:{event.is_set()}")
    
    
    async def coro2(event):
        print("coro2在这里卡住了,快设置标志位啊")
        await event.wait()
        print(f"coro2飞起来了,不信你看现在标志位,是否设置标志位:{event.is_set()}")
    
    
    async def main(loop):
        # 创建共享事件
        event = asyncio.Event()
        # 现在设置标志位了吗?
        print("是否设置标志位:", event.is_set())
    
        # 执行回调将标志位设置,不然协程卡住了
        loop.call_later(1, set_event, event)
    
        # 运行卡住的的协程
        print("等待所有协程")
        await asyncio.wait([coro1(event), coro2(event)])
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    """
    是否设置标志位: False
    等待所有协程
    coro2在这里卡住了,快设置标志位啊
    coro1在这里卡住了,快设置标志位啊
    设置标志位,因为协程会卡住,只有设置了标志位才会往下走
    但我是一秒后才被调用,所以协程想往下走起码也要等到1秒后了
    coro2飞起来了,不信你看现在标志位,是否设置标志位:True
    coro1飞起来了,不信你看现在标志位,是否设置标志位:True
    """
    

    队列

    asyncio.Queue为协程提供了一个先进先出的数据结构,这与线程的queue.Queue或者进程里面的Queue很类似

    import asyncio
    import time
    
    
    async def consumer(q: asyncio.Queue, n):
        print(f"消费者{n}号 开始")
        while True:
            await asyncio.sleep(2)
            item = await q.get()
            print(f"消费者{n}号: 消费元素{item}")
            # 由于我们要开启多个消费者,为了让其停下来,我们添加None作为停下来的信号
            if item is None:
                # task_done是什么意思?队列有一个属性,叫做unfinished_tasks
                # 每当我们往队列里面put一个元素的时候,这个值就会加1,
                q.task_done()
                # 并且队列还有一个join方法,表示阻塞,什么时候不阻塞呢?当unfinished_tasks为0的时候。
                # 因此我们每put一个元素的时候,unfinished_tasks都会加上1,那么当我get一个元素的时候,unfinished_tasks是不是也应该要减去1啊,但是我们想多了
                # get方法不会自动帮我们做这件事,需要手动调用task_done方法实现
                break
            else:
                q.task_done()
    
    
    async def producer(q: asyncio.Queue, consumer_num):
        print(f"生产者 开始")
        for i in range(1, 22):
            await q.put(i)
            print(f"生产者: 生产元素{i},并放在了队列里")
        # 为了让消费者停下来,我就把None添加进去吧
        # 开启几个消费者,就添加几个None
        for i in range(consumer_num):
            await q.put(None)
    
        # 等待所有消费者执行完毕
        # 只要unfinished_tasks不为0,那么q.join就会卡住,直到消费者全部消费完为止
        await q.join()
        print("生产者生产的东西全被消费者消费了")
    
    
    async def main(consumer_num):
        q = asyncio.Queue()
        consumers = [consumer(q, i) for i in range(consumer_num)]
        await asyncio.wait(consumers + [producer(q, consumer_num)])
    
    
    start = time.perf_counter()
    asyncio.run(main(3))
    print(f"总用时:{time.perf_counter() - start}")
    """
    生产者 开始
    生产者: 生产元素1,并放在了队列里
    生产者: 生产元素2,并放在了队列里
    生产者: 生产元素3,并放在了队列里
    生产者: 生产元素4,并放在了队列里
    生产者: 生产元素5,并放在了队列里
    生产者: 生产元素6,并放在了队列里
    生产者: 生产元素7,并放在了队列里
    生产者: 生产元素8,并放在了队列里
    生产者: 生产元素9,并放在了队列里
    生产者: 生产元素10,并放在了队列里
    生产者: 生产元素11,并放在了队列里
    生产者: 生产元素12,并放在了队列里
    生产者: 生产元素13,并放在了队列里
    生产者: 生产元素14,并放在了队列里
    生产者: 生产元素15,并放在了队列里
    生产者: 生产元素16,并放在了队列里
    生产者: 生产元素17,并放在了队列里
    生产者: 生产元素18,并放在了队列里
    生产者: 生产元素19,并放在了队列里
    生产者: 生产元素20,并放在了队列里
    生产者: 生产元素21,并放在了队列里
    消费者1号 开始
    消费者2号 开始
    消费者0号 开始
    消费者1号: 消费元素1
    消费者2号: 消费元素2
    消费者0号: 消费元素3
    消费者1号: 消费元素4
    消费者2号: 消费元素5
    消费者0号: 消费元素6
    消费者1号: 消费元素7
    消费者2号: 消费元素8
    消费者0号: 消费元素9
    消费者1号: 消费元素10
    消费者2号: 消费元素11
    消费者0号: 消费元素12
    消费者1号: 消费元素13
    消费者2号: 消费元素14
    消费者0号: 消费元素15
    消费者1号: 消费元素16
    消费者2号: 消费元素17
    消费者0号: 消费元素18
    消费者1号: 消费元素19
    消费者2号: 消费元素20
    消费者0号: 消费元素21
    消费者1号: 消费元素None
    消费者2号: 消费元素None
    消费者0号: 消费元素None
    生产者生产的东西全被消费者消费了
    总用时:16.0055946
    """
    

    我们对队列进行循环,然后await的时候,实际上有一个更加pythonic的写法,也就是async for

    import asyncio
    import time
    from tornado.queues import Queue
    from tornado import gen
    
    
    # 注意asyncio中的Queue不支持async for,我们需要使用tornado中的Queue
    async def consumer(q: Queue, n):
        print(f"消费者{n}号 开始")
        async for item in q:
            await gen.sleep(1)
            if item is None:
                print(f"生产者{n}号:消费元素{item}")
                q.task_done()
                break
            print(f"生产者{n}号:消费元素{item}")
            q.task_done()
    
    
    async def producer(q: Queue, consumer_num):
        print(f"生产者 开始")
        for i in range(1, 22):
            await q.put(i)
            print(f"生产者: 生产元素{i},并放在了队列里")
        for i in range(consumer_num):
            await q.put(None)
    
        await q.join()
        print("生产者生产的东西全被消费者消费了")
    
    
    async def main(consumer_num):
        q = Queue()
        consumers = [consumer(q, i) for i in range(consumer_num)]
        await asyncio.wait(consumers + [producer(q, consumer_num)])
    
    
    start = time.perf_counter()
    asyncio.run(main(3))
    print(f"总用时:{time.perf_counter() - start}")
    """
    消费者2号 开始
    消费者1号 开始
    消费者0号 开始
    生产者 开始
    生产者: 生产元素1,并放在了队列里
    生产者: 生产元素2,并放在了队列里
    生产者: 生产元素3,并放在了队列里
    生产者: 生产元素4,并放在了队列里
    生产者: 生产元素5,并放在了队列里
    生产者: 生产元素6,并放在了队列里
    生产者: 生产元素7,并放在了队列里
    生产者: 生产元素8,并放在了队列里
    生产者: 生产元素9,并放在了队列里
    生产者: 生产元素10,并放在了队列里
    生产者: 生产元素11,并放在了队列里
    生产者: 生产元素12,并放在了队列里
    生产者: 生产元素13,并放在了队列里
    生产者: 生产元素14,并放在了队列里
    生产者: 生产元素15,并放在了队列里
    生产者: 生产元素16,并放在了队列里
    生产者: 生产元素17,并放在了队列里
    生产者: 生产元素18,并放在了队列里
    生产者: 生产元素19,并放在了队列里
    生产者: 生产元素20,并放在了队列里
    生产者: 生产元素21,并放在了队列里
    生产者2号:消费元素1
    生产者1号:消费元素2
    生产者0号:消费元素3
    生产者2号:消费元素4
    生产者1号:消费元素5
    生产者0号:消费元素6
    生产者2号:消费元素7
    生产者1号:消费元素8
    生产者0号:消费元素9
    生产者2号:消费元素10
    生产者1号:消费元素11
    生产者0号:消费元素12
    生产者2号:消费元素13
    生产者1号:消费元素14
    生产者0号:消费元素15
    生产者2号:消费元素16
    生产者1号:消费元素17
    生产者0号:消费元素18
    生产者2号:消费元素19
    生产者1号:消费元素20
    生产者0号:消费元素21
    生产者2号:消费元素None
    生产者1号:消费元素None
    生产者0号:消费元素None
    生产者生产的东西全被消费者消费了
    总用时:8.008154500000002
    """
    

    协程与线程结合

    如果出现了一个同步耗时的任务,我们可以将其扔到线程池里面去运行。对于协程来说,仍然是单线程的,我们是可以将耗时的任务单独开启一个线程来执行的

    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    def foo(n):
        time.sleep(n)
        print(f"foo睡了{n}秒")
    
    
    async def bar():
        await asyncio.sleep(3)
        return "bar"
    
    
    async def main():
        # 线程池最多装两个任务
        executor = ThreadPoolExecutor(max_workers=2)
        # loop.run_in_executor表示扔到线程池里面运行,这个过程是瞬间返回的
        loop.run_in_executor(executor, foo, 3)
        loop.run_in_executor(executor, foo, 2)
        print("瞬间返回")
        res = await bar()
        print(res)
    
    
    loop = asyncio.get_event_loop()
    start = time.perf_counter()
    loop.run_until_complete(main())
    print(f"总用时:{time.perf_counter() - start}")
    """
    瞬间返回
    foo睡了2秒
    foo睡了3秒
    bar
    总用时:3.0015592
    """
    

    关于async with 和async for

    如果让你定义一个类支持一个with和for语句,我相信肯定没有问题,但是async with和async for呢?我们要怎么实现呢?

    async with

    我们知道自定义一个类支持with语句,需要实现__enter____exit__这两个魔法方法,那么如果想支持async with,则需要实现__aenter____aexit__

    import asyncio
    
    
    class Open:
    
        def __init__(self, file, mode, encoding):
            self.file = file
            self.mode = mode
            self.encoding = encoding
            self.__fd = None
    
        # 要使用async def定义
        async def __aenter__(self):
            self.__fd = open(file=self.file, mode=self.mode, encoding=self.encoding)
            return self.__fd
    
        # 同样使用async def定义
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            self.__fd.close()
    
    
    # 既然是async就必须要创建协程,扔到事件循环里面运行
    async def main(file, mode="r", encoding="utf-8"):
        async with Open(file, mode, encoding) as f:
            print(f.read())
    
    
    asyncio.run(main("白色相簿.txt"))
    """
    为什么你那么熟练啊
    """
    

    可以看到我们自己实现了一个async with,但是注意这个不是异步的,我们还是调用了底层的open函数。

    当然还可以使用contextlib

    import asyncio
    import contextlib
    
    
    @contextlib.asynccontextmanager
    async def foo():
        print("xxx")
        l = list()
        yield l 
        print(l)
    
    
    async def main():
        async with foo() as l:
            l.append(1)
            l.append(2)
            l.append(3)
    
    
    asyncio.run(main())
    """
    xxx
    [1, 2, 3]
    """
    

    async for

    我们知道自定义一个类支持for语句,需要实现__iter____next__这两个魔法方法,那么如果想支持async for,则需要实现__aiter____anext__

    import asyncio
    
    
    class A:
    
        def __init__(self):
            self.l = [1, 2, 3, 4]
            self.__index = 0
    
        # 注意:定义__aiter__是不需要async的
        def __aiter__(self):
            return self
    
        # 但是定义__anext__需要async
        async def __anext__(self):
            try:
                res = self.l[self.__index]
                self.__index += 1
                return res
            except IndexError:
                # 捕获异常,协程则要raise一个StopAsyncIteration
                raise StopAsyncIteration
    
    
    async def main():
        async for _ in A():
            print(_)
    
    
    asyncio.run(main())
    """
    1
    2
    3
    4
    """
    

    另外我们知道,可以对for循环可以作用于生成器,那么async for则也可以作用于异步生成器中

    import asyncio
    
    
    # 具体版本记不清了,不知是3.5还是3.6,记得那时候引入async和await的时候,python是不允许async和yield两个关键字同时出现的
    # 但是现在至少python3.7是允许的,这种方式叫做异步生成器。
    # 但是注意:如果async里面出现了yield,那么就不可以有return xxx了。
    async def foo():
        yield 123
        yield 456
        yield 789
        print("xxx")
    
    
    async def main():
        async for _ in foo():
            print(_)
    
    
    asyncio.run(main())
    """
    123
    456
    789
    xxx
    """
    

    await

    很多人可能对python中的await这个关键字很懵逼,到底什么对象才可以被await呢?

    从抽象基类的源码中我们可以看到一个对象如果想被await,就必须要实现__await__这个魔法方法。

    import asyncio
    
    
    class A:
    
        def __await__(self):
            return "xxx"
    
    
    async def main():
        res = await A()
        print(res)
    
    try:
        asyncio.run(main())
    except Exception as e:
        print(e)  # __await__() returned non-iterator of type 'str'
    
    # 但是它报错了,意思是必须返回一个迭代器
    
    import asyncio
    
    
    class A:
    
        def __await__(self):
            return "xxx".__iter__()
    
    
    async def main():
        res = await A()
        print(res)
    
    try:
        asyncio.run(main())
    except Exception as e:
        print(e)  # Task got bad yield: 'x'
    
    # 说明要返回一个迭代器,然后yield,但是这里提示我们Task got bad yield: 'x'
    # 我们来分析一下这句话,bad yield: 'x',肯定是告诉我们yield出了一个不好的值
    # 这个不好的值被Task获取了,也就是不应该给Task一个'x'
    # 咦,Task,这是啥?我们首先想到了asyncio里面task,而task对应的类正是Task
    # 这是不是说明我们返回一个Task对象就是可以了
    
    import asyncio
    
    
    async def foo():
        return "我是foo"
    
    
    class A:
    
        def __await__(self):
            # 同样需要调用__iter__
            return asyncio.ensure_future(foo()).__iter__()
    
    
    async def main():
        res = await A()
        print(res)
    
    try:
        asyncio.run(main())
    except Exception as e:
        print(e)  
    """
    我是foo
    """
    # 可以看到成功执行
    # 其实兜了这么大圈子,完全没必要
    # await后面肯定是一个task(或者future),而我们await的是A(),那么A()的__await__方法返回的肯定是一个task(或者future)
    # await后面如果是coroutine,会自动包装成task,但是用我们自己的类返回的话,那么我们必须在__await__中手动的使用ensure_future或者create_task进行包装,再返回其迭代器
    
    

    手动实现异步

    尽管我们实现了async for、async with、await等等方法,但是它们并没有达到异步的效果,比如之前的async for底层还是使用了open。再比如网络请求,对于像requests这种库,是属于同步的,因此在协程函数中使用requests.get是没有用的。正如在协程函数中使用time.sleep一样,如果想切换,就必须使用asyncio.sleep,因为这本质上还是一个协程。所以如果想在获取网络请求的时候,使用requests来达到异步的效果是行不通的,只能通过底层的socket重新设计,比如aiohttp。再比如那些数据库驱动,也是一样的。

    目前来说,我们想实现异步,最好的方式就是通过线程池

    现在我们有两个文件

    import asyncio
    import time
    
    
    class Reader:
    
        def __init__(self, file):
            self.fd= open(file, "r", encoding="utf-8")
    
        async def __aenter__(self):
            return self
    
        def __read(self):
            # 假设读取文件需要两秒钟
            time.sleep(2)
            return self.fd.read()
    
        async def read(self):
            # 这个是可以await的,如果我们需要其返回值,那么需要进行await
            # 并且不传线程池,那么默认会创建一个
            return await loop.run_in_executor(None, self.__read)
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            self.fd.close()
    
    
    async def foo(name):
        async with Reader(name) as f:
            print(await f.read())
    
    
    async def main():
        await asyncio.wait([foo("白色相簿1.txt"), foo("白色相簿2.txt")])
    
    
    loop = asyncio.get_event_loop()
    start = time.perf_counter()
    loop.run_until_complete(main())
    print(f"总用时:{time.perf_counter() - start}")
    """
    白色相簿什么的,已经无所谓了。
    因为已经不再有歌,值得去唱了。
    传达不了的恋情,已经不需要了。
    因为已经不再有人,值得去爱了。
    为什么你会这么熟练啊!
    你和雪菜亲过多少次了啊!?
    你到底要把我甩开多远你才甘心啊!?
    总用时:2.0032377
    """
    # 可以看到总用时是2s
    
    import asyncio
    import time
    
    
    class Reader:
    
        def __init__(self, file):
            self.fd= open(file, "r", encoding="utf-8").readlines()
            self.__index = 0
    
        def __aiter__(self):
            return self
    
        def __read(self):
            # 假设读一行需要1s
            time.sleep(1)
            res = self.fd[self.__index]
            self.__index += 1
            return res
    
        async def __anext__(self):
            try:
                return await loop.run_in_executor(None, self.__read)
            except IndexError:
                raise StopAsyncIteration
    
    
    async def foo(name):
        async for line in Reader(name):
            # 由于文件结尾含有换行,我们将其去掉,因为print自带换行
            print(line[: -1])
    
    
    async def main():
        await asyncio.wait([foo("白色相簿1.txt"), foo("白色相簿2.txt")])
    
    
    loop = asyncio.get_event_loop()
    start = time.perf_counter()
    loop.run_until_complete(main())
    print(f"总用时:{time.perf_counter() - start}")
    """
    为什么你会这么熟练啊!
    白色相簿什么的,已经无所谓了。
    你和雪菜亲过多少次了啊!?
    因为已经不再有歌,值得去唱了。
    你到底要把我甩开多远你才甘心啊!
    传达不了的恋情,已经不需要了。
    因为已经不再有人,值得去爱了
    总用时:5.0069811
    """
    # 可以看到是并发交错打印的,其实我们可以使用__next__,而不是先全部读取再使用索引的方式
    # 但是如果那么做会报出一个TypeError:StopIteration interacts badly with generators and cannot be raised into a Future
    # 目前没找到原因,不知道是bug还是本人水平问题。
    

    并发请求网络

    asyncio是可以向web服务发送请求的,但是它不支持http请求,只支持tcp请求。但是这里不介绍了,建议使用aiohttp。

    其实主要是累了,此刻凌晨3点06分,想休息了。或许有点疲惫,如果没注意写错了,欢迎指正。

  • 相关阅读:
    安装cloudbase-init和qga批处理
    Windows添加自定义服务、批处理文件开机自启动方法
    Windows批处理:自动部署常用软件(静默安装)
    windows auto activate
    XML转译字符
    Leetcode908.Smallest Range I最小差值1
    Leetcode917.Reverse Only Letters仅仅反转字母
    Leetcode896.Monotonic Array单调数列
    Leetcode905.Sort Array By Parity按奇偶排序数组
    Leetcode892.Surface Area of 3D Shapes三维形体的表面积
  • 原文地址:https://www.cnblogs.com/traditional/p/11828780.html
Copyright © 2011-2022 走看看