zoukankan      html  css  js  c++  java
  • Python中的进程池与线程池(包含代码)

    Python中的进程池与线程池

    • 引入进程池与线程池

    • 使用ProcessPoolExecutor进程池,使用ThreadPoolExecutor

    • 使用shutdown

    • 使用submit同步调用

    • 使用submit异步调用

    • 异步+回调函数

    • 并发实现套接字通信

    引入进程池

    在学习线程池之前,我们先看一个例子

    复制代码
     1 # from multiprocessing import Process
     2 # import time
     3 #
     4 # def task(name):
     5 #     print('name',name)
     6 #     time.sleep(1)
     7 # if __name__ == '__main__':
     8 #     start=time.time()
     9 #     p1 = Process(target=task,args=("safly1",))
    10 #     p2 = Process(target=task, args=("safly2",))
    11 #     p3 = Process(target=task, args=("safly3",))
    12 #
    13 #     p1.start()
    14 #     p2.start()
    15 #     p3.start()
    16 #
    17 #     p1.join()
    18 #     p2.join()
    19 #     p3.join()
    20 #
    21 #     print("main")
    22 #
    23 #     end = time.time()
    24 #     print(end- start)
    复制代码

    输出如下:

     以上的方式是一个个创建进程,这样的耗费时间才1秒多,虽然高效,但是有什么弊端呢? 
    如果并发很大的话,会给服务器带来很大的压力,所以引入了进程池的概念

    使用ProcessPoolExecutor进程池

    什么时候用池:
    池的功能是限制启动的进程数或线程数,
    什么时候应该限制???
    当并发的任务数远远超过了计算机的承受能力时,即无法一次性开启过多的进程数或线程数时
    就应该用池的概念将开启的进程数或线程数限制在计算机可承受的范围内

    Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对编写线程池/进程池提供了直接的支持。

    通过ProcessPoolExecutor 来做示例。 
    我们来看一个最简单的进程池

    复制代码
     1 from concurrent.futures import ProcessPoolExecutor
     2 import time
     3 def task(name):
     4     print('name',name)
     5     time.sleep(1)
     6 if __name__ == '__main__':
     7     start=time.time()
     8     p1=ProcessPoolExecutor(2)
     9     for i in range(5):
    10         p1.submit(task,i)
    11     p1.shutdown(wait=True)
    12     print('')
    13     end=time.time()
    14     print(end-start)
    复制代码

    输出如下:

    复制代码
     1 D:APPSPython3.7python.exe "D:/Python/project one/day20180717/进程池与线程池.py"
     2 name 0
     3 name 1
     4 name 2
     5 name 3
     6 name 4
     7  8 3.118098258972168
     9 
    10 Process finished with exit code 0
    复制代码

    简单解释下: 
    ProcessPoolExecutor(2)创建一个进程池,容量为2,循环submit出5个进程,然后就在线程池队列里面,执行多个进程,p1.shutdown(wait=True)意思是进程都执行完毕,在执行主进程的内容

    使用shutdown

    p1.shutdown(wait=True)是进程池内部的进程都执行完毕,才会关闭,然后执行后续代码 
    如果改成false呢?看如下代码

    复制代码
     1 from concurrent.futures import ProcessPoolExecutor
     2 import time
     3 def task(name):
     4     print('name',name)
     5     time.sleep(1)
     6 if __name__ == '__main__':
     7     start=time.time()
     8     p1=ProcessPoolExecutor(2)
     9     for i in range(5):
    10         p1.submit(task,i)
    11     p1.shutdown(wait=False)
    12     print('')
    13     end=time.time()
    14     print(end-start)
    复制代码

    输出如下:

    复制代码
     1 D:APPSPython3.7python.exe "D:/Python/project one/day20180717/进程池与线程池.py"
     2  3 0.008975744247436523
     4 name 0
     5 name 1
     6 name 2
     7 name 3
     8 name 4
     9 
    10 Process finished with exit code 0
    复制代码

    使用submit同步调用

    同步:提交完任务后就在原地等待,直到任务运行完毕并且拿到返回值后,才运行下一行代码

    复制代码
    from concurrent.futures import ProcessPoolExecutor
    import time, random, os
    
    def piao(name, n):
        print('%s is piaoing %s' % (name, os.getpid()))
        time.sleep(1)
        return n ** 2
    
    
    if __name__ == '__main__':
        p = ProcessPoolExecutor(2)
        start = time.time()
        for i in range(5):
            res=p.submit(piao,'safly %s' %i,i).result() #同步调用
            print(res)
    
        p.shutdown(wait=True)
        print('', os.getpid())
    
        stop = time.time()
        print(stop - start)
    复制代码
    复制代码
     1 D:APPSPython3.7python.exe "D:/Python/project one/day20180717/进程池与线程池.py"
     2 safly 0 is piaoing 11448
     3 0
     4 safly 1 is piaoing 11800
     5 1
     6 safly 2 is piaoing 11448
     7 4
     8 safly 3 is piaoing 11800
     9 9
    10 safly 4 is piaoing 11448
    11 16
    12 主 8516
    13 5.095325946807861
    14 
    15 Process finished with exit code 0
    复制代码

    使用submit异步调用

    异步:提交完任务(绑定一个回调函数)后不原地等待,直接运行下一行代码,等到任务运行有返回值自动触发回调的函数的运行

    复制代码
     1 from concurrent.futures import ThreadPoolExecutor
     2 import time
     3 def task(name):
     4     print('name',name)
     5     time.sleep(1)
     6 if __name__ == '__main__':
     7     start=time.time()
     8     p1=ThreadPoolExecutor(2)
     9     for i in range(5):
    10         p1.submit(task,i)
    11     p1.shutdown(wait=True)
    12     print('')
    13     end=time.time()
    14     print(end-start)
    复制代码
    复制代码
    1 D:APPSPython3.7python.exe "D:/Python/project one/day20180717/进程池与线程池.py"
    2 name 0
    3 name 1
    4 name 2
    5 name 3
    6 name 4
    7 8 3.003053903579712
    复制代码

    使用回调函数+异步

    进程
    复制代码
    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # import os
    # import time
    # import random
    #
    # def task(n):
    #     print('%s run...' %os.getpid())
    #     time.sleep(5)
    #     return n**2
    #
    # def parse(future):
    #     time.sleep(1)
    #     res=future.result()
    #     print('%s 处理了 %s' %(os.getpid(),res))
    #
    # if __name__ == '__main__':
    #     pool=ProcessPoolExecutor(4)
    #     # pool.submit(task,1)
    #     # pool.submit(task,2)
    #     # pool.submit(task,3)
    #     # pool.submit(task,4)
    #
    #     start=time.time()
    #     for i in range(1,5):
    #         future=pool.submit(task,i)
    #         future.add_done_callback(parse) # parse会在futrue有返回值时立刻触发,并且将future当作参数传给parse
    #     pool.shutdown(wait=True)
    #     stop=time.time()
    #     print('主',os.getpid(),(stop - start))
    复制代码
    复制代码
     1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
     2 from threading import current_thread
     3 import os
     4 import time
     5 import random
     6 
     7 def task(n):
     8     print('%s run...' %current_thread().name)
     9     time.sleep(5)
    10     return n**2
    11 
    12 def parse(future):
    13     time.sleep(1)
    14     res=future.result()
    15     print('%s 处理了 %s' %(current_thread().name,res))
    16 
    17 if __name__ == '__main__':
    18     pool=ThreadPoolExecutor(4)
    19     start=time.time()
    20     for i in range(1,5):
    21         future=pool.submit(task,i)
    22         future.add_done_callback(parse) # parse会在futrue有返回值时立刻触发,并且将future当作参数传给parse
    23     pool.shutdown(wait=True)
    24     stop=time.time()
    25     print('',current_thread().name,(stop - start))
    复制代码

    并发实现套接字通信

    服务端
    客户端

    扩展:

    回调函数(callback)是什么?

    以下均来自知乎:

  • 相关阅读:
    2020-04-07 python一行代码 http服务器文件共享
    2020-04-06 linux命令之awk
    2020-04-05 ubuntu安装docker并使用国内加速
    2020-04-04 ssh免密登录
    尚学堂 JAVA DAY11 概念总结
    尚学堂 JAVA Day3 概念总结
    尚学堂 JAVA Day1 概念总结
    Android Studio 首次安装报错 Java.lang.RuntimeException:java.lang.NullPointerException...错
    Android 迷之Version管理
    Android Develop 之 Ddevelop WorkFlow Basics
  • 原文地址:https://www.cnblogs.com/16795079a/p/10950337.html
Copyright © 2011-2022 走看看