一般我在写笔记之前,我都会粗略地看一下本章节内容,根据我对future的使用,感觉它就像是对threading与process的高度封装。
这是一个模板化使用多线程,多进程的方式,不需要过多的去了解多线程,多进程使用中的细节。
17.1.1依次下载的脚本
下面是个单进程、单线程下载文件的脚本,感觉逻辑写得非常好,大神就是大神。
"""Sequential flag downloader (Fluent Python, flags.py).

Downloads the flags of the 20 most populous countries one at a time,
in a single thread — the baseline the concurrent versions are
compared against.
"""
import os
import sys
import time

import requests

# ISO codes of the 20 most populous countries.
# (The transcript had 'OK' for 'PK' and dropped 'ET' — only 19 codes.)
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'


def save_flag(img, filename):
    """Write binary image data to DEST_DIR/filename."""
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def get_flag(cc):
    """Fetch one flag image from the server; return the raw bytes."""
    # The transcript dropped the '.gif' suffix, which 404s every
    # request; the book's URL ends in '.gif'.
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content


def show(text):
    """Print a progress token on one line, flushing immediately."""
    print(text, end=' ')
    # Without a trailing newline the output may sit in the buffer;
    # flush so progress is visible right away.
    sys.stdout.flush()


def download_many(cc_list):
    """Download each flag sequentially; return the count processed."""
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')
    return len(cc_list)


def main(download_many):
    """Time a download_many implementation and report the result."""
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many)
代码比较简单,我做标记的地方也不多。
17.1.2 使用concurrent.futures模块下载
"""Thread-pool flag downloader built on Executor.map."""
from concurrent import futures

from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20  # upper bound on the number of worker threads


def download_one(cc):
    """Download and save a single flag; each thread runs this."""
    img = get_flag(cc)
    show(cc)
    save_flag(img, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    """Download flags concurrently; return the number of results."""
    # Never start more threads than there are downloads to do.
    workers = min(MAX_WORKERS, len(cc_list))
    # executor.__exit__ calls executor.shutdown(wait=True), which
    # blocks until every worker thread has finished.
    with futures.ThreadPoolExecutor(workers) as executor:
        # map invokes download_one concurrently from several threads;
        # it returns a generator yielding results in call order.
        res = executor.map(download_one, cc_list)
    return len(list(res))


if __name__ == '__main__':
    main(download_many)
17.1.3future在哪里
concurrent.futures.Future和asyncio.Future。
这两个类的相同:两个Future类的实例都可以标识可能已经完成或者尚未完成的延迟计算。
future封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)后可以获取结果(或异常)
通常情况下自己不应该创建future,而只能由并发框架(concurrent.futures或asyncio)实例化。
因为future表示终将发生的事情,而确定某件事情会发生的唯一方式是执行的时间已经排定。
只有把某件事交给concurrent.futures.Executor子类处理时,才会创建concurrent.futures.Future实例。
比如Executor.submit()方法的参数是一个可调用对象,调用这个方法后会为传入的可调用对象排期,并返回一个future
这些futures有.done()方法,表示future链接的可调用对象是否已经执行。
.add_done_callback(),这个一个回调,future运行结束后会调用执行的可调用对象。
.result方法。返回future运行结束后调用对象的结果。
concurrent.futures.as_completed函数的参数是一个future列表,返回值是一个迭代器,在future运行结束后产出future
"""Demo of explicit futures: submit + as_completed (flags_threadpool_ac)."""
from concurrent import futures

from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20


def download_one(cc):
    """Download and save one flag; return its country code."""
    img = get_flag(cc)
    show(cc)
    save_flag(img, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    """Demo variant: only 5 countries, 3 threads, explicit futures."""
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in cc_list:
            # submit schedules the callable for execution and returns
            # a future representing the pending operation.
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))

        results = []
        # as_completed yields each future as soon as it finishes.
        for future in futures.as_completed(to_do):
            res = future.result()  # won't block: the future is done
            msg = '{} result :{!r}'
            print(msg.format(future, res))
            results.append(res)

    # results is already a list — len(list(results)) was redundant.
    return len(results)


