zoukankan      html  css  js  c++  java
  • concurrent.futures

    前言

    源码:
    	Lib/concurrent/futures/thread.py
    	Lib/concurrent/futures/process.py
    concurrent.futures:
    	异步执行可以由 ThreadPoolExecutor 或 ProcessPoolExecutor 来实现
    	两者都是实现抽像类 Executor 定义的接口
    

    Future 方法对象说明

    对象名 说明
    cancel() 尝试去取消调用。如果调用当前正在执行,不能被取消。这个方法将返回False,否则调用将会被取消,方法将返回True
    cancelled() 如果调用被成功取消返回True
    running() 如果当前正在被执行不能被取消返回True
    done() 如果调用被成功取消或者完成running返回True
    result(Timeout = None) 拿到调用返回的结果。如果没有执行完毕就会去等待
    exception(timeout=None) 捕获程序执行过程中的异常
    add_done_callback(fn) 将fn绑定到future对象上。当future对象被取消或完成运行时,fn函数将会被调用

    进程池、线程池使用案例

    • 进程池与线程池调用方式、传参差不多,只是调用模块不同......
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @File    : $file_name.py
    # @Author  : BenLam
    # @Link    : https://www.cnblogs.com/BenLam/
    
    import os
    import time
    from concurrent.futures import ProcessPoolExecutor  # 进程池模块
    from concurrent.futures import ThreadPoolExecutor  # 线程池模块
    
    #  下面是以进程池为例, 线程池只是模块改一下即可
    def task(name):
        print("name: %s  pis%s  run" % (name, os.getpid()))
        time.sleep(1)
    
    if __name__ == "__main__":
        # thread = ThreadPoolExecutor(2)  # 设置进程池大小,默认等于cpu核数
        pool = ProcessPoolExecutor(2)  # 设置进程池大小,默认等于cpu核数
        for i in range(5):
            pool.submit(task, "进程%s" % i)  # 异步提交(只是提交需要运行的线程不等待)
    
        # 作用1:关闭进程池入口不能再提交了   作用2:相当于jion 等待进程池全部运行完毕
        pool.shutdown(wait=True)  
        print("主进程")
    
    

    异步调用与同步调用

    concurrent.futures模块提供了高度封装的异步调用接口
    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用

    同步调用

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @File    : $file_name.py
    # @Author  : BenLam
    # @Link    : https://www.cnblogs.com/BenLam/
    
    import os, time, random
    from concurrent.futures import ProcessPoolExecutor  # 进程池模块
    
    # 1、同步调用: 提交完任务后、就原地等待任务执行完毕,拿到结果,再执行下一行代码(导致程序串行执行)
    def task(name):
        print("name: %s  pis%s  run" % (name,os.getpid()))
        time.sleep(random.randint(1, 3))
    
    if __name__ == "__main__":
        pool = ProcessPoolExecutor(4)
        for i in range(10):
            pool.submit(task, "进程%s" % i).result() # 同步迪奥用,result(),相当于join 串行
    
        pool.shutdown(wait=True)
        print("主进程")
    
    

    异步调用

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @File    : $file_name.py
    # @Author  : BenLam
    # @Link    : https://www.cnblogs.com/BenLam/
    
    from concurrent.futures import ProcessPoolExecutor  # 进程池模块
    import os, time, random
    
    def talk(name):
        print("name: %s  pis%s  run" % (name,os.getpid()))
        time.sleep(random.randint(1, 3))
    
    if __name__ == "__main__":
        pool = ProcessPoolExecutor(4)
        for i in range(10):
            pool.submit(talk, "进程%s" % i)  # 异步调用,不需要等待
    
        pool.shutdown(wait=True)
        print("主进程")
    

    回调机制

    可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数

    #parse_page 拿到的是一个 future 对象 obj ,需要用 obj.result() 拿到结果
    用法:
    	p.submit(这里异步调用).add_done_callback(方法)
    

    回调例子

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @File    : $file_name.py
    # @Author  : BenLam
    # @Link    : https://www.cnblogs.com/BenLam/
    
    import os
    import time
    import requests
    from concurrent.futures import ThreadPoolExecutor  # 线程池模块
    
    def get(url):
        print(f"GET {url}")
        response = requests.get(url)  # 下载页面
        time.sleep(3)  # 模拟网络延时
        return {"url": url, "content": response.text}  # 页面地址和页面内容
    
    def parse(res):
        # !取到res结果 【回调函数】带参数需要这样
        res = res.result()
        print(f"{res['url']} res is {len(res['content'])} Kb")
    
    if __name__ == "__main__":
        urls = {
            "http://www.jd.com",
            "http://www.baidu.com",
            "http://www.cnblogs.com"
        }
        thread = ThreadPoolExecutor(2)
        for i in urls:
            # 【回调函数】执行完线程后,跟一个函数
            thread.submit(get, i).add_done_callback(parse)
    
    
    废弃代码 - 折叠

    线程池

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @File    : $file_name.py
    # @Author  : BenLam
    # @Link    : https://www.cnblogs.com/BenLam/
    
    import requests
    from concurrent.futures import ThreadPoolExecutor
    
    lis = [
        "https://www.baidu.com",
        "https://www.jd.com",
        "https://www.cnblogs.com",
    ]
    
    def response(url):
        _res = requests.get(url)
        print(_res.url)
        # return _res
    
    print("--- 主线程 ---")
    executor = ThreadPoolExecutor(max_workers=3)
    for url in lis:
        future = executor.submit(response, url)
        restful = future.running()
        print(restful)
    print("--- 主线程 | 结束 ---")
    
    """
    打印:
    	--- 主线程 ---
    		True
    		True
    		True
    	--- 主线程 | 结束 ---
    		https://www.jd.com/
    		https://www.cnblogs.com/
    		https://www.baidu.com/
    	[Finished in 1.9s]
    """
    

    进程池

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @File    : $file_name.py
    # @Author  : BenLam
    # @Link    : https://www.cnblogs.com/BenLam/
    
    import time
    from concurrent.futures import ProcessPoolExecutor
    
    Maxlis =[
        30000000,
        31000000,
        29000000
    ]
    
    def count(Maxnum):
        a = 0
        start = time.time()
        while True:
            a += 1
            if a == Maxnum:
                break
        print(" -> 结果: %s | 耗时 - %.3f" %(a, round(time.time()-start, 3)))
    
    start = time.time()
    print("--- 主线程 ---")
    executor = ProcessPoolExecutor(max_workers=3)
    for num in Maxlis:
        future = executor.submit(count, num)
        restful = future.running()
        print(restful)
    print(f"--- 主线程 | - 耗时 {round(time.time() - start, 2)} 结束 ---")
    
    """
    打印:
    	--- 主线程 ---
    		True
    		False
    		False
    	--- 主线程 | - 耗时 1.92 结束 ---
    		 -> 结果: 31000000 | 耗时 - 2.201
    		 -> 结果: 30000000 | 耗时 - 2.221
    		 -> 结果: 29000000 | 耗时 - 2.012
    	[Finished in 5.1s]
    """
    

    submit 实现

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @File    : $file_name.py
    # @Author  : BenLam
    # @Link    : https://www.cnblogs.com/BenLam/
    
    import time
    from threading import Lock
    from concurrent.futures import ThreadPoolExecutor, as_completed
    
    count = 0
    maxsize = 10000000
    lock = Lock()
    
    def run():
        global count
        lock.acquire()
        for _ in range(maxsize):
            count += 1
        lock.release()
        print(count)
    
    def result():
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=3) as executor:
            to_do = []
            for site in range(3):
                future = executor.submit(run)
                to_do.append(future)
    
            for future in as_completed(to_do):
                future.result()
        print(f"总耗时 - {round(time.time() - start_time, 2)} 秒")
    result()
    
    """
    打印:
        10000000
        20000000
        30000000
        总耗时 - 2.22 秒
        [Done] exited with code=0 in 2.016 seconds
    """
    

    submit 实现-2

    # run() 方法跟上边的一样
    # submit 调用方法不一样
    
    def test_submit():
        start_time = time.time()
        test = ThreadPoolExecutor(max_workers=3)
        for _ in range(3):
            future = test.submit(run)
        test.shutdown(wait=True)
        print(f"总耗时 - {round(time.time() - start_time, 2)} 秒")
    
    test_submit()
    
    """
    打印:
        10000000
        20000000
        30000000
        总耗时 - 2.2 秒
        [Done] exited with code=0 in 2.016 seconds
    """
    

    回调函数

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @File    : $file_name.py
    # @Author  : BenLam
    # @Link    : https://www.cnblogs.com/BenLam/
    
    import os
    import time
    import requests
    import threading
    from concurrent.futures import ThreadPoolExecutor
    from concurrent.futures import ProcessPoolExecutor
    
    class Demo_1(object):
        """
        $ Time is Money
        $ 模仿批量请求网站,增加回调处理
        $ 为了更好区分 futures, 特意加了 time.sleep 强制睡眠
        $ response() 对象单线程执行就需要 30 秒
        """
        def __init__(self):
            self.lis = [
                "https://www.baidu.com",
                "https://www.jd.com",
                "https://www.cnblogs.com",
            ]
        def response(self, url):
            time.sleep(10)
            _res = requests.get(url)
            return _res
    
        def _restful(self, future, *args, **kwargs):
            # 回调函数输出打印
            _response = future.result()
            print(f"当前线程名: - {threading.currentThread().name}|状态码: {_response.status_code}")
    
        def main(self):
            print("--- 主线程 ---")
            start_time = time.time()
            executor = ThreadPoolExecutor(max_workers=3)
            for _ in self.lis:
                # 增加回调函数 call_back(对象)
                executor.submit(self.response, _).add_done_callback(self._restful)
            executor.shutdown(wait=True)
            print(f"--- 结束 | 总耗时: {round(time.time() - start_time, 3)} ---")
    
    
    class Demo_2(object):
        """
        $ 时间就是金钱,大家都懂
        $ 多进程减法处理,增加回调处理
        """
        def __init__(self):
            self.lis = [10000, 20000, 30000]
    
        def desc(self, num):
            time.sleep(10)
            while num > 0:
                num -= 1
            return num
    
        def _restful(self, future, *args, **kwargs):
            # 回调函数输出打印
            _result = future.result()
            print(f"当前进程: - {os.getpid()}|状态码: {_result}")
    
        def main(self):
            print("--- 主进程 ---")
            start_time = time.time()
            executor = ProcessPoolExecutor(max_workers=3)
            for _ in self.lis:
                executor.submit(self.desc, _).add_done_callback(self._restful)
            executor.shutdown(wait=True)
            print(f"--- 结束 | 总耗时: {round(time.time() - start_time, 3)} ---")
    
    if __name__ == '__main__':
        Demo_1().main()
        """
        打印:
            --- 主线程 ---
                当前线程名: - ThreadPoolExecutor-0_1|状态码: 200
                当前线程名: - ThreadPoolExecutor-0_2|状态码: 200
                当前线程名: - ThreadPoolExecutor-0_0|状态码: 200
            --- 结束 | 总耗时: 11.962 ---
            [Finished in 12.7s]
        """
    
        Demo_2().main()
        """
        打印:
            --- 主进程 ---
            当前进程: - 9792|计算结果: 0
            当前进程: - 9792|计算结果: 0
            当前进程: - 9792|计算结果: 0
            --- 结束 | 总耗时: 12.441 ---
            [Finished in 13.2s]
        """
        # [Finished in 24.1s]
    

    Executor 死锁

    • Executor 的子类使用线程池来异步执行调用,
    • 如果使用不正确可能会造成死锁,
    • submit 的 task 尽量不要调用 executor 和 futures, 不然很容易出现死锁

    两个方法相互等待结束的死锁

    from concurrent.futures import ThreadPoolExecutor
    def wait_A():
        time.sleep(8)
        print(b.result())
        return "bbbbbbbbbb"
    
    def wait_B():
        time.sleep(9)
        print(a.result())
        return "aaaaaaaaaa"
    
    # submit 的 task 尽量不要调用 executor 和 futures, 不然很容易出现死锁
    executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_A)
    b = executor.submit(wait_B)
    
    """
    通过debug + 打断电,直观看到两个线程都在互相等待对方执行完毕
    debug -
    	ThreadPoolExecutir-0_0 正在运行
    	ThreadPoolExecutir-0_1 正在运行
    """
    

    调用内置函数的死锁

    def wait_future():
        # This will never complete because there is only one worker thread and
        # it is executing this function.
        f = executor.submit(abs, -5)
        print(f.result())
    
    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(wait_future)
    """
    手动强制停止:
        [Done] exited with code=1 in 25.838 seconds
    """
    

    超时直接抛异常

    • call_with_timeout 是定制一个超时方法,在规定时间内未完成会抛出异常
    def call_with_timeout(func, *args, timeout=3):
        """
        Executor API无法取消已在执行的调用。
        future.cancel()只能取消尚未开始的通话。
        如果你想要突然中止功能,
        你应该使用除了concurrent.futures.ProcessPoolExecutor以外的东西。
    
        future.result(timeout=timeout)
        """
        executor = ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(func)
            result = future.result(timeout=timeout)
        finally:
            executor.shutdown(wait=False)
    
    call_with_timeout(wait_A)
    """
    编译器打印:
    	Traceback (most recent call last):
    	  File "concurrent.futures - submit.py", line 91, in <module>
    		call_with_timeout(wait_A)
    	  File "concurrent.futures - submit.py", line 80, in call_with_timeout
    		result = future.result(timeout=timeout)
    	  File "C:python36libconcurrentfutures\_base.py", line 434, in result
    		raise TimeoutError()
    	concurrent.futures._base.TimeoutError
    
    	[Done] exited with code=1 in 8.67 seconds
    """
    

    map 方法

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(方法名字, 参数1, 参数2, 参数3)
    
    • 例子
    import os
    from concurrent.futures import ThreadPoolExecutor
    
    def test(n):
        print(f"当前运行线程 - {os.getpid()} | - {n} ")
    
    with ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(test, range(10))
        executor.shutdown(wait=True)
    """
    打印:
    	当前运行线程 - 5848 | - 0 
    	当前运行线程 - 5848 | - 1 
    	当前运行线程 - 5848 | - 2 
    	当前运行线程 - 5848 | - 3 
    	当前运行线程 - 5848 | - 4 
    	当前运行线程 - 5848 | - 5 
    	当前运行线程 - 5848 | - 6 
    	当前运行线程 - 5848 | - 7 
    	当前运行线程 - 5848 | - 8 
    	当前运行线程 - 5848 | - 9 
    
    	[Done] exited with code=0 in 0.661 seconds
    """
    
  • 相关阅读:
    使用Docfx生成项目文档
    代码性能优化-----减少数据库读取次数
    代码性能优化-----前端页面异步实现
    代码性能优化——task
    SVN批处理
    性能优化
    provider 设计模式
    【IObit】五大软件激活码( Advanced Systemcare....)
    关于 facebook
    关于 Google 与 Chrome
  • 原文地址:https://www.cnblogs.com/BenLam/p/12721420.html
Copyright © 2011-2022 走看看