  • About asyncio (Part 1)

    I. Introduction

    asyncio is a concurrency module introduced in Python 3.4. It uses coroutines and futures to make asynchronous code easier to write: the code reads almost like synchronous code, with none of the annoying callbacks.

    The Python 3.7 release in June 2018 upgraded parts of the asyncio API, mainly around task management and event loops. The new 3.7 features will be covered in a dedicated follow-up article.

    Current state:
    Honestly, the ecosystem of asyncio-based libraries is still incomplete, and the official side does not maintain such libraries itself; on GitHub a Russian team develops and maintains some commonly used ones such as aiomysql, aiopika, and aioredis. One caveat worth stating up front: if you want asyncio's asynchronous behavior, every other library in your code must also be asynchronous rather than blocking. We need aiomysql instead of pymysql, aiohttp instead of requests, and so on. If a library you happen to depend on has no asynchronous counterpart yet, things can get awkward.

    II. Threads, loops, coroutines and futures

    1. event loop: responsible for managing and dispatching the execution of the various tasks; we register our tasks on the event loop.
    2. coroutines: special functions similar to Python generators. A coroutine usually contains the keyword await; when execution reaches an await, the coroutine yields control back to the event loop. When a coroutine is wrapped into a Task (a kind of Future), it is scheduled and executed by the event loop.
    3. futures: represent the result of a task that has or has not yet executed; that result may also be an exception.
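    The relationship between the three can be sketched in a few lines (a minimal sketch; the names compute/main here are illustrative, not from the article):

    ```python
    import asyncio

    async def compute():
        # Calling compute() only creates a coroutine object; it does not
        # run until the event loop schedules it.
        await asyncio.sleep(0)
        return 42

    async def main():
        # Wrapping the coroutine into a Task (a subclass of Future) hands
        # it to the event loop for scheduling.
        task = asyncio.ensure_future(compute())
        result = await task   # the future's result once it reaches Done
        return task.done(), result

    print(asyncio.run(main()))  # (True, 42)
    ```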

    III. Synchronous vs asynchronous

    asyncio lets us define subtasks as coroutines and schedule them ourselves, whereas with multiple threads that scheduling is normally handled by the operating system and is out of our control. Let's start with the following example:

    import asyncio
    async def foo():
        print("running in foo")
        await asyncio.sleep(0)
        print("back foo")
    async def bar():
        print("running in bar")
        await asyncio.sleep(0)
        print("back bar")
    async def main():
        tasks = [foo(), bar()]
        await asyncio.gather(*tasks)
    asyncio.run(main())

    The output of the code above is:

    running in foo
    running in bar
    back foo
    back bar

    A few notes on the code above:

    1. Remember that the sleep here must be asyncio.sleep, not plain time.sleep. We can see the coroutine hand control back to the event loop via await, at which point the loop switches to the next scheduled task.
    2. The use of gather can be ignored for now; a later article will explain it in detail.
    3. asyncio.run, used at the end, is a new function added in 3.7. It creates an event loop and schedules the coroutine; before 3.7 we had to create the loop ourselves, e.g. with asyncio.new_event_loop().
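    For comparison, a minimal sketch of the pre-3.7 manual-loop style that asyncio.run replaces (it still runs on later versions, though asyncio.run is preferred):

    ```python
    import asyncio

    async def main():
        await asyncio.sleep(0)
        return 'done'

    # Before 3.7: create, run and close the event loop by hand.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        result = loop.run_until_complete(main())
        print(result)  # done
    finally:
        loop.close()
    ```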

    When our code executes synchronously, the execution order is linear; once it is asynchronous, the order becomes nondeterministic. Let's use a simple crawler example to see this:

    import time
    import random
    import asyncio
    import aiohttp
    URL = 'https://baidu.com'
    MAX_CLIENTS = 3
    async def aiohttp_get(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response
    async def fetch_async(pid):
        start = time.time()
        sleepy_time = random.randint(2, 5)
        print('fetch coroutine {} started, sleeping for {} seconds'.format(
            pid, sleepy_time))
        response = await aiohttp_get(URL)
        datetime = response.headers.get('Date')
        # the asyncio.sleep here simulates each request coming back with some delay
        await asyncio.sleep(sleepy_time)
        response.close()
        return 'coroutine {}: {}, took: {:.2f} seconds'.format(
            pid, datetime, time.time() - start)
    async def main():
        start = time.time()
        futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
        for i, future in enumerate(asyncio.as_completed(futures)):
            result = await future
            print('{} {}'.format(">>" * (i + 1), result))
        print("all took: {:.2f} seconds".format(time.time() - start))
    asyncio.run(main())

    In the code above we added an asyncio.sleep to every request. This simulates the real situation in which, when we request several sites, the responses come back at different times depending on the network and the target site.
    The output:

    fetch coroutine 2 started, sleeping for 5 seconds
    fetch coroutine 1 started, sleeping for 3 seconds
    fetch coroutine 3 started, sleeping for 4 seconds
    >> coroutine 1: Wed, 27 Feb 2019 11:27:58 GMT, took: 3.09 seconds
    >>>> coroutine 3: Wed, 27 Feb 2019 11:27:58 GMT, took: 4.08 seconds
    >>>>>> coroutine 2: Wed, 27 Feb 2019 11:27:58 GMT, took: 5.12 seconds
    all took: 5.12 seconds

    The return_when parameter

    This parameter is for when we run several tasks but only care about the one that returns a result first. A usage example follows (note that, in order to reproduce an error, I deliberately use the pre-3.7 way of creating the loop here):

    import time
    import random
    import asyncio
    import aiohttp
    from concurrent.futures import FIRST_COMPLETED
    URL = 'https://baidu.com'
    MAX_CLIENTS = 3
    async def aiohttp_get(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response
    async def fetch_async(pid):
        start = time.time()
        sleepy_time = random.randint(2, 5)
        print('fetch coroutine {} started, sleeping for {} seconds'.format(
            pid, sleepy_time))
        response = await aiohttp_get(URL)
        datetime = response.headers.get('Date')
        # the asyncio.sleep here simulates each request coming back with some delay
        await asyncio.sleep(sleepy_time)
        response.close()
        return 'coroutine {}: {}, took: {:.2f} seconds'.format(
            pid, datetime, time.time() - start)
    async def main():
        start = time.time()
        futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
        done, pending = await asyncio.wait(
            futures, return_when=FIRST_COMPLETED
        )
        print(done.pop().result())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

    Running it produces output like this:

    fetch coroutine 2 started, sleeping for 2 seconds
    fetch coroutine 1 started, sleeping for 5 seconds
    fetch coroutine 3 started, sleeping for 2 seconds
    coroutine 2: Wed, 27 Feb 2019 11:41:19 GMT, took: 2.11 seconds
    Task was destroyed but it is pending!
    task: <Task pending coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000000038E5798>()]>>

    The cause of this problem is easy to understand: we started three tasks, and as soon as we received the fastest one we closed the loop, leaving the other two tasks still pending. asyncio treats this as an error, hence the warning we see: Task was destroyed but it is pending!
    How do we solve this?
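    If you stay on the manual-loop style, one way to avoid the warning is to cancel the tasks that are still pending before the loop closes (a sketch; the worker delays here are illustrative):

    ```python
    import asyncio

    async def worker(delay):
        await asyncio.sleep(delay)
        return delay

    async def main():
        tasks = [asyncio.ensure_future(worker(d)) for d in (0, 10, 10)]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        # Cancel whatever is still running so no task is left pending.
        for task in pending:
            task.cancel()
        # Let the cancellations propagate before the loop shuts down.
        await asyncio.gather(*pending, return_exceptions=True)
        return done.pop().result()

    loop = asyncio.new_event_loop()
    try:
        fastest = loop.run_until_complete(main())
        print(fastest)  # 0
    finally:
        loop.close()
    ```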

    IV. About futures

    A future has four states:

    1. Pending
    2. Running
    3. Done
    4. Cancelled

    We can call done() or cancelled() to check whether a future is in that state (asyncio futures have no running() method; that one belongs to concurrent.futures.Future). A reminder once more: the Done state can mean the future returned a result, or that it raised an exception. We can also cancel a future explicitly by calling cancel(). Since Python 3.7, however, asyncio.run takes care of this for us, so let's rewrite the code that produced the Task was destroyed but it is pending! warning:
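    These state checks map onto Task/Future methods as follows (a minimal sketch; the work/main names are illustrative):

    ```python
    import asyncio

    async def work():
        await asyncio.sleep(10)

    async def main():
        task = asyncio.ensure_future(work())
        print(task.done())    # False: the task has not finished yet
        task.cancel()         # request cancellation
        try:
            await task
        except asyncio.CancelledError:
            pass
        # A cancelled future counts as done.
        return task.done(), task.cancelled()

    states = asyncio.run(main())
    print(states)  # (True, True)
    ```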

    import time
    import random
    import asyncio
    import aiohttp
    from concurrent.futures import FIRST_COMPLETED
    URL = 'https://baidu.com'
    MAX_CLIENTS = 3
    async def aiohttp_get(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response
    async def fetch_async(pid):
        start = time.time()
        sleepy_time = random.randint(2, 5)
        print('fetch coroutine {} started, sleeping for {} seconds'.format(
            pid, sleepy_time))
        response = await aiohttp_get(URL)
        datetime = response.headers.get('Date')
        # the asyncio.sleep here simulates each request coming back with some delay
        await asyncio.sleep(sleepy_time)
        response.close()
        return 'coroutine {}: {}, took: {:.2f} seconds'.format(
            pid, datetime, time.time() - start)
    async def main():
        start = time.time()
        futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
        done, pending = await asyncio.wait(
            futures, return_when=FIRST_COMPLETED
        )
        print(done.pop().result())
    asyncio.run(main())

    Now the output is completely normal:

    fetch coroutine 2 started, sleeping for 5 seconds
    fetch coroutine 3 started, sleeping for 2 seconds
    fetch coroutine 1 started, sleeping for 2 seconds
    coroutine 3: Wed, 27 Feb 2019 11:54:13 GMT, took: 2.07 seconds

    Futures have another practical feature: we can attach a callback that is invoked when the future enters the Done state.
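    For example, a minimal sketch of add_done_callback (the names on_done/work are illustrative):

    ```python
    import asyncio

    def on_done(future):
        # Invoked on the event loop once the future reaches the Done state.
        print('callback got:', future.result())

    async def work():
        await asyncio.sleep(0)
        return 'finished'

    async def main():
        task = asyncio.ensure_future(work())
        task.add_done_callback(on_done)
        return await task

    result = asyncio.run(main())  # prints: callback got: finished
    ```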

    Retrieving the results of completed futures is demonstrated by the following code:

    import time
    import random
    import asyncio
    import aiohttp
    from concurrent.futures import FIRST_COMPLETED
    URL = 'https://httpbin.org/get'
    MAX_CLIENTS = 3
    async def aiohttp_get(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response
    async def fetch_async(pid):
        start = time.time()
        sleepy_time = random.randint(2, 5)
        print('fetch coroutine {} started, sleeping for {} seconds'.format(
            pid, sleepy_time))
        response = await aiohttp_get(URL)
        datetime = response.headers.get('Date')
        # the asyncio.sleep here simulates each request coming back with some delay
        await asyncio.sleep(sleepy_time)
        response.close()
        return 'coroutine {}: {}, took: {:.2f} seconds'.format(
            pid, datetime, time.time() - start)
    async def main():
        start = time.time()
        futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
        done, pending = await asyncio.wait(
            futures
        )
        print(done)
        for future in done:
            print(future.result())
    asyncio.run(main())

    The output:

    fetch coroutine 2 started, sleeping for 5 seconds
    fetch coroutine 1 started, sleeping for 2 seconds
    fetch coroutine 3 started, sleeping for 4 seconds
    {<Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 3:... 5.31 seconds'>, <Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 1:... 3.34 seconds'>, <Task finished coro=<fetch_async() done, defined at e:/vs_python/lean_asyncio/ex2.py:17> result='coroutine 2:... 6.38 seconds'>}
    coroutine 3: Wed, 27 Feb 2019 12:10:15 GMT, took: 5.31 seconds
    coroutine 1: Wed, 27 Feb 2019 12:10:15 GMT, took: 3.34 seconds
    coroutine 2: Wed, 27 Feb 2019 12:10:15 GMT, took: 6.38 seconds

    As you can see, once all the tasks complete, we can read each task's result from the done set.

    We can also give our tasks a timeout:

    import time
    import random
    import asyncio
    import aiohttp
    from concurrent.futures import FIRST_COMPLETED
    URL = 'https://httpbin.org/get'
    MAX_CLIENTS = 3
    async def aiohttp_get(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return response
    async def fetch_async(pid):
        start = time.time()
        sleepy_time = random.randint(2, 5)
        print('fetch coroutine {} started, sleeping for {} seconds'.format(
            pid, sleepy_time))
        response = await aiohttp_get(URL)
        datetime = response.headers.get('Date')
        # the asyncio.sleep here simulates each request coming back with some delay
        await asyncio.sleep(sleepy_time)
        response.close()
        return 'coroutine {}: {}, took: {:.2f} seconds'.format(
            pid, datetime, time.time() - start)
    async def main():
        start = time.time()
        futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
        done, pending = await asyncio.wait(
            futures, return_when=FIRST_COMPLETED, timeout=0.01
        )
        print(done)
        for future in done:
            print(future.result())
    asyncio.run(main())

    I set the timeout extremely small here, 0.01 seconds, so by the time done is printed none of the three tasks has actually completed:

    fetch coroutine 2 started, sleeping for 4 seconds
    fetch coroutine 3 started, sleeping for 3 seconds
    fetch coroutine 1 started, sleeping for 4 seconds
    set()
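    When asyncio.wait times out, the unfinished tasks come back in pending and keep running; if you no longer need them, cancel them explicitly (a sketch with an illustrative slow worker instead of real HTTP requests):

    ```python
    import asyncio

    async def slow(pid):
        await asyncio.sleep(10)
        return pid

    async def main():
        tasks = [asyncio.ensure_future(slow(i)) for i in range(3)]
        done, pending = await asyncio.wait(tasks, timeout=0.01)
        # Nothing finished within the timeout; clean up the leftovers.
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        return len(done), len(pending)

    counts = asyncio.run(main())
    print(counts)  # (0, 3)
    ```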

    V. Summary

    This article has laid out asyncio's overall functionality; later articles will go into the details. Relatively few companies seem to run asyncio in production right now, so I hope more people will get in touch to exchange notes and share their experience with Python and Python's async ecosystem. You are welcome to join the discussion group: 948510543

  • Original article: https://www.cnblogs.com/zhaof/p/10446623.html