zoukankan      html  css  js  c++  java
  • concurrent.futures模块

    class concurrent.futures.Executor

    Executor是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类ThreadPoolExecutor或者ProcessPoolExecutor进行调用。

    我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。

    Future可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

    1.多线程ThreadPoolExecutor

    from concurrent.futures  import ThreadPoolExecutor,ALL_COMPLETED,ProcessPoolExecutor,wait,as_completed,FIRST_COMPLETED
    import requests
    import time
    urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"]
    def load_url(url, timeout):
        print(f"{url} start 时间 %s"%time.asctime())
        s=requests.request("get",url, timeout=timeout)
        print(f"{url} end时间 %s"%time.asctime())
        return str(s.json())
    wokers=ThreadPoolExecutor(max_workers=5)
    tasks=[wokers.submit(load_url,i,timeout=10) for i in  urls]
    
    for task in as_completed(tasks):
        print(task.result())
    

     excutor对象互相引用引发死锁

    ThreadPoolExecutorExecutor使用线程池异步执行调用子类。

    当与之关联的可调用对象Future等待另一个对象的结果时,就会发生死锁Future例如:

    import time
    def wait_on_b():
        time.sleep(5)
        print(b.result())  # b will never complete because it is waiting on a.
        return 5
    
    def wait_on_a():
        time.sleep(5)
        print(a.result())  # a will never complete because it is waiting on b.
        return 6
    
    
    executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_on_b)
    b = executor.submit(wait_on_a)
    

    和:excutor 调用woker不足引发无法返回

    def wait_on_future():
        f = executor.submit(pow, 5, 2)
        # This will never complete because there is only one worker thread and
        # it is executing this function.
        print(f.result())
    
    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(wait_on_future)

     2.as_completed

    as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程。

    参数是任务(submit的返回值)列表

    例子见上。

    3.

    wait

    wait方法可以让主线程阻塞,直到满足设定的要求。

    wait方法接收3个参数,等待的任务序列、超时时间以及等待条件。等待条件return_when默认为ALL_COMPLETED,表明要等待所有的任务都结束。等待条件还可以设置为FIRST_COMPLETED,表示第一个任务完成就停止等待。

    from concurrent.futures  import ThreadPoolExecutor,ALL_COMPLETED,ProcessPoolExecutor,wait,as_completed,FIRST_COMPLETED
    import requests
    import time
    urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"]
    def load_url(url, timeout):
        print(f"{url} start 时间 %s"%time.asctime())
        s=requests.request("get",url, timeout=timeout)
        print(f"{url} end时间 %s"%time.asctime())
        return str(s.json())
    wokers=ThreadPoolExecutor(max_workers=5)
    tasks=[wokers.submit(load_url,i,timeout=10) for i in  urls]
    
    # for task in as_completed(tasks):
    #     print(task.result())
    
    res=wait(tasks,timeout=10,return_when=FIRST_COMPLETED)
    print(res.done)
    for i in tasks:
        print(i.result(),i.done(),i.cancelled())
    

      

    结果分析:

    return_when=FIRST_COMPLETED

    http://127.0.0.1:8000/index start 时间 Sat Dec 14 12:17:48 2019
    http://127.0.0.1:8000/stuTable/ start 时间 Sat Dec 14 12:17:48 2019
    http://127.0.0.1:8000/index end时间 Sat Dec 14 12:17:48 2019
    {<Future at 0x33a67b0 state=finished returned str>}
    {'user': 'test001', 'msg': 'this is test index view '} True False
    http://127.0.0.1:8000/stuTable/ end时间 Sat Dec 14 12:17:51 2019
    {'A': 888, 'NN': 899} True False

    第一个完成后就直接返回了完成的对象,即使后面通过打印获取到后完成的task的结果,

    concurrent.futures.waitfstimeout = Nonereturn_when = ALL_COMPLETED 

    等待fs给定Future实例(可能由其他Executor实例创建 完成。返回一组命名的2元组。第一组名为,包含在等待完成之前完成的期货(完成或取消的期货)。第二组名为,包含未完成的期货(待定或正在运行的期货)。donenot_done

    超时可用于控制返回之前等待的最大秒数。 超时可以是int或float。如果未指定timeoutNone,则等待时间没有限制。

    return_when指示该函数何时应返回。它必须是以下常量之一:

    不变

    描述

    FIRST_COMPLETED

    以后完成或取消操作时,该函数将返回。

    FIRST_EXCEPTION

    当将来通过引发异常结束时,该函数将返回。如果没有未来引发例外,则等同于 ALL_COMPLETED

    ALL_COMPLETED

    当所有期货结束或被取消时,该函数将返回。

    concurrent.futures.as_completedfstimeout = None 

    返回fs给定Future实例(可能由不同的Executor实例创建的迭代器,该迭代器将在完成时生成期货(完成或取消的期货)。fs给定的任何重复的期货将被退回一次。之前完成的任何期货 都将首先产生。返回的迭代器引发if 调用,并且从原始调用到超时后,结果不可用。 超时可以是int或float。如果 未指定timeout,则等待时间没有限制。as_completed()concurrent.futures.TimeoutError__next__()as_completed()None

    4.ProcessPoolExecutor对象

    ProcessPoolExecutor类是Executor使用的过程池异步执行调用子类。 ProcessPoolExecutor使用该multiprocessing模块,它可以避开全局解释器锁定,但也意味着只能执行和返回可拾取对象。

    __main__模块必须可由工作程序子进程导入。这意味着ProcessPoolExecutor在交互式解释器中将不起作用。

    从可调用对象提交到的调用ExecutorFuture方法ProcessPoolExecutor将导致死锁。

    concurrent.futures.ProcessPoolExecutormax_workers = Nonemp_context = Noneinitializer = Noneinitargs =()

    Executor使用最多max_workers进程池异步执行调用子类如果max_workersNone或者没有给出,将默认为机器上的处理器数量。如果max_workers小于或等于0,则将ValueError 引发a。在Windows上,max_workers必须等于或小于61如果不是,ValueError则将被引发。如果max_workersNone,那么61即使有更多处理器可用,默认选择也将是最多。 mp_context可以是多处理上下文,也可以是“无”。它将用来发动工人。如果mp_contextNone 如果未指定,则使用默认的多处理上下文。

    初始化程序是一个可选的可调用对象,它在每个工作进程开始时被调用;initargs是传递给初始化程序的参数的元组。如果初始化器引发异常,则所有当前暂挂的作业都会引发BrokenProcessPool,以及任何尝试向池中提交更多作业的尝试。

    在版本3.3中进行了更改:当其中一个工作进程突然终止时, BrokenProcessPool现在会引发错误。以前,行为是不确定的,但是对执行器或其期货的操作通常会冻结或死锁。

    from multiprocessing import Process,Lock
    
    def  func_mutiprocess(i):
         def ss():
             l = []
             for i in range(1000000000000000000):
                 l.append(i)
         loc=Lock()
         loc.acquire()
         ss()
         print(f"this process {i},{time.asctime()}")
         loc.release()
    
    def process_input():
        pool=[]
        pools=[Process(target=func_mutiprocess,args=(i,) ) for i in range(multiprocessing.cpu_count())]
        for p  in pools:
            p.start()
            pool.append(p)
        for j in pool:
            j.join()
    
    if __name__ == '__main__':
    
       while True:
            try:
             process_input()
            except Exception as e:
                pass
    

      这个一个多进程引发内存占用100%爆掉的反面案例,原因是利用多进程创建超大列表容器

    关于Future:

    所述Future类封装一个可调用的异步执行。 Future实例由创建Executor.submit()

    concurrent.futures.Future

    封装可调用对象的异步执行。 Future 实例是由Executor.submit()测试人员创建的,除了测试外,不应直接创建。

    cancel

    尝试取消呼叫。如果该调用当前正在执行或正在运行,并且无法取消,则该方法将返回 False,否则,该调用将被取消并且该方法将返回True

    cancelled

    True如果呼叫已成功取消,则返回

    running

    True如果当前正在执行该调用且无法取消该调用,则返回

    done

    返回True如果调用成功取消或结束运行。

    resulttimeout = None 

    返回调用返回的值。如果呼叫尚未完成,则此方法将等待超时秒数。如果呼叫未在超时秒内完成,则将 concurrent.futures.TimeoutError引发a。超时可以是int或float。如果未指定timeoutNone,则等待时间没有限制。

    如果在完成之前取消了未来,CancelledError 则将被提出。

    如果调用引发,则此方法将引发相同的异常。

    exceptiontimeout = None 

    返回调用引发的异常。如果呼叫尚未完成,则此方法将等待超时秒数。如果呼叫未在超时秒内完成,则将 concurrent.futures.TimeoutError引发a。 超时可以是int或float。如果未指定timeoutNone,则等待时间没有限制。

    如果在完成之前取消了未来,CancelledError 则将被提出。

    如果呼叫完成而没有加注,None则返回。

    add_done_callbackfn 

    将可调用的fn附加到将来。 当取消未来或完成运行时,将调用fn,并将future作为唯一参数。

    添加的可调用对象按添加顺序被调用,并且始终在属于添加它们的进程的线程中调用。如果可调用对象引发Exception子类,则将其记录并忽略。如果callable引发BaseException子类,则该行为未定义。

    如果将来已经完成或被取消,则将立即调用fn

    以下Future方法适用于单元测试和 Executor实现。

    set_running_or_notify_cancel

    Executor在执行与Future和单元测试相关的工作之前实现应调用此方法

    如果方法返回FalseFuture则取消,Future.cancel()即被调用并返回True等待Future完成的所有线程(即通过 as_completed()wait())都将被唤醒。

    如果该方法返回,True则该Future不会被取消并已处于运行状态,即对的调用 Future.running()将返回True

    该方法只能被调用一次,不能在调用之后 Future.set_result()Future.set_exception()已经被调用。

    set_result结果

    设置与Futureto 结果关联的工作结果

    此方法仅应由Executor实现和单元测试使用。

    在版本3.8中更改:concurrent.futures.InvalidStateError如果Future已经完成引发此方法 

    set_exception例外

    设置与相关的工作结果Future的 异常Exception

    此方法仅应由Executor实现和单元测试使用。

    在版本3.8中更改:concurrent.futures.InvalidStateError如果Future已经完成引发此方

  • 相关阅读:
    深入理解ASP.NET Core依赖注入
    Docker Swarm 从入门到放弃
    Asp.net Core全局异常监控和记录日志
    NServiceBus+RabbitMQ开发分布式应用
    NServiceBus+Saga开发分布式应用
    使用NServiceBus开发分布式应用
    springboot自动配置原理
    SpringMVC 源码解析
    instruments symbol name 不显示函数名!
    ld: framework not found FileProvider for architecture arm64
  • 原文地址:https://www.cnblogs.com/SunshineKimi/p/12038700.html
Copyright © 2011-2022 走看看