zoukankan      html  css  js  c++  java
  • python多任务处理 --- concurent

    concurrent.futures模块

    这是一个python实现进程池和线程池的模块,由于频繁的创建线程或者进程是一件较为繁琐的操作,且浪费时间和资源。于是Python提供了线程池和进程池的模块。我们只需要将要执行的任务提交给池对象,池会自动创建线程或是进程对象异步执行这些任务,最后拿到返回结果。这些池中的进程和线程创建后不会立即自动销毁,而是等待下一次任务的提交,方便我们重复使用。

    ThreadPoolExecutor对象

    只需要定义一个池执行器对象,指定该池对象的最大任务数。第一次向池中提交任务时,池会对应创建与任务数对应的线程数,但不超过提供的最大任务数,超过最大任务数的任务会排队等待前一个任务执行完成。

    每次提交一个任务可以得到这个任务的future对象。即使用submit()方法将会得到这个future对象,可以通过该对象的方法查看该任务是否完成,完成的结果,以及是否报错等信息,同时还支持绑定回调函数。

    future对象方法 含义
    done() 该future对象对应的任务完成返回True,否则返回False
    cancelled() 任务是否成功被取消,返回bool,且任务取消会标记对应future对象状态为CANCELED
    running() 是否正在运行
    cancel() 尝试取消该任务,已经执行无法取消,返回false,否则返回True
    result(timeout=None) 取任务执行结果,默认一直等待,或者设置timeout等待时间,超时抛出异常
    exception(timeout=None) 取任务执行时的错误信息,timeout等待时间,超时抛出异常
    f.add_done_callback(fn) 设置任务结束时的回调函数,该函数使用该future对象作为参数

    示例:提交大量的任务,获取这些任务的结果

    from concurrent import futures
    import threading
    
    def func1():
        logging.info("this is func")
        time.sleep(3)
        return 1233
    
    def main():
        executer = futures.ThreadPoolExecutor(3)
        fs = []
    for i in range(10):
            f = executer.submit(func1)
            fs.append(f)
    
    while True:
            for f in fs:          # 遍历列表判断是否全部完成
                if not f.done():  # 只要有一个任务没有完成,结束for循环,再一次从头判断
                    time.sleep(1)
                    break
    
            else:  # for 正常退出,全部执行完成
                ret = []
                for f in fs:
                    ret.append((f.result()))  # 将结果保存
                print("任务执行完成后,shutdown前:", threading.active_count())
                executer.shutdown()  # 程序结束前清空执行器
                break
    
        print("计算结果:", ret)     # 打印结果,即任务函数的返回值
    if __name__ == "__main__":
        main()

    执行器对象支持上下文管理,所以可以使用with语句

    with futures.ThreadPoolExecutor(3) as executor:
        fs = []
        for i in range(10):
            f = executer.submit(func1)
            fs.append(f)
        # ....后代码相同,最后不需要执行executor.shutdown(),上下文自动执行

    错误处理

    执行池在执行提交的任务时候,如果该任务发生错误,将会停止该任务的执行,设置该任务的future对象状态为FINISHED,并将future对象_exception属性设置为True。在我们获取future对象的结果前,需要判断这个任务是否是由于错误而终止。如果发生过异常,那么该异常在future._exception属性上, 但是不宜直接获取,推荐使用future.exception()接口获得这个错误。上面的代码部分修改后。

    def func1():
        logging.info("this is func")
        time.sleep(3)
        return 1233
    
    def main():
        executer = futures.ThreadPoolExecutor(3)
        fs = []
       for i in range(10):
            f = executer.submit(func1)
            fs.append(f)

      while
    True:    for f in fs:    if not f.done():    time.sleep(1)    break    else:    ret = []    for f in fs:    if f._exception: # 该future对象的发生了异常    print(f.exception()) # 这里选择打印信息即可,使用excption()方法可以获取这个异常信息    continue    ret.append((f.result())) # 所有正确的结果才能进入该列表    executer.shutdown()    break

    设置回调函数

    任务执行完成时可以自动进行函数的回调,在提交任务时没有设置提交这个回调函数的接口,只能使用提交任务后获得future对象动态增加。future.add_done_callback(fn),fn函数必须为一参函数且未来的实参为该future对象。

    def callbackfunc(future):
        global count
        with lock:
            count += 1
        if future._exception:
            print(future.exception())
        else:
            ret.append(future.result())
    
    def funcx(i):
        if i > 9:
            raise IndexError("i > 9 error")
        return 1233
    
    
    if __name__ == "__main__":
        lock = threading.Lock()
        with futures.ThreadPoolExecutor(3) as executor:
            ret = []
            count = 0
            for i in range(10):
                f = executor.submit(funcx, i+1)
                f.add_done_callback(callbackfunc)  # 添加回调函数
        while count < 10:
            time.sleep(0.2)
        print(ret)

    这里尝试使用回调函数将任务计算的结果添加到ret类表中,各个线程分别添加。但事实并非完全如此,在添加回调函数时f.add_done_callback(callbackfunc),如果该任务已经被完成了,这个回调函数将会交被主线程调用,也就是说,如果该任务执行时间比添加任务的时间还短,那么该回调函数实际是被主线程执行,for循环中实际为一种串行执行。所以在提交任务之后,应该在尽量短的时间内给该任务的future对象提交callback。

    源码:

    def add_done_callback(self, fn):
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)   # 添加,将来任务执行完成后自动调用
                return
        fn(self)  # 否则,任务已完成,线程自己调用

    ProcessPoolExecutor

    一个进程池执行器类,使用方式和线程池执行器基本一致。只是创建进程去处理这些任务。

    from concurrent import futures
    def func():
        sum = 0
        for i in range(100000):
            sum += 1
    
    # 同样支持上下文管理,结束时自动执行shutdown()
    with futures.ProcessPoolExecutor(3) as executor:
        future = exector.submit(func)
    
        while not future.done():
            pass
        if future._exception:
            print(future.exceptiion())
        else:
            print(future.result())

    只提交了一个任务给进程池执行,将只会创建一个进程执行任务。一般为计算密集型的任务才使用进程池执行,IO密集型优先使用多线程。

  • 相关阅读:
    <转载> diff 和 patch 命令
    JZ2440开发板之LCD
    发音篇--第一章
    JZ2440开发板之系统始终和串口
    【还是用回csdn了,这个blog就不更新了】
    MyBatis 汉字作为查询条件查询不到 MySQL 中的结果
    LeetCode 617. 合并二叉树 Merge Two Binary Tree
    LeetCode 454. 四数相加 II 4Sum II
    LeetCode 441. 排列硬币 Arranging Coins
    leetcode ——从排序数组中删除重复项 II
  • 原文地址:https://www.cnblogs.com/k5210202/p/13067936.html
Copyright © 2011-2022 走看看