if __name__ == '__main__':
    main(download_many)
/usr/local/bin/python3.7 /Users/shijianzhong/study/Fluent_Python/第十七章/flags_threadpool_ac.py Scheduled for CN: <Future at 0x100bb83d0 state=running> Scheduled for IN: <Future at 0x1014ee510 state=running> Scheduled for US: <Future at 0x1014eedd0 state=running> Scheduled for ID: <Future at 0x1014eed90 state=pending> Scheduled for BR: <Future at 0x1014fb710 state=pending> CN <Future at 0x100bb83d0 state=finished returned str> result :'CN' IN <Future at 0x1014ee510 state=finished returned str> result :'IN' ID <Future at 0x1014eed90 state=finished returned str> result :'ID' BR <Future at 0x1014fb710 state=finished returned str> result :'BR' US <Future at 0x1014eedd0 state=finished returned str> result :'US' 5 flags downloader in 4.42s Process finished with exit code 0
从显示器可以看出来,future的状态,由于只开了三个线程,有2个在等待操作。
17.2阻塞型I/O和GIL
Python标准库中的所有阻塞型I/O函数都会释放GIL,允许其他线程运行。
17.3 使用concurrent.futures模块启动进程。
直接把futures.ThreadPoolExecutor换成futures.ProcessPoolExecutor
就可以换成多进程操作。
后面的max_workers参数是可选的,默认值是os.cpu_count()函数返回的CPU数量。
第一个是RC4加密算法,我自己看了一会儿加密逻辑,最后还是失败了,基础太差了,有空再好好研究加密的逻辑。
def arcfour(key, in_bytes, loops=20):
    """RC4-style stream cipher (the book's CPU-bound demo workload).

    The cipher is symmetric: applying it twice with the same key
    restores the original input.

    :param key: bytes-like encryption key (1..256 bytes)
    :param in_bytes: bytes-like data to encrypt or decrypt
    :param loops: extra key-scheduling passes (non-standard RC4
        variant; more loops = more CPU work)
    :return: bytearray with the transformed data
    """
    kbox = bytearray(256)  # key box
    for i, car in enumerate(key):  # copy the key bytes in
        kbox[i] = car
    j = len(key)
    for i in range(j, 256):  # repeat the key until kbox is full
        kbox[i] = kbox[i - j]

    # initialize sbox with the identity permutation
    sbox = bytearray(range(256))

    # Key scheduling: run the RC4 KSA shuffle `loops` times.
    j = 0
    for k in range(loops):
        for i in range(256):
            j = (j + sbox[i] + kbox[i]) % 256
            sbox[i], sbox[j] = sbox[j], sbox[i]

    # Main loop: generate the keystream and XOR it with the input.
    i = 0
    j = 0
    out_bytes = bytearray()  # was misspelled 'out_buyes'
    for car in in_bytes:
        i = (i + 1) % 256
        # shuffle sbox as we go
        j = (j + sbox[i]) % 256
        sbox[i], sbox[j] = sbox[j], sbox[i]
        # pick the keystream byte
        t = (sbox[i] + sbox[j]) % 256
        k = sbox[t]
        out_bytes.append(car ^ k)
    return out_bytes


def test():
    """Round-trip self-test: encrypt then decrypt 1 MB of data."""
    from time import time
    clear = bytearray(b'1234567890' * 100000)
    t0 = time()
    cipher = arcfour(b'key', clear)
    print(f'elapsed time: {time()-t0:.2f}s')
    result = arcfour(b'key', cipher)
    # The original message formatted `cipher` instead of `clear`,
    # which is what `result` is actually compared against.
    assert result == clear, '%r != %r' % (result, clear)
    print('elapsed time: %.2fs' % (time() - t0))
    print('ok')


