  • Fluent Python, Chapter 17 Notes (Concurrency with Futures)

    Before writing notes for a chapter I usually skim it first. From my use of futures so far, they feel like a high-level wrapper around threading and multiprocessing.

    They offer a templated way to use multiple threads or processes without having to understand most of the low-level details of working with threading or multiprocessing directly.

    17.1.1 A Sequential Download Script

    Below is a single-process, single-threaded script that downloads flag image files. The logic is written beautifully; a master is a master.

    import os
    import time
    import sys
    
    import requests
    
    POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
                'MX PH VN ET EG DE IR TR CD FR').split()
    
    BASE_URL = 'http://flupy.org/data/flags'
    
    DEST_DIR = 'downloads/'
    
    
    def save_flag(img, filename):
        path = os.path.join(DEST_DIR, filename)
        with open(path, 'wb') as fp:
            fp.write(img)
    
    def get_flag(cc):
        url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
        resp = requests.get(url)
        return resp.content
    
    def show(text):
        print(text, end=' ')     # end=' ' suppresses the newline, so the
                                 # buffer is not flushed right away
        sys.stdout.flush()       # flush the buffer so progress shows immediately
    
    def download_many(cc_list):
        for cc in sorted(cc_list):
            image = get_flag(cc)
            show(cc)
            save_flag(image, cc.lower() + '.gif')
    
        return len(cc_list)
    
    def main(download_many):
        t0 = time.time()
        count = download_many(POP20_CC)
        elapse = time.time() - t0
        msg = '\n{} flags downloaded in {:.2f}s'
        print(msg.format(count, elapse))
    
    if __name__ == '__main__':
        main(download_many)
    

     The code is fairly simple; there weren't many places I needed to annotate.

    17.1.2 Downloading with the concurrent.futures Module

    from concurrent import futures
    
    from flags import save_flag, get_flag, show, main
    
    MAX_WORKERS = 20
    
    
    def download_one(cc):
        img = get_flag(cc)
        show(cc)
        save_flag(img, cc.lower() + '.gif')
        return cc
    
    def download_many(cc_list):
        workers = min(MAX_WORKERS, len(cc_list))   # pick the thread count
        with futures.ThreadPoolExecutor(workers) as executor:
            # instantiate ThreadPoolExecutor with that many worker threads;
            # executor.__exit__ calls executor.shutdown(wait=True), which
            # blocks until all the threads are done
            res = executor.map(download_one, cc_list)
            # executor.map calls download_one concurrently in the pool,
            # taking one argument from cc_list per call
        return len(list(res))
    
    if __name__ == '__main__':
        main(download_many)
    

    17.1.3 Where Are the Futures?

    concurrent.futures.Future and asyncio.Future.

    What the two classes have in common: an instance of either Future class stands for a deferred computation that may or may not have completed.

    A future encapsulates a pending operation: it can be placed in a queue, its completion status can be queried, and once it finishes (or raises) its result (or exception) can be retrieved.

    Normally you should not create futures yourself; they should only be instantiated by the concurrency framework (concurrent.futures or asyncio).

    That is because a future represents something that will eventually run, and the only way to be sure it will run is to have its execution scheduled.

    A concurrent.futures.Future instance is created only when you hand something to a concurrent.futures.Executor subclass for processing.

    For example, Executor.submit() takes a callable as its argument, schedules the callable to run, and returns a future.

    These futures have a .done() method that tells you whether the callable linked to the future has finished running.

    .add_done_callback() registers a callback: a callable that is invoked once the future finishes running.

    The .result() method returns the result of the linked callable once the future is done.

    The concurrent.futures.as_completed function takes an iterable of futures and returns an iterator that yields each future as it completes.
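    The listing below exercises .submit(), .result(), and as_completed, but not .done() or .add_done_callback(). Here is a minimal sketch of those two (the toy function names are my own, not from the book):

```python
import time
from concurrent import futures

def work(n):
    time.sleep(0.2)    # simulate a slow task
    return n * 2

def on_done(fut):      # the callback receives the finished future itself
    print('callback got:', fut.result())

with futures.ThreadPoolExecutor(max_workers=1) as executor:
    fut = executor.submit(work, 21)    # schedule work(21), get a future back
    print('done yet?', fut.done())     # False: the task is still running
    fut.add_done_callback(on_done)     # invoked when the future finishes
    print('result:', fut.result())     # blocks until done, then returns 42

print('done now?', fut.done())         # True
```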

    from concurrent import futures
    
    from flags import save_flag, get_flag, show, main
    
    MAX_WORKERS = 20
    
    
    def download_one(cc):
        img = get_flag(cc)
        show(cc)
        save_flag(img, cc.lower() + '.gif')
        return cc
    
    def download_many(cc_list):
        cc_list = cc_list[:5]
        with futures.ThreadPoolExecutor(max_workers=3) as executor:
            to_do = []
            for cc in cc_list:
            future = executor.submit(download_one, cc)  # schedule the callable to run and get back a future representing the pending operation
                to_do.append(future)
                msg = 'Scheduled for {}: {}'
                print(msg.format(cc, future))
            results = []
        for future in futures.as_completed(to_do):  # yields each future as it completes
                res = future.result()
                msg = '{} result :{!r}'
                print(msg.format(future, res))
                results.append(res)
        return len(list(results))
    
    if __name__ == '__main__':
        main(download_many)
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/Fluent_Python/第十七章/flags_threadpool_ac.py
    Scheduled for CN: <Future at 0x100bb83d0 state=running>
    Scheduled for IN: <Future at 0x1014ee510 state=running>
    Scheduled for US: <Future at 0x1014eedd0 state=running>
    Scheduled for ID: <Future at 0x1014eed90 state=pending>
    Scheduled for BR: <Future at 0x1014fb710 state=pending>
    CN <Future at 0x100bb83d0 state=finished returned str> result :'CN'
    IN <Future at 0x1014ee510 state=finished returned str> result :'IN'
    ID <Future at 0x1014eed90 state=finished returned str> result :'ID'
    BR <Future at 0x1014fb710 state=finished returned str> result :'BR'
    US <Future at 0x1014eedd0 state=finished returned str> result :'US'
    
    5 flags downloaded in 4.42s
    
    Process finished with exit code 0
    

     You can see the futures' states in the output: since only three threads were started, two futures were left pending at first.

    17.2 Blocking I/O and the GIL

    All blocking I/O functions in the Python standard library release the GIL, allowing other threads to run.
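    A quick sketch of the consequence (using time.sleep as a stand-in for blocking I/O; it releases the GIL the same way): five blocking waits in five threads overlap instead of adding up.

```python
import time
from concurrent import futures

def blocking_wait(n):
    time.sleep(1)    # blocking call; the GIL is released while waiting
    return n

t0 = time.time()
with futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(blocking_wait, range(5)))
elapsed = time.time() - t0

# the five 1-second waits ran concurrently: about 1s total, not 5s
print(results, '{:.1f}s'.format(elapsed))
```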

    17.3 Launching Processes with concurrent.futures

    Simply replace futures.ThreadPoolExecutor with futures.ProcessPoolExecutor

    to switch to a multiprocess version.

    The max_workers argument is optional; it defaults to the number of CPUs returned by os.cpu_count().
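    A sketch of that one-line swap (the CPU-bound toy function is mine, not the book's):

```python
import os
from concurrent import futures

def cpu_bound(n):    # stand-in for a CPU-intensive job
    return sum(i * i for i in range(n))

def run(executor_class):
    # with no argument, ProcessPoolExecutor defaults to os.cpu_count() workers
    with executor_class() as executor:
        return list(executor.map(cpu_bound, [10_000] * 4))

if __name__ == '__main__':
    print('CPUs:', os.cpu_count())
    # the only change between the two runs is which class is instantiated
    assert run(futures.ThreadPoolExecutor) == run(futures.ProcessPoolExecutor)
```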

     The first example is the RC4 encryption algorithm. I studied the cipher logic for a while but couldn't fully follow it; my fundamentals are too weak. I'll dig into the encryption logic properly when I have time.

    def arcfour(key, in_bytes, loops=20):
    
        kbox = bytearray(256)   # array to hold the key
        for i, car in enumerate(key):   # copy the key bytes
            kbox[i] = car
        j = len(key)
        for i in range(j, 256):     # fill kbox by repeating the key to the end
            kbox[i] = kbox[i-j]
    
        # initialize sbox
        sbox = bytearray(range(256))
    
        j = 0
        for k in range(loops):
            for i in range(256):
                j = (j + sbox[i] + kbox[i]) % 256
                sbox[i], sbox[j] = sbox[j], sbox[i]
    
        # main loop
        i = 0
        j = 0
        out_bytes = bytearray()
    
        for car in in_bytes:
            i = (i + 1) % 256
            # shuffle sbox
            j = (j + sbox[i]) % 256
            sbox[i], sbox[j] = sbox[j], sbox[i]
            # compute t
            t = (sbox[i] + sbox[j]) % 256
            k = sbox[t]
            car = car ^ k
            out_bytes.append(car)
    
        return out_bytes
    
    
    def test():
        from time import time
        clear = bytearray(b'1234567890' * 100000)
        t0 = time()
        cipher = arcfour(b'key', clear)
        # print(cipher)
        print(f'elapsed time: {time()-t0:.2f}s')
        result = arcfour(b'key', cipher)
        assert result == clear, '%r != %r' % (result, clear)
        print('elapsed time: %.2fs' % (time() - t0))
        print('ok')
    
    
    if __name__ == '__main__':
        test()
    
    import sys
    import time
    from concurrent import futures
    from random import randrange
    from arcfour import arcfour
    
    
    
    JOBS = 16     # number of jobs
    SIZE = 2 ** 18    # base length in bytes of the test data
    
    
    KEY = b'Twas brillig, and the slithy toves\nDid gyre'
    STATUS = '{} workers, elapsed time: {:.2f}s'
    
    
    def arcfour_test(size, key):
        '''
        :param size: length in bytes of the random input
        :param key:  encryption key
        :return: size, if the encrypt/decrypt round trip succeeded
        '''
        in_text = bytearray(randrange(256) for i in range(size))   # random input
        cypher_text = arcfour(key, in_text)        # encrypt
        out_text = arcfour(key, cypher_text)       # decrypt
        assert in_text == out_text, 'Failed arcfour_test'
        return size
    
    
    def main(workers=None):
        if workers:
            workers = int(workers)
        t0 = time.time()
    
        with futures.ProcessPoolExecutor(workers) as executor:
            actual_workers = executor._max_workers
            to_do = []
            for i in range(JOBS, 0, -1):       # JOBS tasks, largest size first
                size = SIZE + int(SIZE / JOBS * (i - JOBS/2))
                job = executor.submit(arcfour_test, size, KEY)    # schedule and get a future
                to_do.append(job)
    
            for future in futures.as_completed(to_do):  # consume futures as they finish
                res = future.result()
                print('{:.1f} KB'.format(res / 2**10))
    
            print(STATUS.format(actual_workers, time.time() - t0))
    
    if __name__ == '__main__':
        if len(sys.argv) == 2:
            workers = int(sys.argv[1])
        else:
            workers = None
        main(workers)
    
    shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 arcfour_futures.py 1
    384.0 KB
    368.0 KB
    352.0 KB
    336.0 KB
    320.0 KB
    304.0 KB
    288.0 KB
    272.0 KB
    256.0 KB
    240.0 KB
    224.0 KB
    208.0 KB
    192.0 KB
    176.0 KB
    160.0 KB
    144.0 KB
    1 workers, elapsed time: 7.09s
    shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 arcfour_futures.py 2
    368.0 KB
    384.0 KB
    352.0 KB
    336.0 KB
    304.0 KB
    320.0 KB
    272.0 KB
    288.0 KB
    240.0 KB
    256.0 KB
    208.0 KB
    224.0 KB
    176.0 KB
    192.0 KB
    144.0 KB
    160.0 KB
    2 workers, elapsed time: 3.65s
    shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 arcfour_futures.py 3
    352.0 KB
    368.0 KB
    384.0 KB
    320.0 KB
    336.0 KB
    304.0 KB
    256.0 KB
    272.0 KB
    288.0 KB
    208.0 KB
    224.0 KB
    240.0 KB
    160.0 KB
    176.0 KB
    192.0 KB
    144.0 KB
    3 workers, elapsed time: 2.62s
    shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 arcfour_futures.py 4
    336.0 KB
    352.0 KB
    368.0 KB
    384.0 KB
    320.0 KB
    288.0 KB
    272.0 KB
    304.0 KB
    208.0 KB
    224.0 KB
    240.0 KB
    256.0 KB
    144.0 KB
    160.0 KB
    176.0 KB
    192.0 KB
    4 workers, elapsed time: 1.91s
    

     These are the run times going from 1 to 4 worker processes.

    There is also a multiprocess hashlib example; the code first:

    import sys
    import time
    import hashlib
    from concurrent import futures
    from random import randrange
    
    
    JOBS = 12
    SIZE = 2 ** 20
    STATUS = '{} workers, elapsed time: {:.2f}s'
    
    
    def sha(size):
        # hash random bytes with SHA-256
        data = bytearray(randrange(256) for i in range(size))
        algo = hashlib.new('sha256')
        algo.update(data)
        return algo.hexdigest()
    
    
    def main(workers=None):
        if workers:
            workers = int(workers)
        t0 = time.time()
    
        with futures.ProcessPoolExecutor(workers) as executor:
            actual_workers = executor._max_workers
            # create the futures with a generator expression
            to_do = (executor.submit(sha, SIZE) for i in range(JOBS))
            for future in futures.as_completed(to_do):
                res = future.result()
                print(res)
    
        print(STATUS.format(actual_workers, time.time() - t0))
    
    
    if __name__ == '__main__':
        if len(sys.argv) == 2:
            workers = int(sys.argv[1])
        else:
            workers = None
        main(workers)
    
    shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 sha_futures.py 1
    475a1d4fd737010226ea9e4d73ed75ab8a906b82df1c377ad5c1f66e278747a3
    9745a7e932967de7fdf3dd6f5200458ccd25f071ee45ead287352ba7a72cbb19
    1369f4b05dc9c35f9f5f3c2b10205a74a8e1cdcf03863f5d74f73e9fc08bfbde
    edf636a83fa3aaeff05beee676462f7fc6fcd1adaf8db9b7104c58d959ec2360
    0162055b97546bd4bd118603c360f00bf64c648f6ecef961d071dbc914ff3486
    a6e0bce50e15314ffd1cb9a9fa048179900ebaf83e1a245f47c65ef2eb8142fa
    4ba5dba64e8fcfb3b28127ed98857f3a587d0f068d1a46a6d1addf7c42c3bbc5
    6e21affb7bf904b5c01b48d44c040736a2164cc005c993caa139e778d537eeaf
    9ea008a211765adeea9fceb19989b68ca555b37ba9e192dcd95e843bb51c9b96
    920fc411a60409c7181772c63571a908268e320f40bf3490d7f493c5b0fd8768
    bb42c207a2185eff0dcaaf8146f3c15a6d6ca9b4301ad655307a2732b3674065
    dcb913542cf4d8e790cfd98d1034204041062e44c25a2c2894f909473de85034
    1 workers, elapsed time: 10.09s
    shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 sha_futures.py 2
    a98bf17ebd32bb9728db9f435f5974df54c8454bdae78c1033d86cefaba7bc10
    4de6c981ee221e0faf3ca84130c3872f8cf022e931b91b7844572af53fb82b46
    19db9433a51dbfe6ede7eea2cb774ae15a3310832507bf671b83c9e745bad674
    5b80e296355d48aecf0c8449cfc2309a12eee91314cb86860de4d2429d388078
    b6ebbf7e3d3760824decb617755f5ec40a56dd3ebf7176d0b909f5d52cc786e8
    93bd269cc65cac2c8d44bee1019b01c4b8ba0449a20fea354073e9d511610e3e
    f7daaa89e103e89ad78ccb53c825412885525da38ca72f1abb39d872ae9b2286
    992af055445863cad9f53486820550b8b05839b25b674540403194e2055181cb
    ce9b66642b36737b386d1678c8b06e94baf5f56e00c6fbd82855fc55fa92dc33
    375a9e2b73de73596d826216f1e96328df9fc0ece02e57a73f6a84d5e2d59212
    883b135e776c2c4d8fa3ba992e2aee928ddbddda03808aa57647b35f666e924e
    d62eac75c424f60fae7a6fb8addf1ef9598d90e01339324e99df956624fd562b
    2 workers, elapsed time: 5.16s
    shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 sha_futures.py 3
    32b70dc055c60b6bf7b19e1b0cf1f64baa889639b0b244ce828a4de1f6c10d27
    ef38e5ba26bf9843a57c6cc7f21a757cab2b9f4a12ade96ae47c95226a23804d
    f92d00fe9a9dd4ef2efd9b24cd183acffb91c8a9c02af16044229e4ddde3c105
    21c10d098349480bb674cdac2c19937585e7306d4ae0011c03b29f12d7b4adcf
    3f3f50bf41c9b6bc6dc2eda0cee9aefcf34505247ac4b02e684f6a804ecef122
    c5d3e46f2d46dba809033b6919a140c5fe879e15246ef6990427903131c579e2
    34353994b2907965df0d1173bc5c38c8a9ed0e39048ce14c5758e103d405810e
    0702dba34f72e73d386e7220c8ed622571107dd8e85c8d529854d9fd28f774c6
    fd452d84129d26b4eb2ad9b11d6a98edbe5dec944ee7827951924b33b0ac9a16
    820a0b3a235c96a63309004e69ce078af001c15da16bcc2146acc1913a2042fb
    218ec4fc91662a7114589b2aebbd2ded5a9feb823f64aed5d711847910920b74
    fabd9a9956d9c337fc138be8e28981bb0db7cb68ca8245aedb805d628b8cec2e
    3 workers, elapsed time: 3.56s
    shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 sha_futures.py 4
    8db14be1a4ad447fc8d11b3aaee7962af80f403655037af1d0c199333823952f
    f091566644043878e3eb5ca3e2587f8f23ba606bc415f4706cf2148ba984e69f
    d8f34e26df1e243aed8dcd45a619fead75b72232a8ba55d1d1e6af00a25c49c8
    62c8466456cbbde27b8f27e8c14ea937867a148af4beb510ab298933a974f2f9
    badb0006841959266c5701692f8facb85a0c453625542546d5b9ef97151803f4
    ed82ff4f948c2110348ef1414b73a6a4843b9ed8c4e111276f1938c245dcc8ed
    0b7dfe1fcb2ece452558fcf5bf15f518844d94cfbb2d055fbac65d07e41cdbd5
    c7dbefdc44b6812bb3f9c8b2fce7c035ff28e255ed37723685e99b99d08a3bb4
    9534570620a5d0dfc73f0ca08d8f371afa17b8de64cdd1758a2eb34fde8380a4
    7f31bab5b6ea8bf2645c85a987808caf6b66887798f27a2954726baf4b558e5c
    0104677cdd7a2f91862d6a0dfe71e95d835b0995b540e56878abf2248f9472ee
    3970f01fb6cecbfe67de8a699abc578e99162ba9a9864459d296d7ea1a57ebeb
    4 workers, elapsed time: 2.72s
    shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ python3 sha_futures.py 8
    150755678a6c05034957fc2faa82b813d449ef057bddd8b1781bccd8f87f7c23
    5a15fefd54fb16c5e77c05957d7892d66dfbba50f0538f6c96c314a949c82479
    77492540359fe375384970f80a03b1b18d37c76601d5676b469f0a1e600ec153
    d4ae793642e5394c32ed9284b7e0ab2c0c5347e35201218ed2b98d4c95d6a888
    f5ddbe2d8613f577a209018e065bc568def5bfc81a9203dd17b8ebba76411fcf
    d72549a41193ff6cccefaffb224b75f92963ee756311939befadfbb1ea4d2392
    9558e385d6389b9a56bd32e833a4a8f20f3afb85d88611dc9c6c683b237645f5
    0d8f6392a77dd3b53f13770ef09770a04fca59a615a18c44c023898c33890ed8
    b65337796496c283928272e0aca2967466a908a034f30155241daa54e387951d
    9b2131a927e1a7dec1b620203e1bea6d3f66a27d51191c802d69cb0831a0b3dc
    a7c3e77d6af61a48076c1dd55fc83cde2c41f0886818c39892c811c5857af4b6
    31129446b33f24657d3b49e1a8b3313d1d1f44ddd8b0dec8f3c8498521b4d4fa
    8 workers, elapsed time: 1.91s
    shijianzhongdeMacBook-Pro:第十七章 shijianzhong$ 
    

     The results show that adding processes speeds up the computation substantially.

    17.4 Experimenting with Executor.map

    The code first:

    from time import sleep, strftime
    from concurrent import futures
    
    # helper that prints messages prefixed with a timestamp
    def display(*args):
        print(strftime('[%H:%M:%S]'), end=' ')
        print(*args)
    
    
    def loiter(n):
        msg = '{}loiter({}): doing nothing for {}s...'
        display(msg.format('    ' * n, n, n))
        sleep(n)
        msg = '{}loiter({}): done'
        display(msg.format('    '*n, n))
        return n * 10
    
    def main():
        display('Script starting')
        # create a pool of three threads
        executor = futures.ThreadPoolExecutor(max_workers=3)
        # submit five tasks with arguments 0 through 4
        results = executor.map(loiter, range(5))
        # results is a generator; display it right away
        display('results', results)
    
        display('Waiting for individual results:')
        for i, result in enumerate(results):
            display('result {}: {}'.format(i, result))
    
    
    main()
    
    /usr/local/bin/python3.7 /Users/shijianzhong/study/Fluent_Python/第十七章/demo_executor_map.py
    [02:00:50] Script starting
    [02:00:50] loiter(0): doing nothing for 0s...
    [02:00:50] loiter(0): done
    [02:00:50]     loiter(1): doing nothing for 1s...
    [02:00:50]         loiter(2): doing nothing for 2s...
    [02:00:50] results <generator object Executor.map.<locals>.result_iterator at 0x103340ed0>
    [02:00:50] Waiting for individual results:
    [02:00:50] result 0: 0
    [02:00:50]             loiter(3): doing nothing for 3s...
    [02:00:51]     loiter(1): done
    [02:00:51]                 loiter(4): doing nothing for 4s...
    [02:00:51] result 1: 10
    [02:00:52]         loiter(2): done
    [02:00:52] result 2: 20
    [02:00:53]             loiter(3): done
    [02:00:53] result 3: 30
    [02:00:55]                 loiter(4): done
    [02:00:55] result 4: 40
    
    Process finished with exit code 0
    

     This run uses three threads for five tasks; the results of executor.map come out in the order the arguments were passed in.

    If the first call takes a long time, retrieving the later results blocks even if they have already finished: nothing can be consumed until the first result is available.

    You can see this by reordering the range(5) arguments passed in above.
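    A sketch of that blocking (same loiter shape as above, with shorter sleeps): reversing the input makes the slowest call go first, so every result waits behind it even though the shorter calls were already done.

```python
import time
from concurrent import futures

def loiter(n):
    time.sleep(n * 0.1)
    return n * 10

results = []
t0 = time.time()
with futures.ThreadPoolExecutor(max_workers=3) as executor:
    # reversed(range(5)): loiter(4) is submitted, and yielded, first
    for result in executor.map(loiter, reversed(range(5))):
        # nothing comes out until loiter(4) finishes (~0.4s), even though
        # the shorter calls had already completed by then
        results.append(result)
        print(result, 'at {:.1f}s'.format(time.time() - t0))
```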

    That is why the executor.submit and futures.as_completed combination is more flexible:

    executor.submit can handle a mix of different callables and arguments, while executor.map can only run the same callable with different arguments.

    Also, the set of futures passed to futures.as_completed may come from more than one Executor instance; for example, some created by a ThreadPoolExecutor and others by a ProcessPoolExecutor.

    17.5 Downloads with Progress Display and Error Handling

    The shared base module:

    import os
    import time
    import sys
    import string
    import argparse
    from collections import namedtuple
    from enum import Enum
    
    Result = namedtuple('Result', 'status data')
    
    # three download statuses: ok, not_found (404), and other errors
    HTTPStatus = Enum('Status', 'ok not_found error')
    
    POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
                'MX PH VN ET EG DE IR TR CD FR').split()
    
    DEFAULT_CONCUR_REQ = 1
    MAX_CONCUR_REQ = 1
    
    SERVERS = {
        'REMOTE': 'http://flupy.org/data/flags',
        'LOCAL': 'http://localhost:8001/flags',
        'DELAY': 'http://localhost:8002/flags',
        'ERROR': 'http://localhost:8003/flags',
    }
    DEFAULT_SERVER = 'LOCAL'
    
    DEST_DIR = 'downloads/'
    COUNTRY_CODES_FILE = 'country_codes.txt'
    
    
    def save_flag(img, filename):
        path = os.path.join(DEST_DIR, filename)
        with open(path, 'wb') as fp:
            fp.write(img)
    
    
    def initial_report(cc_list, actual_req, server_label):
        # report before downloading starts
        if len(cc_list) <= 10:
            cc_msg = ', '.join(cc_list)
        else:
            cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
        # which server the downloads come from
        print('{} site: {}'.format(server_label, SERVERS[server_label]))
        msg = 'Searching for {} flag{}: {}'
        plural = 's' if len(cc_list) != 1 else ''
        # which country codes will be fetched
        print(msg.format(len(cc_list), plural, cc_msg))
        plural = 's' if actual_req != 1 else ''
        msg = '{} concurrent connection{} will be used.'
        # how many concurrent connections
        print(msg.format(actual_req, plural))
    
    
    def final_report(cc_list, counter, start_time):
        # summary report after the downloads finish
        elapsed = time.time() - start_time
        print('-' * 20)
        msg = '{} flag{} downloaded'
        plural = 's' if counter[HTTPStatus.ok] != 1 else ''
        print(msg.format(counter[HTTPStatus.ok], plural))
        if counter[HTTPStatus.not_found]:
            print(counter[HTTPStatus.not_found], 'not found.')
        if counter[HTTPStatus.error]:
            plural = 's' if counter[HTTPStatus.error] != 1 else ''
            print('{} error{}.'.format(counter[HTTPStatus.error], plural))
        print('Elapsed time: {:.2f}s'.format(elapsed))
    
    
    def expand_cc__args(every_cc, all_cc, cc_args, limit):
        # use a set to drop duplicate arguments
        codes = set()
        A_Z = string.ascii_uppercase  # the uppercase letters A to Z
        # with -e, include every two-letter combination AA to ZZ
        if every_cc:
            codes.update(a+b for a in A_Z for b in A_Z)
        # with -a, read the codes from the file
        elif all_cc:
            with open(COUNTRY_CODES_FILE) as fp:
                text = fp.read()
            codes.update(text.split())
        # otherwise take the positional arguments as input
        else:
            for cc in (c.upper() for c in cc_args):
                if len(cc) == 1 and cc in A_Z:
                    codes.update(cc+c for c in A_Z)
                elif len(cc) == 2 and all(c in A_Z for c in cc):
                    codes.add(cc)
                else:
                    msg = 'each CC argument must be A to Z or AA to ZZ'
                    raise ValueError('*** Usage error: ' + msg)
        # return a sorted list, truncated to the limit
        return sorted(codes)[:limit]
    
    
    def process_args(default_concur_req):
        server_options = ', '.join(sorted(SERVERS))
        parser = argparse.ArgumentParser(
            description='Download flags for country codes. '
            'Default: top 20 countries by population')
        # cc is a positional argument; nargs='*' accepts zero or more values
        parser.add_argument('cc', metavar='CC', nargs='*',
                            help='country code or 1st letter (eg. B for BA...BZ)')
        parser.add_argument('-a', '--all', action='store_true',
                            help='get all available flags (AD to ZW)')
        parser.add_argument('-e', '--every', action='store_true',
                            help='get flags for every possible code (AA...ZZ)')
        parser.add_argument('-l','--limit', metavar='N', type=int,
                            help='limit to N first codes', default=sys.maxsize)
        parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
                            default=default_concur_req,
                            help='maximum concurrent requests (default={})'
                            .format(default_concur_req))
        parser.add_argument('-s', '--server', metavar='LABEL',
                            default=DEFAULT_SERVER,
                            help='Server to hit; one of {} (default={})'
                            .format(server_options, DEFAULT_SERVER))
        parser.add_argument('-v', '--verbose', action='store_true',
                            help='output detailed progress info')
        args = parser.parse_args()
        if args.max_req < 1:
            print('*** Usage error: --max_req CONCURRENT must be >= 1')
            parser.print_usage()
            sys.exit(1)
        if args.limit < 1:
            print('*** Usage error: --limit N must be >= 1')
            parser.print_usage()
            sys.exit(1)
        args.server = args.server.upper()
        if args.server not in SERVERS:
            print('*** Usage error: --server LABEL must be one of',
                  server_options)
            parser.print_usage()
            sys.exit(1)
        try:
            # by default args.every and args.all are False and args.cc is an
            # empty list; args.limit defaults to sys.maxsize
            cc_list = expand_cc__args(args.every, args.all, args.cc, args.limit)
        except ValueError as exc:
            # report errors raised by expand_cc__args
            print(exc.args[0])
            parser.print_usage()
            sys.exit(1)
        # if the list is empty, fall back to the list defined in this script
        if not cc_list:
            cc_list = sorted(POP20_CC)
        # everything checks out: return the args object and the country list
        return args, cc_list
    
    
    def main(download_many, default_concur_req, max_concur_req):
        # get the args object and the list of country codes to process
        args, cc_list = process_args(default_concur_req)
        # take the minimum of the -m argument, the module maximum, and the
        # number of codes, to avoid starting more workers than needed
        actual_req = min(args.max_req, max_concur_req, len(cc_list))
        # report the setup before downloading
        initial_report(cc_list, actual_req, args.server)
        # base URL of the chosen server
        base_url = SERVERS[args.server]
        t0 = time.time()
        # run the download; counter maps HTTPStatus members to counts
        counter = download_many(cc_list, base_url, args.verbose, actual_req)
        # every country code should be accounted for in the counter
        assert sum(counter.values()) == len(cc_list), \
            'some downloads are unaccounted for'
        # print the summary report
        final_report(cc_list, counter, t0)
    

    Sequential (single-threaded) download:

    import collections
    
    import requests
    import tqdm
    
    from flags2_common import main, save_flag, HTTPStatus, Result
    
    
    DEFAULT_CONCUR_REQ = 1
    MAX_CONCUR_REQ = 1
    
    
    def get_flag(base_url, cc):
        url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
        resp = requests.get(url)
        if resp.status_code != 200:
            resp.raise_for_status()
        return resp.content
    
    
    def download_one(cc, base_url, verbose=False):
        try:
            image = get_flag(base_url, cc)
        except requests.exceptions.HTTPError as exc:
            res = exc.response
            # handle only 404 here; let other HTTP errors bubble up
            if res.status_code == 404:
                # status is an HTTPStatus Enum member
                status = HTTPStatus.not_found
                msg = 'not found'
            else:
                raise
        else:
            save_flag(image, cc.lower() + '.gif')
            # status is an HTTPStatus Enum member
            status = HTTPStatus.ok
            msg = 'ok'
        # in verbose mode, print one line per country
        if verbose:
            print(cc, msg)
        # return a Result namedtuple carrying the status member
        return Result(status, cc)
    
    def download_many(cc_list, base_url, verbose, max_req):
        counter = collections.Counter()
        cc_iter = sorted(cc_list)
        # without -v, wrap the iterable in a tqdm progress bar
        if not verbose:
            cc_iter = tqdm.tqdm(cc_iter)
        # iterate over the (possibly wrapped) country codes
        for cc in cc_iter:
            try:
                res = download_one(cc, base_url, verbose)
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP error {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = 'Connection error'
            else:
                error_msg = ''
                # on the normal path the status is ok or not_found (404)
                status = res.status
    
            if error_msg:
                # on error, record the error status member
                status = HTTPStatus.error
            counter[status] += 1
            # in verbose mode, report each error as it happens
            if verbose and error_msg:
                print('*** Error for {}: {}'.format(cc, error_msg))
        # return the Counter keyed by HTTPStatus members
        return counter
    
    
    if __name__ == '__main__':
        main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
    

    Multithreaded download:

    import collections
    from concurrent import futures
    
    import requests
    import tqdm
    
    from flags2_common import main, HTTPStatus
    from flags2_sequential import download_one
    
    # default number of threads
    DEFAULT_CONCUR_REQ = 30
    # cap on the number of threads
    MAX_CONCUR_REQ = 1000
    
    
    def download_many(cc_list, base_url, verbose, concur_req):
        counter = collections.Counter()
        with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
            to_do_map = {}
            for cc in sorted(cc_list):
                future = executor.submit(download_one, cc, base_url, verbose)
                to_do_map[future] = cc
            done_iter = futures.as_completed(to_do_map)
            if not verbose:
                done_iter = tqdm.tqdm(done_iter, total=len(cc_list))
            for future in done_iter:
                try:
                    res = future.result()
                except requests.exceptions.HTTPError as exc:
                    error_msg = 'HTTP error {res.status_code} - {res.reason}'
                    error_msg = error_msg.format(res=exc.response)
                except requests.exceptions.ConnectionError as exc:
                    error_msg = 'Connection error'
                else:
                    error_msg = ''
                    # on the normal path the status is ok or not_found (404)
                    status = res.status
                if error_msg:
                    status = HTTPStatus.error
                counter[status] += 1
                if verbose and error_msg:
                    cc = to_do_map[future]
                    print('*** Error for {}: {}'.format(cc, error_msg))
    
        return counter
    
    
    if __name__ == '__main__':
        main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
    

    The code above follows the book, implementing the download both sequentially and with multiple threads. The whole thing reads like flowing water; truly the work of a master.

    By default the files are served through a local nginx proxy; there are also delay and error test servers, which I haven't demonstrated here.

    The handling of default and command-line parameters through the argparse module is very well done.

    I also like how the parsed arguments are kept on a single object, and how the download outcomes are recorded with an Enum.

    Deep respect.


     

  • Original post: https://www.cnblogs.com/sidianok/p/12197410.html