zoukankan      html  css  js  c++  java
  • Python之路(第四十六篇)多种方法实现python线程池(threadpool模块multiprocessing.dummy模块concurrent.futures模块)

    一、线程池

    很久(python2.6)之前python没有官方的线程池模块,只有第三方的threadpool模块,

    之后再python2.6加入了multiprocessing.dummy 作为可以使用线程池的方式,

    在python3.2(2012年)之后加入了concurrent.futures模块(python3.1.5也有,但是python3.1.5发布时间晚于python3.2一年多),这个模块是python3中自带的模块,但是python2.7以上版本也可以安装使用。

    下面分别介绍下各个线程池模块的使用

    使用环境python3.6.4,win7

    threadpool模块(上古时期--python2.6之前)

    由于threadpool模块是第三方模块需要进行安装,

    安装

    pip install threadpool 或者在pycharm--settings--Project interpreter中安装

     

    使用介绍

    (1)引入threadpool模块

    (2)定义线程函数

    (3)创建线程 池threadpool.ThreadPool()

    (4)创建需要线程池处理的任务即threadpool.makeRequests()

    (5)将创建的多个任务put到线程池中,threadpool.putRequest

    (6)等到所有任务处理完毕theadpool.pool()

    代码实例

    import threadpool, time
    ​
    name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
                 'Amuro Namie',' Sarah Brightman']
    ​
    ​
    def Say_hello(str):
        print("Hello ", str)
        time.sleep(2)
    ​
    ​
    def main():
        start_time = time.time()
        #第一步 创建线程池,线程数为4:
        pool = threadpool.ThreadPool(4)
        # 第二步创建线程请求,包涵调用的函数、参数和回调函数:
        # requests = threadpool.makeRequests(func, args_list, call_back)
        
        requests = threadpool.makeRequests(Say_hello, name_list)
        # 第三步将所有要运行多线程的请求扔进线程池:
        [pool.putRequest(req) for req in requests]
        # 第四步等待所有的线程完成工作后退出:
        pool.wait()
        print('用时共: %s second' % (time.time() - start_time))
    ​
    ​
    ​
    if __name__ == '__main__':
        main()
    ​
    

      

    输出

    Hello  Satomi Ishihara
    Hello  Aragaki Yui
    Hello  Nainaiwei Hashimoto
    Hello  HIKARU UTADA
    Hello  Mai Kuraki
    Hello  Nozomi Sasaki
    Hello  Amuro Namie
    Hello   Sarah Brightman
    用时共: 4.0012288093566895 second
    

      

    说明:makeRequests存放的是要开启多线程的函数,以及函数相关参数和回调函数,其中回调函数可以不写(默认是无),也就是说makeRequests只需要2个参数就可以运行。

     

    multiprocessing.dummy(中古时期,python3.2起)

    从python3.2起,可以使用multiprocessing.dummy 创建线程池,注意

    from  multiprocessing  import  Pool  #这里引入的是进程池
    from multiprocessing.dummy import Pool as ThreadPool  #这里引入的才是线程池
    

      

    multiprocessing.dummy模块与multiprocessing模块的区别:dummy模块是多线程,而multiprocessing是多进程,api都是通用的。简单地说,multiprocessing.dummy是multiprocessing多进程模块复制的一个多线程模块,API都是通用的。

    这里的多线程也是受到它受到全局解释器锁(GIL)的限制,并且一次只有一个线程可以执行附加到CPU的操作。

     

    使用介绍

    使用有四种方式:apply_async、apply、map_async、map。

     

    其中apply_async和map_async是异步的,也就是启动进程函数之后会继续执行后续的代码不用等待进程函数返回。apply_async和map_async方式提供了一写获取进程函数状态的函数:ready()successful()get()。

    PS:join()语句要放在close()语句后面。

    具体可以参考

    Python之路(第四十篇)进程池

     

    代码实例


    import  time
    from multiprocessing.dummy import Pool as ThreadPool
    ​
    name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
                 'Amuro Namie',' Sarah Brightman']
    ​
    ​
    def Say_hello(str):
        print("Hello ", str)
        time.sleep(2)
    ​
    ​
    def main():
        start_time = time.time()
        #第一步 创建线程池,线程数为4:
        pool = ThreadPool(4)
        # 第二步用map方法执行
        # pool.map(func, args)  ,注意这里的map方法自带pool.close()和pool.join()方法,等待所有子线程执行完
        pool.map(Say_hello,name_list)
        print('用时共: %s second' % (time.time() - start_time))
    ​
    ​
    if __name__ == '__main__':
        main()
    

      


    输出结果

    Hello  Satomi Ishihara
    Hello  Aragaki Yui
    Hello  Nainaiwei Hashimoto
    Hello  HIKARU UTADA
    Hello  Mai Kuraki
    Hello  Nozomi Sasaki
    Hello  Amuro Namie
    Hello   Sarah Brightman
    用时共: 4.004228830337524 second
    

      

     

    concurrent.futures模块(从python3.2开始自带)

    concurrent.futures模块,可以利用multiprocessing实现真正的平行计算。

    核心原理是:concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核。

     

    使用介绍

    模块主要包含下面两个类:

    1. ThreadPoolExecutor

    2. ProcessPoolExecutor

    也就是对 threading 和 multiprocessing 进行了高级别的抽象, 暴露出统一的接口, 方便开发者使用。

    可以使用 ThreadPoolExecutor 来进行多线程编程,ProcessPoolExecutor 进行多进程编程,两者实现了同样的接口,这些接口由抽象类 Executor 定义。 这个模块提供了两大类型,一个是执行器类 Executor,另一个是 Future 类。

    ThreadPoolExecutor:线程池,提供异步调用
    ProcessPoolExecutor: 进程池,提供异步调用

    其实 concurrent.futures 底层还是用的 threading 和 multiprocessing 这两个模块, 相当于在这上面又封装了一层, 所以速度上会慢一点, 这个是架构和接口实现上的取舍造成的。

     

    基类Executor

    #submit(fn, *args, **kwargs)
    异步提交任务
    ​
    #map(func, *iterables, timeout=None, chunksize=1) 
    取代for循环submit的操作,map方法接收两个参数,第一个为要执行的函数,第二个为一个序列,会对序列中的每个元素都执行这个函数,返回值为执行结果组成的生成器
    ​
    #shutdown(wait=True) 
    相当于进程池的pool.close()+pool.join()操作
    wait=True,等待池内所有任务执行完毕回收完资源后才继续
    wait=False,立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前
    可以通过 with 语句来避免显式调用本方法。with 语句会用 wait=True 的默认参数调用 Executor.shutdown() 方法。
    ​
    ​
    #result(timeout=None)
    取得结果
    submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future能够使用done()方法判断该任务是否结束,done()方法是不阻塞的,使用result()方法可以获取任务的返回值,这个方法是阻塞的。
    ​
    ​
    #add_done_callback(fn)
    回调函数
    ​
    # done()
    判断某一个线程是否完成
    ​
    # cancle()
    取消某个任务
    

      


     

    代码示例

    进程池:

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    ​
    import os,time
    def task(n):
        print('%s is runing进程号' %os.getpid())
        time.sleep(2)
        return n**2
    ​
    ​
    def main():
        start_time = time.time()
        executor=ProcessPoolExecutor(max_workers=3)
    ​
        futures=[]
        for i in range(10):
            future=executor.submit(task,i)  #这里使用submit提交进程
            futures.append(future)
        executor.shutdown(True)
        print('*'*20)
        for future in futures:
            print(future.result())
        print('用时共: %s second' % (time.time() - start_time))
    ​
    if __name__ == '__main__':
        main()
    

      

    输出结果

    10292 is runing进程号
    12516 is runing进程号
    9664 is runing进程号
    10292 is runing进程号
    12516 is runing进程号
    9664 is runing进程号
    10292 is runing进程号
    12516 is runing进程号
    9664 is runing进程号
    10292 is runing进程号
    ********************
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81
    用时共: 8.176467418670654 second
    

      

    也可以使用map方法提交进程

    示例

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    ​
    import os,time
    def task(n):
        print('%s is runing进程号' %os.getpid())
        time.sleep(2)
        return n**2
    ​
    ​
    def main():
        start_time = time.time()
        with ProcessPoolExecutor(max_workers=3) as executor:
            futures = executor.map(task, [i for i in range(10)])
        print('*'*20)
        for future in futures:
            print(future)  #无需再次使用result()方法获取结果
        print('用时共: %s second' % (time.time() - start_time))
    ​
    if __name__ == '__main__':
        main()
    

      

    输出结果

    1504 is runing进程号
    12644 is runing进程号
    7732 is runing进程号
    1504 is runing进程号
    12644 is runing进程号
    7732 is runing进程号
    1504 is runing进程号
    7732 is runing进程号
    12644 is runing进程号
    1504 is runing进程号
    ********************
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81
    用时共: 8.171467304229736 second
    

      

     

    分析:map方法返回的results列表是有序的,顺序和*iterables迭代器的顺序一致,这里也无需再次使用result()方法获取结果。

    这里使用with操作符,使得当任务执行完成之后,自动执行shutdown函数,而无需编写相关释放代码。

     

    map()与submit()使用场景

    常用的方法是 submit(), 如果要提交任务的函数是一样的, 就可以简化成 map(), 但是如果提交的函数是不一样的, 或者执行的过程中可能出现异常, 就要使用到 submit(), 因为使用 map() 在执行过程中如果出现异常会直接抛出错误, 而 submit() 则会分开处理。

     

     

     

    线程池

    用法与ProcessPoolExecutor相同

     

    示例

     

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    ​
    import os,time
    def task(n):
        print('%s is runing进程号' %os.getpid())
        time.sleep(2)
        return n**2
    ​
    ​
    def main():
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = executor.map(task, [i for i in range(10)])
        print('*'*20)
        for future in futures:
            print(future)
        print('用时共: %s second' % (time.time() - start_time))
    ​
    if __name__ == '__main__':
        main()
    

      

    输出结果

    7976 is runing进程号
    7976 is runing进程号
    7976 is runing进程号
    7976 is runing进程号
    7976 is runing进程号
    7976 is runing进程号
    7976 is runing进程号
    7976 is runing进程号
    7976 is runing进程号
    7976 is runing进程号
    ********************
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81
    用时共: 8.001457929611206 second
    

      

    分析:注意所有的进程号都是一样的,这里是开启的多线程,所以进程号是一样的

    示例二


    import  time
    from concurrent.futures import ThreadPoolExecutor
    ​
    name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
                 'Amuro Namie',' Sarah Brightman']
    ​
    ​
    def say_hello(str):
        print("Hello ", str)
        time.sleep(2)
    ​
    ​
    def main():
        start_time = time.time()
        # 用map方法执行
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = executor.map(say_hello,name_list)
        print('用时共: %s second' % (time.time() - start_time))
    ​
    ​
    if __name__ == '__main__':
        main()
    ​
    

      

     

    输出结果

    Hello  Satomi Ishihara
    Hello  Aragaki Yui
    Hello  Nainaiwei Hashimoto
    Hello  HIKARU UTADA
    Hello  Mai Kuraki
    Hello  Nozomi Sasaki
    Hello  Amuro Namie
    Hello   Sarah Brightman
    用时共: 4.0022289752960205 second
    

      回调函数的使用

    import  time
    from concurrent.futures import ThreadPoolExecutor
    
    name_list = ['Satomi Ishihara', 'Aragaki Yui', 'Nainaiwei Hashimoto', 'HIKARU UTADA', 'Mai Kuraki', 'Nozomi Sasaki',
                 'Amuro Namie',' Sarah Brightman']
    
    
    def say_hello(str):
        print("Hello ", str)
        time.sleep(2)
        return str
    
    
    
    def call_back(res):
        res = res.result()  #获取结果
        print(res,"长度是%s"%len(res))
    
    
    
    def main():
        start_time = time.time()
        # 用submit方法执行
        executor=ThreadPoolExecutor(max_workers=4)
    
        for i in name_list:
            executor.submit(say_hello,i).add_done_callback(call_back)
        executor.shutdown(True)
        #这里使用submit提交线程,使用add_done_callback()添加回调函数
        print('用时共: %s second' % (time.time() - start_time))
    
    
    if __name__ == '__main__':
        main()
    

      输出结果

    Hello  Satomi Ishihara
    Hello  Aragaki Yui
    Hello  Nainaiwei Hashimoto
    Hello  HIKARU UTADA
    HIKARU UTADA 长度是12
    Aragaki Yui 长度是11
    Hello  Mai Kuraki
    Hello  Nozomi Sasaki
    Satomi Ishihara 长度是15
    Nainaiwei Hashimoto 长度是19
    Hello  Amuro Namie
    Hello   Sarah Brightman
    Nozomi Sasaki 长度是13
    Amuro Namie 长度是11
     Sarah Brightman 长度是16
    Mai Kuraki 长度是10
    用时共: 4.0022289752960205 second
    

      

     

  • 相关阅读:
    Camera
    iOS实现截屏 并合适保存
    将UIView转成UIImage,将UIImage转成PNG/JPG
    iOS7Status bar适配
    @synthesize obj=_obj的意义详解 @property和@synthesize
    iOS各种问题处理
    Foundation框架中的NSNumber对象详解
    iOS 文件和数据管理 (可能会删除本地文件储存)
    当ABAP遇见普罗米修斯
    一个工作13年的SAP开发人员的回忆:电子科技大学2000级新生入学指南
  • 原文地址:https://www.cnblogs.com/Nicholas0707/p/11768462.html
Copyright © 2011-2022 走看看