if __name__ == '__main__':
    test()
"""Run the arcfour cipher across multiple processes (CPU-bound demo)."""
import sys
import time
from concurrent import futures
from random import randrange

from arcfour import arcfour

JOBS = 16            # number of jobs submitted to the pool
SIZE = 2 ** 18       # base length of each random payload, in bytes
KEY = b'Twas brilling,and the slity toves Dig gyre'
STATUS = '{} workers, elapsed time: {:.2f}s'


def arcfour_test(size, key):
    """Encrypt then decrypt `size` random bytes; return size.

    (Name fixed from the original typo ``arcfour_tset``.)

    :param size: length of the random payload
    :param key: encryption key
    :return: size, so the caller can report which job finished
    """
    in_text = bytearray(randrange(256) for i in range(size))
    cypher_text = arcfour(key, in_text)   # encrypt
    out_text = arcfour(key, cypher_text)  # decrypt
    assert in_text == out_text, 'Failed arcfour_test'
    return size


def main(workers=None):
    """Submit JOBS cipher jobs to a process pool and time them.

    :param workers: process count; None lets ProcessPoolExecutor
        default to os.cpu_count()
    """
    if workers:
        workers = int(workers)
    t0 = time.time()
    with futures.ProcessPoolExecutor(workers) as executor:
        actual_workers = executor._max_workers  # the pool's real size
        to_do = []
        for i in range(JOBS, 0, -1):  # JOBS (16) jobs, largest first
            # Vary each payload around SIZE so jobs finish at
            # noticeably different times.
            size = SIZE + int(SIZE / JOBS * (i - JOBS / 2))
            job = executor.submit(arcfour_test, size, KEY)  # -> future
            to_do.append(job)

        for future in futures.as_completed(to_do):  # yield as done
            res = future.result()
            print('{:.1f} KB'.format(res / 2**10))

    print(STATUS.format(actual_workers, time.time() - t0))


if __name__ == '__main__':
    if len(sys.argv) == 2:
        workers = int(sys.argv[1])
    else:
        workers = None
    main(workers)
shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 arcfour_futures.py 1 384.0 KB 368.0 KB 352.0 KB 336.0 KB 320.0 KB 304.0 KB 288.0 KB 272.0 KB 256.0 KB 240.0 KB 224.0 KB 208.0 KB 192.0 KB 176.0 KB 160.0 KB 144.0 KB 1 workers, elapsed time: 7.09s shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 arcfour_futures.py 2 368.0 KB 384.0 KB 352.0 KB 336.0 KB 304.0 KB 320.0 KB 272.0 KB 288.0 KB 240.0 KB 256.0 KB 208.0 KB 224.0 KB 176.0 KB 192.0 KB 144.0 KB 160.0 KB 2 workers, elapsed time: 3.65s shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 arcfour_futures.py 3 352.0 KB 368.0 KB 384.0 KB 320.0 KB 336.0 KB 304.0 KB 256.0 KB 272.0 KB 288.0 KB 208.0 KB 224.0 KB 240.0 KB 160.0 KB 176.0 KB 192.0 KB 144.0 KB 3 workers, elapsed time: 2.62s shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 arcfour_futures.py 4 336.0 KB 352.0 KB 368.0 KB 384.0 KB 320.0 KB 288.0 KB 272.0 KB 304.0 KB 208.0 KB 224.0 KB 240.0 KB 256.0 KB 144.0 KB 160.0 KB 176.0 KB 192.0 KB 4 workers, elapsed time: 1.91s
这个是从1进程到4进程运行后的时间差。
还有一个hashlib加密的多进程运算,先上代码:
"""Hash random blobs in a process pool (CPU-bound multiprocessing demo)."""
import sys
import time
import hashlib
from concurrent import futures
from random import randrange

JOBS = 12
SIZE = 2 ** 20
STATUS = '{} workers, elapsed time: {:.2f}s'


