zoukankan      html  css  js  c++  java
  • Python核心技术与实战——十七|Python并发编程之Futures

    不论是哪一种语言,并发编程都是一项非常重要的技巧。比如我们上一章用的爬虫,就被广泛用在工业的各个领域。我们每天在各个网站、App上获取的新闻信息,很大一部分都是通过并发编程版本的爬虫获得的。

    正确并合理的使用并发编程,无疑会给我们的程序带来极大性能上的提升。今天我们就一起学习Python中的并发编程——Futures。

    区分并发和并行

    我们在学习并发编程时,常常会听到两个词:并发(Concurrency)和并行(Parallelism)这两个术语。这两者经常一起使用,导致很多人以为他们是一个意思,其实是不对的。

    首先要辨别一个误区,在Python中,并发并不是只同一时刻上右多个操作(thread或者task)同时进行。相反,在某个特定的时刻上它只允许有一个操作的发生,只不过线程或任务之间会相互切换直到完成,就像下面的图里表达的

    在上图中出现了task和thread两种切换顺序的不同方式。分别对应了Python中并发两种形式——threading和asyncio。

    对于线程,操作系统知道每个线程的所有信息,因此他会做主在适当的时候做线程切换,这样的好处就是代码容易编写,因为程序员不需要做任何切换操作的处理;但是切换线程的操作,有可能出现在一个语句的执行过程中( 比如X+=1),这样比较容易出现race condiiton的情况。

    而对于asyncio,主程序想要切换任务的时候必须得到此任务可以被切换的通知,这样一来就可以避免出现上面的race condition的情况。

    至于所谓的并行,只在同一时刻、同时发生。Python中的multi-Processing便是这个意思对应多进程,我们可以这么简单的理解,如果我们的电脑是8核的CPU,那么在运行程序时,我们可以强制Python开启8个进程,同时执行,用以加快程序的运行速度。大概是下面这个图的思路

    对比看来,并发通常用于I/O操作频繁的场景。比方我们要从网站上下载多个文件,由于I/O操作的时间要比CPU操作的时长多的多,这时并发就比较适合。而在CPU使用比较heavy的场景中,为了加快运行速度,我们会多用几台机器,让多个处理器来运算。

    还记得以前写了个博客总结过:在Python中的多线程是依靠CPU切换上下文实现的一种“伪多线程”,在进行大量线程切换过程中会占用比较多的CPU资源,而在进行IO操作时候(不论是在网络上进行数据交互还是从内存、硬盘上读写数据)是不需要CPU进行计算的。所以多线程只适用于IO操作密集的环境,不适用于计算密集型操作。

    并发编程之Futures

    单线程于多线程性能比较

    我们下面通过一个实例,从代码的角度来理解并发编程中的Futures,并进一步比较其于单线程的性能区别

    假设我们有个任务,从网站上下载一些内容然后打印出来,如果用单线程的方式是这样实现的

    import requests
    import time
    
    def download_one(url):
        resp = requests.get(url)
        print('Read {} from {}'.format(len(resp.content),url))
    
    def download_all(urls):
        for url in urls:
            download_one(url)
    
    def main():
        sites = [
            'https://en.wikipedia.org/wiki/Portal:Arts',
            'https://en.wikipedia.org/wiki/Portal:History',
            'https://en.wikipedia.org/wiki/Portal:Society', 
            'https://en.wikipedia.org/wiki/Portal:Biography',
            'https://en.wikipedia.org/wiki/Portal:Mathematics',
            'https://en.wikipedia.org/wiki/Portal:Technology',
            'https://en.wikipedia.org/wiki/Portal:Geography',
            'https://en.wikipedia.org/wiki/Portal:Science',
            'https://en.wikipedia.org/wiki/Computer_science',
            'https://en.wikipedia.org/wiki/Python_(programming_language)',
            'https://en.wikipedia.org/wiki/Java_(programming_language)',
            'https://en.wikipedia.org/wiki/PHP',
            'https://en.wikipedia.org/wiki/Node.js',
            'https://en.wikipedia.org/wiki/The_C_Programming_Language',
            'https://en.wikipedia.org/wiki/Go_(programming_language)' 
        ]
    
        start_time = time.perf_counter()
    
        download_all(sites)
        end_time = time.perf_counter()
    
        print('Download {} sites in {} seconds'.format(len(sites),end_time-start_time))
    
    
    if __name__ == '__main__':
        main()

    这是种最简单暴力最直接的方式:

    先遍历存储网站的列表

    对当前的网站进行下载操作

    当前操作完成后,再对下一个网站进行同样的操作,一直到结束。

    可以试出来总耗时大概是2s多,单线程的方式简单明了,但是最大的问题是效率低下,程序最大的时间都消耗在I/O等待上(这还是用的print,如果是写在硬盘上的话时间会更多)。如果在实际生产环境中,我们需要访问的网站至少是以万为单位的,所以这个方案根本行不通。

    接着我们看看多线程版本的代码

    import concurrent.futures
    import requests
    import threading
    import time
    
    def download_one(url):
        resp = requests.get(url).content
        print('Read {} from {}'.format(len(resp),url))
    
    def download_all(sites):
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            executor.map(download_one,sites)
    
    def main():
    
        sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society', 
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)' 
        ]
        
        start_time = time.perf_counter()
        download_all(sites)
        # for i in sites:
        end_time = time.perf_counter()
        
        # print('Down {} sites in {} seconds'.format(len(sites),end_time-start_time))
    
    if __name__ == '__main__':
        main()

    这段代码的运行时长大概是0.2s,效率一下提升了10倍多,可以注意到这个版本和单线程的区别主要在下面:

    def download_all(sites):
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            executor.map(download_one,sites)

    在上面的代码中我们创建了一个线程池,有5个线程可以分配使用。executer.map()与以前将的Python内置的map()函数,表示对sites中的每一个元素并发的调用函数download_one()函数。

    顺便提一下,在download_one()函数中,我们使用的requests.get()方法是线程安全的(thread-safe),因此在多线程的环境下,它也可以安全使用,并不会出现race condition(条件竞争)的情况。

    另外,虽然线程的数量可以自己定义,但是线程数并不是越多越好,以为线程的创建、维护和删除也需要一定的开销。所以如果设置的很大,反而会导致速度变慢,我们往往要根据实际的需求做一些测试,来寻找最优的线程数量。

    当然,我们也可以用并行的方式去提高运行效率,只需要在download_all()函数中做出下面的变化即可

    def download_all(sites):
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        with concurrent.futures.ProcessPoolExecutor() as executor:   #这个方法可以改成并行
            executor.map(download_one,sites)

    在需要改的这部分代码中,函数ProcessPoolExecutor()表示创建进程池,使用多个进程并行的执行程序。不过,这里 通常省略参数workers,因为系统会自动返回CPU的数量作为可以调用的进程数。

    就像上面说的,并行方式一般用在CPU密集型的场景中,因为对于I/O密集型操作多数时间会用于等待,相比于多线程,使用多进程并不会提升效率,反而很多时候,因为CPU数量的限制,会导致执行效率不如多线程版本。

    到底什么是Futures?

    Python中的Futures,位于concurrent.futures和asyncio中,他们都表示带有延迟的操作,Futures会将处于等待状态的操作包裹起来放到队列中,这些操作的状态可以随时查询。而他们的结果或是异常,也能在操作后被获取。

    通常,作为用户,我们不用考虑如何去创建Futures,这些Futures底层会帮我们处理好,我们要做的就是去schedule这些Futures的执行。比方说,Futures中的Executor类,当我们中的方法done(),表示相对应的操作是否完成——用True表示已完成,ongFalse表示未完成。不过,要注意的是done()是non-blocking的,会立刻返回结果,相对应的add_done_callback(fn),则表示Futures完成后,相对应的参数fn,会被通知并执行调用。

    Futures里还有一个非常重要的函数result(),用来表示future完成后,返回器对应的结果或异常。而as_completed(fs),则是针对给定的future迭代器fs,在其完成后,返回完成后的迭代器。

    所以也可以把上面的例子写成下面的形式:

    def download_all(sites):
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            to_do = []
            for site in sites:
                future = executor.submit(download_one,site)
                to_do.append(site)
    
            for future in concurrent.futures.as_completed(to_do):
                future.result()

    这里,我们首先用executor.submit(),将下载每个网站的内容都放进future队列to_do里等待执行。然后是as_completed()函数,在future完成后输出结果

    不过这里有个事情要注意一下:future列表中每个future完成的顺序和他在列表中的顺序不一定一致,至于哪个先完成,取决于系统的调度和每个future的执行时间。

    为什么多线程每次只有一个线程执行?

    前面我们讲过,在一个时刻下,Python主程序只允许有一个线程执行,所以Python的并发,是通过多线程的切换完成的,这是为什么呢?

    这就又和以前讲的知识串联到一起了——GIL(全局解释器锁),这里在复习下:

    事实上,Python的解释器并不是线程安全的,为了解决由此带来的race condition等问题,Python就引入了GIL,也就是在同一个时刻,只允许一个线程执行。当然,在进行I/O操作是,如果一个线程被block了,GIL就会被释放,从而让另一个线程能够继续执行。

    总结

    这节课里我们先学习了Python中并发和并行的概念

    并发——通过线程(thread)和任务(task)之间相互切换的方式实现,但是同一时刻,只允许有一个线程或任务执行

    并行——多个进程同时进行。

    并发通常用于I/O频繁操作的场景,而并行则适用于CPU heavy的场景

    随后我们通过一个下载网站内容的例子,比较了单线程和运用FUtures的多线程版本的性能差异,显而易见,合理的运用多线程,能够极大的提高程序运行效率。

    我们还大致了解了Futures的方式,介绍了一些常用的函数,并辅以实例加以理解。

    要注意,Python中之所以同一时刻只允许一个线程运行,其实是由于GIL的存在。但是对于I/O操作而言,当其被block的时候,GIL会被释放,使其他线程继续执行。

  • 相关阅读:
    虚拟机字节码执行引擎 —— 方法调用
    虚拟机字节码执行引擎 —— 运行时栈帧
    Java 虚拟机类加载机制
    MySQL提升笔记(1):MySQL逻辑架构
    【JVM进阶之路】十:JVM调优总结
    【JVM进阶之路】九:性能监控工具-可视化工具篇
    SpingCloud Alibaba实战(1:微服务与SpringCloud Alibaba)
    【JVM进阶之路】八:性能监控工具-命令行篇
    【JVM进阶之路】七:垃圾收集器盘点
    【JVM进阶之路】六:垃圾收集理论和算法
  • 原文地址:https://www.cnblogs.com/yinsedeyinse/p/11905128.html
Copyright © 2011-2022 走看看