zoukankan      html  css  js  c++  java
  • [Python 多线程] Concurrent (十五)

    concurrent包只有一个模块:

    concurrent.futures - 启动并行任务

    异步并行任务编程模块,提供一个高级的异步可执行的便利接口。

    futures模块提供了2个池执行器

    ThreadPoolExecutor 异步调用的线程池的Executor

    ProcessPoolExecutor 异步调用的进程池的Executor

    ThreadPoolExecutor对象

    首先需要定义一个池的执行器对象,Executor类子类对象。

    ThreadPoolExecutor(max_worker=1)     池中至多创建max_workers个线程的池来同时异步执行,返回Executor实例

    submit(fn,*args,**kwargs)    提交执行的函数即参数,返回Future实例

    shutdown(wait=True)    清理池

    Future类方法:

    result()    可以查看调用的返回的结果

    done()     如果调用被成功的取消或者执行完成,返回True

    canceled()    如果调用被成功的取消,返回True

    running()     如果正在运行且不能被取消,返回True

    cancel()     尝试取消调用。如果已经执行且不能取消返回False,否则返回True

    result(timeout=None)   取返回的结果,超时为None,一直等待返回;超时设置到期,抛出concurrent.futures.TimeoutError异常

    execption(timeout=None)   取返回的异常,超时为None,一直等待返回;超时设置到期,抛出conncurrent.futures.TimeoutError异常

    1) 线程池并行异步执行:

    # 并行异步执行,线程 ThreadPoolExecutor
    
    import threading
    import logging
    from concurrent import futures
    import time
    logging.basicConfig(level=logging.INFO,format="%(thread)s %(message)s")
    
    def work(n): #工作函数
        logging.info('wokring-{}'.format(n))
        time.sleep(5)
        logging.info('end work-{}'.format(n))
    
    executor = futures.ThreadPoolExecutor(3) #线程
    
    fs = [] #线程池容器
    
    for i in range(3):
        f = executor.submit(work,i) #提交执行的函数及参数
        fs.append(f)
    
    for i in range(3,6):
        f = executor.submit(work,i)
        fs.append(f)
    
    while True:
        time.sleep(2)
        logging.info(threading.enumerate())
    
        flag = True
    
        for f in fs:
            flag = flag and f.done() #调用是否被成功的取消或运行完成
    
        if flag:
            executor.shutdown() #清理池
            logging.info(threading.enumerate())
            break
    
    #运行结果:
    123145331777536 wokring-0
    123145337032704 wokring-1
    123145342287872 wokring-2
    4320629568 [<_MainThread(MainThread, started 4320629568)>, <Thread(Thread-1, started daemon 123145331777536)>, <Thread(Thread-2, started daemon 123145337032704)>, <Thread(Thread-3, started daemon 123145342287872)>]
    4320629568 [<_MainThread(MainThread, started 4320629568)>, <Thread(Thread-1, started daemon 123145331777536)>, <Thread(Thread-2, started daemon 123145337032704)>, <Thread(Thread-3, started daemon 123145342287872)>]
    123145331777536 end work-0
    123145331777536 wokring-3
    123145337032704 end work-1
    123145337032704 wokring-4
    123145342287872 end work-2
    123145342287872 wokring-5
    4320629568 [<_MainThread(MainThread, started 4320629568)>, <Thread(Thread-1, started daemon 123145331777536)>, <Thread(Thread-2, started daemon 123145337032704)>, <Thread(Thread-3, started daemon 123145342287872)>]
    4320629568 [<_MainThread(MainThread, started 4320629568)>, <Thread(Thread-1, started daemon 123145331777536)>, <Thread(Thread-2, started daemon 123145337032704)>, <Thread(Thread-3, started daemon 123145342287872)>]
    123145331777536 end work-3
    123145342287872 end work-5
    123145337032704 end work-4
    4320629568 [<_MainThread(MainThread, started 4320629568)>, <Thread(Thread-1, started daemon 123145331777536)>, <Thread(Thread-2, started daemon 123145337032704)>, <Thread(Thread-3, started daemon 123145342287872)>]
    4320629568 [<_MainThread(MainThread, started 4320629568)>]
    

      

    2) 进程池并行异步执行:

    import threading #ProcessPoolExecutor进程池
    import logging
    from concurrent import  futures
    import time
    logging.basicConfig(level=logging.INFO,format="%(thread)s %(message)s")
    
    def work(n):
        logging.info('wokring-{}'.format(n))
        time.sleep(5)
        logging.info('end work-{}'.format(n))
    
    if __name__ == "__main__":
        executor = futures.ProcessPoolExecutor(3) #进程
    
        fs = []
    
        for i in range(3):
            f = executor.submit(work,i)
            fs.append(f)
    
        for i in range(3,6):
            f = executor.submit(work,i)
            fs.append(f)
    
        while True:
            time.sleep(2)
            logging.info(threading.enumerate())
    
            flag = True
    
            for f in fs:
    
                flag = flag and f.done()
    
            if flag:
                executor.shutdown()
                logging.info(threading.enumerate())
                break
    
    #运行结果:
    4320629568 wokring-1
    4320629568 wokring-2
    4320629568 wokring-0
    4320629568 [<_MainThread(MainThread, started 4320629568)>, <Thread(Thread-1, started daemon 123145319972864)>, <Thread(QueueFeederThread, started daemon 123145325228032)>]
    4320629568 [<_MainThread(MainThread, started 4320629568)>, <Thread(Thread-1, started daemon 123145319972864)>, <Thread(QueueFeederThread, started daemon 123145325228032)>]
    4320629568 end work-0
    4320629568 end work-1
    4320629568 end work-2
    4320629568 wokring-3
    4320629568 wokring-4
    4320629568 wokring-5
    4320629568 [<_MainThread(MainThread, started 4320629568)>, <Thread(Thread-1, started daemon 123145319972864)>, <Thread(QueueFeederThread, started daemon 123145325228032)>]
    4320629568 [<_MainThread(MainThread, started 4320629568)>, <Thread(Thread-1, started daemon 123145319972864)>, <Thread(QueueFeederThread, started daemon 123145325228032)>]
    4320629568 end work-3
    4320629568 end work-4
    4320629568 end work-5
    4320629568 [<_MainThread(MainThread, started 4320629568)>, <Thread(Thread-1, started daemon 123145319972864)>, <Thread(QueueFeederThread, started daemon 123145325228032)>]
    4320629568 [<_MainThread(MainThread, started 4320629568)>]
    

      

     支持上下文管理

    concurrent.futures.ProcessPoolExecutor继承自conncurrent.futures.base.Executor,而父类有__enter__、__exit__方法,支持上下文管理。可以使用with语句

    with ThreadPoolExecutor(max_workers=5) as executor:
    	future = executor.submit(work,n)
    	print(future.result())
    

      

    总结:

    统一了线程池、进程池调用,简化了编程。

    是Python简单的思想哲学的体现。

    唯一的缺点:无法设置线程名称。

  • 相关阅读:
    网页结构树DOM
    网页设计之js
    css了解一下!!!
    Html !!!了解一下
    进程and线程and协程效率对比
    线程
    进程之生产者消费者模型(队列,管道,数据共享,进程池)
    进程之机制问题(锁,信号,事件)
    并发进程
    socket模块其他用法
  • 原文地址:https://www.cnblogs.com/i-honey/p/8082197.html
Copyright © 2011-2022 走看看