def sha(size):
    """SHA-256 of `size` random bytes; return the hex digest."""
    payload = bytearray(randrange(256) for _ in range(size))
    digest = hashlib.new('sha256')
    digest.update(payload)
    return digest.hexdigest()


def main(workers=None):
    """Hash JOBS random blobs concurrently and report the timing."""
    if workers:
        workers = int(workers)
    t0 = time.time()
    with futures.ProcessPoolExecutor(workers) as executor:
        actual_workers = executor._max_workers
        # A generator expression is enough: as_completed consumes it.
        to_do = (executor.submit(sha, SIZE) for i in range(JOBS))
        for future in futures.as_completed(to_do):
            print(future.result())
    print(STATUS.format(actual_workers, time.time() - t0))


if __name__ == '__main__':
    workers = int(sys.argv[1]) if len(sys.argv) == 2 else None
    main(workers)
shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 sha_futures.py 1 475a1d4fd737010226ea9e4d73ed75ab8a906b82df1c377ad5c1f66e278747a3 9745a7e932967de7fdf3dd6f5200458ccd25f071ee45ead287352ba7a72cbb19 1369f4b05dc9c35f9f5f3c2b10205a74a8e1cdcf03863f5d74f73e9fc08bfbde edf636a83fa3aaeff05beee676462f7fc6fcd1adaf8db9b7104c58d959ec2360 0162055b97546bd4bd118603c360f00bf64c648f6ecef961d071dbc914ff3486 a6e0bce50e15314ffd1cb9a9fa048179900ebaf83e1a245f47c65ef2eb8142fa 4ba5dba64e8fcfb3b28127ed98857f3a587d0f068d1a46a6d1addf7c42c3bbc5 6e21affb7bf904b5c01b48d44c040736a2164cc005c993caa139e778d537eeaf 9ea008a211765adeea9fceb19989b68ca555b37ba9e192dcd95e843bb51c9b96 920fc411a60409c7181772c63571a908268e320f40bf3490d7f493c5b0fd8768 bb42c207a2185eff0dcaaf8146f3c15a6d6ca9b4301ad655307a2732b3674065 dcb913542cf4d8e790cfd98d1034204041062e44c25a2c2894f909473de85034 1 workers, elapsed time: 10.09s shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 sha_futures.py 2 a98bf17ebd32bb9728db9f435f5974df54c8454bdae78c1033d86cefaba7bc10 4de6c981ee221e0faf3ca84130c3872f8cf022e931b91b7844572af53fb82b46 19db9433a51dbfe6ede7eea2cb774ae15a3310832507bf671b83c9e745bad674 5b80e296355d48aecf0c8449cfc2309a12eee91314cb86860de4d2429d388078 b6ebbf7e3d3760824decb617755f5ec40a56dd3ebf7176d0b909f5d52cc786e8 93bd269cc65cac2c8d44bee1019b01c4b8ba0449a20fea354073e9d511610e3e f7daaa89e103e89ad78ccb53c825412885525da38ca72f1abb39d872ae9b2286 992af055445863cad9f53486820550b8b05839b25b674540403194e2055181cb ce9b66642b36737b386d1678c8b06e94baf5f56e00c6fbd82855fc55fa92dc33 375a9e2b73de73596d826216f1e96328df9fc0ece02e57a73f6a84d5e2d59212 883b135e776c2c4d8fa3ba992e2aee928ddbddda03808aa57647b35f666e924e d62eac75c424f60fae7a6fb8addf1ef9598d90e01339324e99df956624fd562b 2 workers, elapsed time: 5.16s shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 sha_futures.py 3 32b70dc055c60b6bf7b19e1b0cf1f64baa889639b0b244ce828a4de1f6c10d27 ef38e5ba26bf9843a57c6cc7f21a757cab2b9f4a12ade96ae47c95226a23804d 
f92d00fe9a9dd4ef2efd9b24cd183acffb91c8a9c02af16044229e4ddde3c105 21c10d098349480bb674cdac2c19937585e7306d4ae0011c03b29f12d7b4adcf 3f3f50bf41c9b6bc6dc2eda0cee9aefcf34505247ac4b02e684f6a804ecef122 c5d3e46f2d46dba809033b6919a140c5fe879e15246ef6990427903131c579e2 34353994b2907965df0d1173bc5c38c8a9ed0e39048ce14c5758e103d405810e 0702dba34f72e73d386e7220c8ed622571107dd8e85c8d529854d9fd28f774c6 fd452d84129d26b4eb2ad9b11d6a98edbe5dec944ee7827951924b33b0ac9a16 820a0b3a235c96a63309004e69ce078af001c15da16bcc2146acc1913a2042fb 218ec4fc91662a7114589b2aebbd2ded5a9feb823f64aed5d711847910920b74 fabd9a9956d9c337fc138be8e28981bb0db7cb68ca8245aedb805d628b8cec2e 3 workers, elapsed time: 3.56s shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 sha_futures.py 4 8db14be1a4ad447fc8d11b3aaee7962af80f403655037af1d0c199333823952f f091566644043878e3eb5ca3e2587f8f23ba606bc415f4706cf2148ba984e69f d8f34e26df1e243aed8dcd45a619fead75b72232a8ba55d1d1e6af00a25c49c8 62c8466456cbbde27b8f27e8c14ea937867a148af4beb510ab298933a974f2f9 badb0006841959266c5701692f8facb85a0c453625542546d5b9ef97151803f4 ed82ff4f948c2110348ef1414b73a6a4843b9ed8c4e111276f1938c245dcc8ed 0b7dfe1fcb2ece452558fcf5bf15f518844d94cfbb2d055fbac65d07e41cdbd5 c7dbefdc44b6812bb3f9c8b2fce7c035ff28e255ed37723685e99b99d08a3bb4 9534570620a5d0dfc73f0ca08d8f371afa17b8de64cdd1758a2eb34fde8380a4 7f31bab5b6ea8bf2645c85a987808caf6b66887798f27a2954726baf4b558e5c 0104677cdd7a2f91862d6a0dfe71e95d835b0995b540e56878abf2248f9472ee 3970f01fb6cecbfe67de8a699abc578e99162ba9a9864459d296d7ea1a57ebeb 4 workers, elapsed time: 2.72s shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 sha_futures.py 8 150755678a6c05034957fc2faa82b813d449ef057bddd8b1781bccd8f87f7c23 5a15fefd54fb16c5e77c05957d7892d66dfbba50f0538f6c96c314a949c82479 77492540359fe375384970f80a03b1b18d37c76601d5676b469f0a1e600ec153 d4ae793642e5394c32ed9284b7e0ab2c0c5347e35201218ed2b98d4c95d6a888 f5ddbe2d8613f577a209018e065bc568def5bfc81a9203dd17b8ebba76411fcf 
d72549a41193ff6cccefaffb224b75f92963ee756311939befadfbb1ea4d2392 9558e385d6389b9a56bd32e833a4a8f20f3afb85d88611dc9c6c683b237645f5 0d8f6392a77dd3b53f13770ef09770a04fca59a615a18c44c023898c33890ed8 b65337796496c283928272e0aca2967466a908a034f30155241daa54e387951d 9b2131a927e1a7dec1b620203e1bea6d3f66a27d51191c802d69cb0831a0b3dc a7c3e77d6af61a48076c1dd55fc83cde2c41f0886818c39892c811c5857af4b6 31129446b33f24657d3b49e1a8b3313d1d1f44ddd8b0dec8f3c8498521b4d4fa 8 workers, elapsed time: 1.91s shijianzhongdeMacBook-Pro:第十七章 shijianzhong$
从运行结果来看,开多进程的运算速度提升很快。
17.4 试验Executor.map方法
先上代码:
"""Demo of Executor.map: results come back in submission order."""
from time import sleep, strftime


