zoukankan      html  css  js  c++  java
  • Python核心技术与实战——十八|Python并发编程之Asyncio

    我们在上一章学习了Python并发编程的一种实现方法——多线程。今天,我们趁热打铁,看看Python并发编程的另一种实现方式——Asyncio。和前面协程的那章不太一样,这节课我们更加注重原理的理解。

    通过上节课的学习,我们知道在进行I/O操作的时候,使用多线程与普通的单线程比较,效率有了很大的提高,既然这样,为什么还要Asyncio呢?

    虽然多线程有诸多优点并且应用广泛,但是也存在一定的局限性:

    ※多线程运行过程很容易被打断,因此有可能出现race condition的情况

    ※线程的切换存在一定的消耗,线程数量不能无限增加,因此,如果I/O操作非常密集,多线程很有可能满足不了高效率、高质量的需求。

    针对这些问题,Asyncio应运而生。

    什么是Asyncio?

    Sync VS Async

    我们首先来区分一下Sync(同步)和Async(异步)的概念。

    ※所谓Sync,是指操作一个接一个的执行,下一个操作必须等上一个操作完成后才能执行。

    ※而Async是指不同操作之间可以相互交替执行,如果某个操作被block,程序并不会等待,而是会找出可执行的操作继续执行。


    举个简单的例子,我们要做一个报表并用邮件发送给老板,看看两种方式有什么不同:

    ※按照Sync的方式,我们相软件里输入各项数据,然后等5分钟生成了报表明细以后,再写邮件发送给老板

    ※而按照Async的方式,在输完数据以后,开始生成报表,但这个时候我们不干等这报表生成而是去写邮件,等报表明细生成以后,我们暂停邮件的编写去查看报表,确认以后继续写邮件知道发送完毕。 

    Asyncio的工作原理

    明白了Sync和Async的套路,我们回到今天的主题,到底什么是Asyncio呢?

    事实上,Asyncio和其他的Python程序一样,是单线程的,他只有一个主线程,但是恶意进行多个不同任务(task),这里的任务,就是特殊的future对象,这些不同的任务,被一个叫做event loop(事件循环)的对象控制。我可以把这里的任务,类比成多线程版本里的多个线程。

    为了简化的了解这个问题,我们可以假设任务只有两个状态:一是预备状态;而是等待状态、预备状态是指任务目前空闲,但随时准备运行。而等待状态,是指已经运行,但正在等待外部的操作完成,比如I/O操作。

    在这种情况下,事件循环会维护两个任务列表,分别对应这两种状态;并且选取预备状态的一个任务(具体选取那个任务,和其等待的时间长短、占用的资源等等相关),使其运行,一直到任务把控制权教会给事件循环为止。

     值得一提的是,对于Asyncio来说,他的任务在运行时不会被外部的因素打断,因此Asyncio内的操作不会出现race condition的情况,这样就不需要我们担心线程安全的问题了。

    Asyncio的用法

    讲完了Asyncio的原理,我们结合具体的代码来看一下他的用法。还是以上一节课里下载网站上的内容为例,用Asyncio的写法如下(依旧是省略了异常处理)

    import asyncio
    import aiohttp
    import time
    async def download_one(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                print('Read {} from {}.'.format(resp.content_length,url))
    
    async def download_all(sites):
        tasks = [asyncio.create_task(download_one(site)) for site in sites]
        await asyncio.gather(*tasks)
    
    
    def main():
        sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society', 
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)' 
        ]
    
        start_time = time.perf_counter()
        asyncio.run(download_all(sites))
        end_time = time.perf_counter()
        print('Down {} sites in {} seconds'.format(len(sites),end_time-start_time))
    
    if __name__ == '__main__':
        main()

    这里的Async和await关键字是Asyncio的最新的写法,表示这个语句/函数是non-blocked的,正好对应了前面讲的event loop的概念。如果任务执行的过程需要等待,则将其放入等待的列表中,然后继续执行状态列表里的任务。

    主函数里的asyncio.run(coro)是Asyncio的root call,表示拿到event loop,运行输入的coro,直到他结束,最后关闭这个event loop。事实上,asyncio.run()是Python3.7+以后才引入的,相当于以前的版本中下面的语法

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(coro)
    finally:
        loop.close()

     至于Asyncio版本内的download_all(),和之前多线程版本也有很大的区别:

    task = [asyncio.create_task(download_one(site)) for site in sites]
    await asyncio.gather(*task)

    这里的asynco.creat_task(core),表示对输入的协程coro创建一个任务,安排他的执行,并返回此任务对象。这个函数也是Python3.7以后的版本增加的,如果是之前的版本,我们可以用下面的方法代替:

    asyncio.ensure_future(coro)

    可以看到,这里我们对每一个网站的下载,都创建了一个对应的任务。

    再往下看,asyncio.gather(*aws,loop=None,return_exception = False),则表示在事件循环中运行aws序列中所有的任务。当然,除了例子中用到的几个函数,Asyncio还提供了很多其他的用法,我们可以通过Python官方文档查看

    最后我们可以通过最后的输出结果发现,这种方式的效率要比之前的多线程版本还要高一些,充分体现出其优势。

    Asyncio的缺陷

    通过前面的讲解我们可以看出Asyncio的强大,但是任何一种方案都不是完美无瑕的,都存在一定的局限性,当然Asyncio也同样如此。

    在实际的工作中,要想用好Asyncio,特别是要发挥好其强大的功能,很多情况下必须要有相应的Python库作为支持,我们可能发现了在前面的多线程编程中我们都是用的request库,但是在这里我们用的是aiohttp库,原因就是request库是不兼容Asyncio的,而aiohttp库兼容。

    Asyncio软件库的兼容性问题在Python3的早期一直是一个大问题,但是随着技术的发展,这个问题也在逐步得到解决。

    另外,在使用Asyncio时,因为在任务的调度方面有了了更大的自主权,写代码就要更加注意,否则会很容易出错。

    举个例子,如果我们需要await一系列的操作,就带使用asyncio.gathrer();如果是单个的Futures,或许使用asyncio.wait()就可以了。那么,对于一个future,我们是需要他run_until_complete()还是run_forever(),都是要好好思考一下的。诸如此类,都是我们在面对具体问题时需要考虑的。

    多线程还是Asyncio?

    我们已经把并发编程的两种方式都讲了,不过,遇到实际问题,我们选择那种编程方式呢?

    总得来是,我们可以遵循下面的规范

    ※如果是I/O bound,并且I/O操作很慢,需要很多任务/线程协同实现,那么使用Asyncio更加合适

    ※如果是I/O bound,但是I/O操作很快,只需要有限数量的任务或线程,那么使用多线程就可以了

    ※如果是CPU bound,则需要多进程来提高运行效率。

    总结

    在今天的学习中,我们一起学习了Asyncio的原理和用法,比较了Asyncio和多线程各自的优缺点。

    共同点:

    都是并发操作,多线程同一时间点只有一个线程在运行,而协程是只有一个任务在执行;

    不同点:多线程是在I/O阻塞的时候通过切换线程来达到并发的效果,什么时候切换是由操作系统决定的,开发者不用操心,但会造成race  condition;

        协程是只有一个线程,在I/O阻塞时候通过在线程内切换任务来达到并发的效果,在什么时候切换是由开发者决定的,不会有race condition的情况。

    不同于多线程,Asyncio是单线程,但其内部event loop的训话机制,可以让他并发的运行多个不同的任务,并且比多线程享有更多的自主控制权。

    Asyncio中的任务,在运行的过程中不会被打断,因此不会出现race condition的情况。尤其是咋I/O操作比较密集的时候 ,Asyncio的运行效率会更高,远比线程切换的损耗要小。并且Asyncio可以开启的任务数量也比多线程中的线程数量多。

    但是要注意的是,很多情况下使用Asyncio需要特定的三方库的支持,,而如果I/O操作比较快并且不heavy,使用多线程也能有效的解决问题。

    思考题

    我们已经讲了两种并发编程的思路,也多次提到了并行编程(multi-processing),其适用于CPU heavy的场景,

    现在的需求是输入一个列表,随便指定一个元素,求出从0到这个元素所有整数的平方和。下面是常规写法,如果有多进程版本,又要怎么写呢?

    import time
    def cpu_bound(number):
        print(sum(i*i for i in range(number)))
    
    def calculate_sum(numbers):
        for number in numbers:
            cpu_bound(number)
    
    def main():
        start_time = time.perf_counter()
        numbers = [10000000 + x for x in range(20)]
        calculate_sum(numbers)
        end_time = time.perf_counter()
    
        print('Calculation takds {} seconds'.format(end_time-start_time))
    
    if __name__ == '__main__':
        main()

    运行结果(只贴出来运行的总时长)

    Calculation takds 20.637497200000002 seconds

    在来看看这种方法

    import time
    import multiprocessing
    
    def cpu_bound(number):
        return sum(i*i for i in range(number))
    
    def find_sums(numbers):
        with multiprocessing.Pool() as pool:
            pool.map(cpu_bound,numbers)
    
    if __name__ == '__main__':
        start_time = time.perf_counter()
        numbers = [10000000 + x for x in range(20)]
        find_sums(numbers)
        end_time = time.perf_counter()
        print('Calculation takds {} seconds'.format(end_time-start_time))

    然后来看看最终的运行时间

    Calculation takds 7.3418618 seconds

    因为这里需要用大量的计算,所以使用的是多进程的方式来提高了程序的效率。

  • 相关阅读:
    PHP设计模式
    PHP 面向对象
    MYSQL 覆盖索引
    MYSQL IOPS、QPS、TPS
    MySQL 事务嵌套
    MySQL 慢查询优化
    MySQL 查询状态
    MySQL 乐观锁和悲观锁
    MySQL 分库、分表
    Spring Boot 全局异常捕捉,自定义异常并统一返回
  • 原文地址:https://www.cnblogs.com/yinsedeyinse/p/11931572.html
Copyright © 2011-2022 走看看