zoukankan      html  css  js  c++  java
  • 进程池、进程池和多进程的性能测试、进程池的其他机制、进程池的回调函数

    进程池

    如果有多少个任务,就开启多少个进程,实际上并不划算
    由于计算机的cpu个数是非常有限的
    因此开启的进程数量完全和cpu个数成比例

    # 没有时间延迟时
    
    from multiprocessing import Pool
    import os
    
    def func(i):
        print(i, os.getpid())
    
    if __name__ == "__main__":
        p = Pool(4)
        for i in range(10):
            p.apply_async(func, args=(i,))  # async异步的提交任务
        p.close()  # 关闭池子,不是回收池子中的进程,而是阻止继续提交任务
        p.join()  # 阻塞,直到池子中的任务都执行完毕
    
    # 0 7352
    # 1 7352
    # 2 7352
    # 3 7352
    # 4 7352
    # 5 7352
    # 6 7352
    # 7 7352
    # 8 7352
    # 9 7352
    
    # 如果没有时间延迟,那么结果显示只有一个进程池
    
    
    
    # 设置时间延迟时
    
    from multiprocessing import Pool
    import os
    import time
    
    def func(i):
        time.sleep(0.1)
        print(i, os.getpid())
    
    if __name__ == "__main__":
        p = Pool(4)  # 一般设置成cpu的个数+1
        for i in range(10):
            p.apply_async(func, args=(i,))  # async异步的提交任务
        p.close()  # 关闭池子,不是回收池子中的进程,而是阻止继续提交任务
        p.join()  # 阻塞,直到池子中的任务都执行完毕
    
    # 0 9364
    # 1 2664
    # 2 11456
    # 3 7312
    # 4 9364
    # 5 2664
    # 6 11456
    # 7 7312
    # 8 9364
    # 9 2664
    
    # 运行结果发现就4个进程池反复进行
    # 起多进程的意义
    #   1.为了更好的利用cpu,所以程序中都是网络IO或文件IO就不适合用多进程
    #       比如多人聊天,如果1000人同时聊天,服务器不可能开启1000个进程池,不然挂掉
    #   2.为了数据的隔离,如果程序中总是要数据共享,那么就不适合使用多进程
    #   3.超过了cpu个数的任务数,都应该使用进程池来解决问题,而不能无限开启子进程
    
    # 进程池和多进程的性能测试
    
    import os
    import time
    from multiprocessing import Process, Pool
    
    def func(i):
        print(i, os.getpid())
    
    if __name__ == "__main__":
        start = time.time()
        p_lst = []
        for i in range(5):
            p = Process(target=func, args=(i,))
            p.start()
            p_lst.append(p)
        for p in p_lst:
            p.join()
        end = time.time()
        pro_time = end - start
        start = time.time()
        p = Pool(4)
        for i in range(5):
            p.apply_async(func, args=(i,))  # async异步的提交任务
        p.close()  # 关闭池子,不是要回收池子中的进程,而是阻止继续向池子中提交任务
        p.join()  # 阻塞,直到池子中的任务都执行完毕
        end = time.time()
        pool_time = end - start
        print(pro_time, pool_time)
    
    # 0 7280
    # 1 12636
    # 2 14180
    # 3 8772
    # 4 4312
    # 0 3120
    # 1 3120
    # 2 3120
    # 3 3120
    # 4 3120
    # 0.37152719497680664 0.458310604095459
    # 进程池的其他机制
    
    import os
    import time
    from multiprocessing import Pool
    
    def func(i):
        time.sleep(0.1)
        print(i,os.getpid())
    
    if __name__ == '__main__':
        p = Pool(4)   # cpu个数 + 1/cpu的个数
        p.map(func,range(5))
    
    # 0 3104
    # 1 12028
    # 2 7620
    # 3 692
    # 4 3104
    # 因此可以把上一个的for循环换成这个map()
    
    

    # 进程池里面可以使用get()得到func()的返回值 import os import time from multiprocessing import Pool def func(i): time.sleep(1) print(i,os.getpid()) return i**i if __name__ == '__main__': p = Pool(4) # cpu个数 + 1/cpu的个数 ret_lst = [] for i in range(10): ret = p.apply_async(func,args=(i,)) # async异步的提交任务 ret_lst.append(ret) p.close() # 关闭池子,不是要回收池子中的进程,而是阻止继续向池子中提交任务 p.join() # 阻塞,直到池子中的任务都执行完毕 for ret in ret_lst: print(ret.get()) import os import time from multiprocessing import Pool def func(i): time.sleep(0.1) print(i,os.getpid()) return i ** i if __name__ == '__main__': p = Pool(4) # cpu个数 + 1/cpu的个数 ret = p.map(func,range(10)) for r in ret: print(r)
    # 进程池的回调函数
    
    import time
    import random
    from multiprocessing import Process,Pool
    def get(i):    # 进程池的子进程执行的
        time.sleep(random.random())
        print('从网页获取一个网页的内容', i)
        return i,'网页的内容'*i
    
    def call_back(content):   # 主进程执行的
        print(content)
    
    if __name__ == '__main__':
        # p = Process(target=get)
        # p.start()
        p = Pool(5)
        ret_l = []
        # for i in range(10):
        #     ret = p.apply_async(get,args=(i,))
        #     ret_l.append(ret)
        # for ret in ret_l:
        #     content = ret.get()
        #     print(len(content))
        for i in range(10):
            p.apply_async(get,args=(i,),callback=call_back)
        p.close()
        p.join()
    
    
    # 将n个任务交给n个进程去执行
    # 每一个进程在执行完毕之后会有一个返回值,这个返回值会直接交给callback参数指定的那个函数去进行处理
    # 这样的话 所有的进程 哪一个执行的最快,哪一个就可以先进性统计工作
    # 能在最短的时间内得到结果
  • 相关阅读:
    大数据技术之_16_Scala学习_04_函数式编程-基础+面向对象编程-基础
    大数据技术之_16_Scala学习_03_运算符+程序流程控制
    大数据技术之_16_Scala学习_01_Scala 语言概述
    通过创建一个简单的骰子游戏来探究 Python
    在Linux系统中创建SSH服务器别名
    DNS原理及劫持问题
    详细介绍:Kubernetes1.4版本的新功能
    Linux系统中五款好用的日志分析工具
    wc命令——Linux系统高效数据统计工具
    Linux系统内核正式进入5.0版本时代
  • 原文地址:https://www.cnblogs.com/shawnhuang/p/10323728.html
Copyright © 2011-2022 走看看