from concurrent import futures


def display(*args):
    """Print args prefixed with the current [HH:MM:SS] timestamp."""
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)


def loiter(n):
    """Sleep n seconds (logging start/end, indented by n); return n*10."""
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format(' '*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done'
    display(msg.format(' '*n, n))
    return n * 10


def main():
    display('Script starting')
    # Three worker threads for five tasks: loiter(3) and loiter(4)
    # must wait until a thread frees up.
    executor = futures.ThreadPoolExecutor(max_workers=3)
    # map returns immediately with a lazy result generator...
    results = executor.map(loiter, range(5))
    display('results', results)
    display('Waiting for individual results:')
    # ...and iterating it blocks for each result, in call order —
    # a slow first task delays all later (possibly finished) results.
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))


# Guarded so importing this module no longer runs ~7 s of sleeps
# as a side effect (the original called main() unconditionally).
if __name__ == '__main__':
    main()
/usr/local/bin/python3.7 /Users/shijianzhong/study/Fluent_Python/第十七章/demo_executor_map.py [02:00:50] Script starting [02:00:50] loiter(0): doing nothing for 0s... [02:00:50] loiter(0): done [02:00:50] loiter(1): doing nothing for 1s... [02:00:50] loiter(2): doing nothing for 2s... [02:00:50] results <generator object Executor.map.<locals>.result_iterator at 0x103340ed0> [02:00:50] Waiting for individual results: [02:00:50] result 0: 0 [02:00:50] loiter(3): doing nothing for 3s... [02:00:51] loiter(1): done [02:00:51] loiter(4): doing nothing for 4s... [02:00:51] result 1: 10 [02:00:52] loiter(2): done [02:00:52] result 2: 20 [02:00:53] loiter(3): done [02:00:53] result 3: 30 [02:00:55] loiter(4): done [02:00:55] result 4: 40 Process finished with exit code 0
这是一个开了三个线程,跑了5个任务的例子,executor.map返回的结果按照传参的顺序出来。
如果第一个参数的调用时间比较长,那后面的结果就算已经运行完毕,也将发生阻塞,只能等第一个参数的结果出来,才能执行后面的。
可以将上面的参数range(5)的顺序进行调换。
因此executor.submit和futures.as_completed这个组合用起来更好,
executor.submit能够处理不同的可调用对象和参数,executor.map只能处理参数不同的同一个可调用对象
futures.as_completed函数的future集合可以来自多个Executor实例,例如一些由ThreadPoolExecutor实例创建,另一些由ProcessPoolExecutor实例创建。
17.5显示下载进度并处理错误:
基础文件:
"""Shared utilities for the flags2 download examples (Fluent Python ch.17)."""
import os
import time
import sys
import string
import argparse
from collections import namedtuple
from enum import Enum

