假设我们要从一个网站用Python程序下载5张图片,最传统的思路就是写个for循环挨个挨个下载,但是这样做有个缺点,就是除了第一张,每张图片都必须等待前一张图片下载完毕后,才可以开始下载。由于网络有很高的延迟,为了不浪费CPU周期去等待,最好在收到网络响应之前做一些其他的事。比方,我们可以开启5个线程同时下载5张图片,当线程发起网络请求时,Python解释器切换到其他线程,而当网络请求发回响应时,Python解释器再切回到原先的线程,继续执行下个步骤
下面,我们来看两个小例子:
import os import time import sys import requests POP20_CC = ["pms_1508850965.67096774", "pms_1509723338.05097112", "pms_1508125822.19716710", "pms_1512614327.2483640", "pms_1525853341.8312102"] # <1>设定我们要下载的五张图片的名称 BASE_URL = 'https://i1.mifile.cn/a1' # <2>图片的站点 DEST_DIR = 'downloads\' # <3>我们保存图片到本地的路径 def save_flag(img, filename): # <4>保存图片的方法 path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): # <5>从网络读取要下载的图片内容 url = '{}/{cc}.jpg'.format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def download_many(cc_list): # <6>循环下载图片 for cc in sorted(cc_list): image = get_flag(cc) sys.stdout.flush() save_flag(image, cc.lower() + '.jpg') return len(cc_list) def main(download_many): # <7>计算连续下载5张图片的时间 directory = os.path.join(DEST_DIR) if not os.path.exists(directory): os.mkdir(directory) t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = ' {} flags downloaded in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main(download_many)
运行结果:
5 flags downloaded in 0.50s
从上面可以看到,连续下载5张图片,需要0.5s。接下来,让我们用多线程下载5张图片,在用多线程下载之前,我们先介绍concurrent.futures模块,这个模块的主要特色是ThreadPoolExecutor和ProcessPoolExecutor类,这两个类实现的接口能分别在不同的线程或进程中执行可调用的对象。这两个类的内部维护着一个工作线程或进程池,以及要执行的任务队列
利用ThreadPoolExecutor,我们就可以实现多线程下载图片了:
import os from concurrent import futures import sys import requests import time MAX_WORKERS = 20 # <1>最大线程数 POP20_CC = ["pms_1508850965.67096774", "pms_1509723338.05097112", "pms_1508125822.19716710", "pms_1512614327.2483640", "pms_1525853341.8312102"] BASE_URL = 'https://i1.mifile.cn/a1' DEST_DIR = 'downloads\' def save_flag(img, filename): path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): url = '{}/{cc}.jpg'.format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def download_one(cc): image = get_flag(cc) sys.stdout.flush() save_flag(image, cc.lower() + '.jpg') return cc def download_many(cc_list): # <2>多线程同时下载图片,这里活跃线程数为5,即5张图片 workers = min(MAX_WORKERS, len(cc_list)) with futures.ThreadPoolExecutor(workers) as executor: res = executor.map(download_one, sorted(cc_list)) return len(list(res)) def main(download_many): directory = os.path.join(DEST_DIR) if not os.path.exists(directory): os.mkdir(directory) t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = ' {} flags downloaded in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main(download_many)
运行结果:
5 flags downloaded in 0.10s
从运行结果来看,5张图片只要0.1s,下载速度仅仅是上个例子的五分之一,速度大大提高。
executor.map()方法会返回一个生成器,因为可以迭代获取每个线程的执行结果
Future类:
标准库中有两个名为Future的类,分别是concurrent.futures.Future和asyncio.Future。这两个类的作用相同,两个Future类的实例都表示可能已经完成或尚未完成的延迟计算。通常,我们不用自己创建Future的实例,而是由并发框架来实例化。原因很简单,Future表示终将运行或者完成的事情,而确定某件事会发生的唯一方式就是执行的时间已经排定。因此,只有把特定的某件事交给concurrent.futures.Executor子类处理时,才会创建concurrent.futures.Future实例。例如,Executor.submit()方法的参数就是一个可调用的对象,调用这个方法后会为传入的可调用对象排期,并返回一个Future对象
客户端不应该去修改Future对象的状态,并发框架会在线程计算完毕后改变Future对象的状态,而我们无控制计算的开始运行和结束运行
concurrent.futures.Future和asyncio.Future两个类中都有done()方法,这个方法不阻塞,返回的是布尔值,用来表明链接的可调用对象是否已经执行了。客户端代码通常不会询问Future对象是否运行结束,而是等待通知。因此,两个Future类都有add_done_callback()方法,这个方法只有一个参数,类型是可调用对象,Future运行结束后悔调用指定的可调用对象,如下例:
from concurrent.futures import ThreadPoolExecutor def add(x, y): return x + y def square(obj): res = obj.result() ** 2 print(res) return res t = ThreadPoolExecutor(2) t.submit(add, 1, 2).add_done_callback(square)
运行结果:
9
此外,concurrent.futures.Future和asyncio.Future都有result()方法。在Future运行结束后调用,这两个方法作用相同,都是返回可调用对象的结果,或者抛出执行可调用对象时抛出的异常。如果运行期还没结束的话,result()方法在两个Future类中的行为差别很大。对于concurrent.futures.Future实例来说,f.result()方法会阻塞调用方所在的线程,直到有结果返回。此时,result()方法可接受timeout参数,如果在指定的时间范围内可调用对象还没执行完,就会抛出TimeoutError异常,而asyncio.Future的result()方法不支持设定超时时间,从那个库中取出运行结果最好的办法是使用yield from结构。不过,对concurrent.futures.Future不能这么做
这两个库中有几个函数会返回Future对象,其他函数则是使用Future对象,如Executor.map()方法属于后者,返回的是一个迭代器,迭代器的__next__方法调用Future对象的result()方法,得到我们的运行结果
concurrent.futures.as_completed()函数的参数是一个Future列表,返回值是一个迭代器,在调用as_completed()方法时不会阻塞,只有当对迭代器进行循环时,每调用一次next()方法,如果当前Future对象还未执行结束,则会陷入阻塞
下面展示如何使用as_completed()函数:
import time from time import sleep from concurrent.futures import ThreadPoolExecutor, as_completed def download_img(cc): # <1> sleep(.03) return cc cc_list = ["a", "b", "c"] with ThreadPoolExecutor(max_workers=3) as executor: to_do = [] for cc in cc_list: future = executor.submit(download_img, cc) to_do.append(future) result = [] t0 = time.time() for future in as_completed(to_do): # <2> res = future.result() result.append(res) elapsed = time.time() - t0 msg = ' {} flags downloaded in {:.2f}s' print(msg.format(len(result), elapsed))
运行结果:
3 flags downloaded in 0.03s
<1>处该方法模拟下载一张图片后,返回图片名称
<2>as_completed()函数接收一个Future列表,返回一个生成器,在迭代时如果有Future对象还未运行完毕,则会陷入阻塞直到结果返回
阻塞型I/O和GIL
CPython解释器本身就不是线程安全的,因此有个全局解释器锁(GIL),一次只允许使用一个线程执行Python字节码。因此,一个Python进程通常不能同时使用多个CPU。然而,这并不意味着Python的多线程一无是处,相反,Python的多线程更适合I/O密集型的场景,当一个Python线程等待网络响应时,阻塞型的I/O函数会释放GIL,再运行另外一个线程。
如果非要再一个Python进程中使用多个CPU,有两个办法,第一个是编写Python的C扩展,这样可以真正的使用到系统的多个CPU,另外一个办法是在一个Python进程中再启动多个Python进程,用这个办法绕开GIL。由于第一个方法比较复杂,这里不做介绍,主要介绍第二个绕开GIL的办法,为了使用到操作系统的所有CPU,我们可以使用concurrent.futures.ProcessPoolExecutor类把一些CPU密集型的工作分配给多个Python进程来处理
下面的两个例子,分别用ThreadPoolExecutor类和ProcessPoolExecutor类来计算CPU密集型程序,这里的calc()方法就是我们的CPU密集型的方法,这个方法里不再发送网络请求,而是直接计算10000次的UUID
import time from concurrent.futures import ThreadPoolExecutor, as_completed import uuid m, n = 100, 10000 max_workers = 10 def calc(): for i in range(n): uuid.uuid1() return True def thread_pool_test(): to_do = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: t0 = time.time() for i in range(m): future = executor.submit(calc) to_do.append(future) result = [] for future in as_completed(to_do): res = future.result() result.append(res) elapsed = time.time() - t0 msg = ' {} flags downloaded in {:.2f}s' print(msg.format(len(result), elapsed)) if __name__ == "__main__": thread_pool_test()
运行结果:
100 flags downloaded in 3.26s
我们可以看到,我们设定10个活跃线程,发起100次的并发任务,每次任务都执行10000次的UUID的计算,最后所需时间为3.26S,我们再看下一个例子
import time from concurrent.futures import ProcessPoolExecutor, as_completed import uuid m, n = 100, 10000 def calc(): for i in range(n): uuid.uuid1() return True def process_pool_test(): with ProcessPoolExecutor() as executor: t0 = time.time() to_do = {executor.submit(calc): i for i in range(m)} result = [] for future in as_completed(to_do): res = future.result() result.append(res) elapsed = time.time() - t0 msg = ' {} flags downloaded in {:.2f}s' print(msg.format(len(result), elapsed)) if __name__ == "__main__": process_pool_test()
运行结果:
100 flags downloaded in 1.91s
在上面这个例子中,我们由原先的线程改为进程计算,这里我们并没有指定用多少个进程计算,一般不指定进程数量的时候,ProcessPoolExecutor默认最大进程数为os.cpu_count(),我是4核CPU,所以我的最大工作数量没有上一个例子中最大工作数20多,这里我们依然执行100次任务,每个任务都是执行10000次UUID的计算,但需要的时间只要1.91S
Executor.map()方法
如果想并发运行多个可调用对象,可使用Executor.map()方法,而不必像之前通过for循环再把可执行对象提交给Executor
from time import sleep, strftime from concurrent.futures import ThreadPoolExecutor max_workers = 3 def display(n): print(strftime('[%H:%M:%S]'), "%s thread start" % n) sleep(n) print(strftime('[%H:%M:%S]'), "%s thread end" % n) return n def main(): with ThreadPoolExecutor(max_workers=max_workers) as executor: results = executor.map(display, range(0, 6)) # <1> for index, result in enumerate(results): # <2> print('result {}: {}'.format(index, result)) main()
运行结果:
[10:51:31] 0 thread start [10:51:31] 0 thread end [10:51:31] 1 thread start [10:51:31] 2 thread start result 0: 0 [10:51:31] 3 thread start [10:51:32] 1 thread end [10:51:32] 4 thread start result 1: 1 [10:51:33] 2 thread end [10:51:33] 5 thread start result 2: 2 [10:51:34] 3 thread end result 3: 3 [10:51:36] 4 thread end result 4: 4 [10:51:38] 5 thread end result 5: 5
<1>处我们把一个可执行的对象以及所需参数,提交给executor.map()方法,返回一个生成器
<2>处for循环中的enumrate()函数会隐式调用next(results),这个函数会间接调用Future中的result()方法,这个方法会阻塞调用线程,一直到运行结束