# One download's outcome: status is an HTTPStatus member, data is the
# country code.
Result = namedtuple('Result', 'status data')

# The three download outcomes: OK, 404 not found, any other error.
HTTPStatus = Enum('Status', 'ok not_found error')

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

SERVERS = {
    'REMOTE': 'http://flupy.org/data/flags',
    'LOCAL': 'http://localhost:8001/flags',
    'DELAY': 'http://localhost:8002/flags',
    'ERROR': 'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'LOCAL'

DEST_DIR = 'downloads/'
COUNTRY_CODES_FILE = 'country_codes.txt'


def save_flag(img, filename):
    """Write binary image data to DEST_DIR/filename."""
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def initial_report(cc_list, actual_req, server_label):
    """Report what is about to be downloaded, and from which server."""
    if len(cc_list) <= 10:
        cc_msg = ', '.join(cc_list)
    else:
        cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
    print('{} site: {}'.format(server_label, SERVERS[server_label]))
    msg = 'Searching for {} flag{}: {}'
    plural = 's' if len(cc_list) != 1 else ''
    print(msg.format(len(cc_list), plural, cc_msg))
    # Recompute the plural for the connection count (the original
    # reused the stale flag-count plural here).
    plural = 's' if actual_req != 1 else ''
    msg = '{} concurrent connection{} will be used.'
    print(msg.format(actual_req, plural))


def final_report(cc_list, counter, start_time):
    """Summarize results after all downloads have finished."""
    elapsed = time.time() - start_time
    print('-' * 20)
    msg = '{} flag{} downloaded'
    plural = 's' if counter[HTTPStatus.ok] != 1 else ''
    print(msg.format(counter[HTTPStatus.ok], plural))
    if counter[HTTPStatus.not_found]:
        print(counter[HTTPStatus.not_found], 'not found.')
    if counter[HTTPStatus.error]:
        plural = 's' if counter[HTTPStatus.error] != 1 else ''  # was 'f': typo
        print('{} error{}.'.format(counter[HTTPStatus.error], plural))
    print('Elapsed time: {:.2f}'.format(elapsed))


def expand_cc__args(every_cc, all_cc, cc_args, limit):
    """Build the list of country codes to fetch from the CLI arguments.

    :param every_cc: -e flag: every two-letter combination AA..ZZ
    :param all_cc: -a flag: all codes listed in COUNTRY_CODES_FILE
    :param cc_args: positional args: codes ('BR') or initials ('B')
    :param limit: keep at most this many codes
    :return: sorted list of codes, truncated to `limit`
    :raises ValueError: on a malformed positional argument
    """
    codes = set()  # a set silently drops duplicate arguments
    A_Z = string.ascii_uppercase
    if every_cc:
        # -e: all 26*26 possible two-letter codes
        codes.update(a + b for a in A_Z for b in A_Z)
    elif all_cc:
        # -a: read the known codes from a file
        with open(COUNTRY_CODES_FILE) as fp:
            text = fp.read()
        codes.update(text.split())
    else:
        for cc in (c.upper() for c in cc_args):
            if len(cc) == 1 and cc in A_Z:
                # single initial: expand to all codes with that letter
                codes.update(cc + c for c in A_Z)
            elif len(cc) == 2 and all(c in A_Z for c in cc):
                codes.add(cc)
            else:
                # original message was garbled: 'rech ... A ot Z'
                msg = 'each CC argument must be A to Z or AA to ZZ'
                raise ValueError('*** Usage error: ' + msg)
    return sorted(codes)[:limit]


def process_args(default_concur_req):
    """Parse and validate the command line.

    :return: (args namespace, list of country codes to download)
    """
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
        description='Download flags for country codes. '
                    'Default: top 20 countries by population')
    # nargs='*': zero or more positional country-code arguments
    parser.add_argument('cc', metavar='CC', nargs='*',
                        help='country code or 1st letter '
                             '(eg. B for BA...BZ)')
    parser.add_argument('-a', '--all', action='store_true',
                        help='get all available flags (AD to ZW)')
    parser.add_argument('-e', '--every', action='store_true',
                        help='get flags for every possible code (AA...ZZ)')
    parser.add_argument('-l', '--limit', metavar='N', type=int,
                        help='limit to N first codes', default=sys.maxsize)
    parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
                        default=default_concur_req,
                        help='maximum concurrent requests (default={})'
                             .format(default_concur_req))
    parser.add_argument('-s', '--server', metavar='LABEL',
                        default=DEFAULT_SERVER,
                        help='Server to hit; one of {} (default={})'
                             # was .format(frozenset, ...): printed the
                             # builtin class instead of the options
                             .format(server_options, DEFAULT_SERVER))
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='output detailed progress info')
    args = parser.parse_args()
    if args.max_req < 1:
        print('***Usage error: --max_req CONCURRENT must be >= 1')
        parser.print_usage()
        sys.exit(1)
    if args.limit < 1:
        print('*** Usage error: --limit N must be >= 1')
        parser.print_usage()
        sys.exit(1)
    args.server = args.server.upper()
    if args.server not in SERVERS:
        print('*** Usage error: --server LABEL must be one of',
              server_options)
        parser.print_usage()
        sys.exit(1)
    try:
        # Defaults: every/all False, cc an empty list, limit maxsize.
        cc_list = expand_cc__args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:  # malformed CC argument
        print(exc.args[0])
        parser.print_usage()
        sys.exit(1)
    # No codes selected at all: fall back to the built-in top-20 list.
    if not cc_list:
        cc_list = sorted(POP20_CC)
    return args, cc_list


def main(download_many, default_concur_req, max_concur_req):
    """Drive a download_many implementation and report on it."""
    args, cc_list = process_args(default_concur_req)
    # Don't start more workers than allowed — or than there is work.
    actual_req = min(args.max_req, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, args.server)
    base_url = SERVERS[args.server]
    t0 = time.time()
    counter = download_many(cc_list, base_url, args.verbose, actual_req)
    # Every requested code must be accounted for in the counter.
    assert sum(counter.values()) == len(cc_list), \
        'some downloads are unaccounted for'
    final_report(cc_list, counter, t0)
单线程下载文件:
"""Sequential (single-thread) flags2 downloader with error handling."""
import collections

import requests
import tqdm

from flags2_common import main, save_flag, HTTPStatus, Result

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1


def get_flag(base_url, cc):
    """Fetch one flag; raise HTTPError for any non-200 response."""
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:
        resp.raise_for_status()
    return resp.content


def download_one(cc, base_url, verbose=False):
    """Download one flag; return Result(status, cc).

    A 404 is handled here (HTTPStatus.not_found); any other HTTP
    error re-raises so the caller can deal with it.
    """
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'  # was 'not find': typo
        else:
            raise
    else:
        save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'ok'
    if verbose:
        print(cc, msg)
    return Result(status, cc)


def download_many(cc_list, base_url, verbose, max_req):
    """Download sequentially; return a Counter keyed by HTTPStatus."""
    counter = collections.Counter()
    cc_iter = sorted(cc_list)
    if not verbose:
        # no per-flag lines: show a tqdm progress bar instead
        cc_iter = tqdm.tqdm(cc_iter)
    for cc in cc_iter:
        try:
            res = download_one(cc, base_url, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = 'HTTP error {res.status_code} - {res.reason}'
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            # set inside the else clause: res only exists on success
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:
            print('*** Error for {}: {}'.format(cc, error_msg))
    return counter


if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
多线程下载文件:
"""Thread-pool flags2 downloader built on submit + as_completed."""
import collections
from concurrent import futures

import requests
import tqdm

from flags2_common import main, HTTPStatus
from flags2_sequential import download_one

DEFAULT_CONCUR_REQ = 30   # default thread count
MAX_CONCUR_REQ = 1000     # hard ceiling on the number of threads


def download_many(cc_list, base_url, verbose, concur_req):
    """Download concurrently; return a Counter keyed by HTTPStatus."""
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
        # Map each future back to its country code for error reports.
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, base_url, verbose)
            to_do_map[future] = cc

        done_iter = futures.as_completed(to_do_map)
        if not verbose:
            # as_completed has no len(): give tqdm the total explicitly
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list))

        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP error {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                status = res.status  # ok or not_found
            if error_msg:
                status = HTTPStatus.error
            counter[status] += 1
            if verbose and error_msg:
                cc = to_do_map[future]
                print('*** Error for {}: {}'.format(cc, error_msg))
    return counter


if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
上面是抄袭书中的代码,实现单线程或多线程下载文件的过程,整个代码如行云流水,果然是大神的作品。
默认是通过本地的nginx代理下载,还有下载延迟与下载错误,我没有演示。
很厉害的通过argparse模块来处理默认参数与输入参数。
真的很厉害,参数对象的保存,下载反馈的情况保存用到了Enum。
膜拜ing