zoukankan      html  css  js  c++  java
  • (转)Python多进程Process、Pool的使用总结

    原文:https://www.cnblogs.com/wangdac/p/13892208.html

    python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。

    进程池Pool模块接口说明

    拆分任务一提高执行效率时,独立多进程apply_async(),通过列表解析添加子任务,是最优选择。, 见pool部分

    multiprocessing.Pool类接口详解

    join方法的意义

    join()方法可以等待子进程结束后再继续往下运行(更准确地说,在当前位置阻塞主进程,带执行join()的进程结束后再继续执行主进程),通常用于进程间的同步。(进一步地解释,哪个子进程调用了join方法,主进程就要等该子进程执行完后才能继续向下执行,具体可见下边的分析图)

    在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
    Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

    case:使用进程池(非阻塞)
    #coding: utf-8
    import multiprocessing
    import time
    
    def func(msg):
        print "msg:", msg
        time.sleep(3)
        print "end"
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes = 3)
        for i in xrange(4):
            msg = "hello %d" %(i)
            pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
    
        print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
        pool.close()
        pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
        print "Sub-process(es) done."
    
    '''    
    函数解释:
    
    apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
    close()    关闭pool,使其不在接受新的任务。
    terminate()    结束工作进程,不在处理未完成的任务。
    join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
    执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。    
    pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
    '''
    

    Python多进程pool.map()函数,有两个参数可以传,第一个参数传的是函数,第二个参数传的是数据列表。那么怎么在第二个数据列表,多传几个参数呢,方法是通过对有多个参数的方法进行封装,在进程中运行封装后的方法。

    pool.map

    # -*- coding:utf-8 -*-
    
    import time
    import multiprocessing
    
    def job(x ,y):
    	"""
    	:param x:
    	:param y:
    	:return:
    	"""
    	return x * y
    
    def job1(z):
    	"""
    	:param z:
    	:return:
    	"""
    	return job(z[0], z[1])
    
    if __name__ == "__main__":
    	time1=time.time()
    	pool = multiprocessing.Pool(2)
    	data_list=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10)]
    	res = pool.map(job1,data_list)
    	time2=time.time()
    	print(res)
    	pool.close()
    	pool.join()
    	print('总共耗时:' + str(time2 - time1) + 's')
    

    pool.apply_async

    case1 : https://github.com/michaelliao/learn-python3/blob/master/samples/multitask/pooled_processing.py
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    
    from multiprocessing import Pool
    import os, time, random
    
    def long_time_task(name):
        print('Run task %s (%s)...' % (name, os.getpid()))
        start = time.time()
        time.sleep(random.random() * 3)
        end = time.time()
        print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    
    if __name__=='__main__':
        print('Parent process %s.' % os.getpid())
        p = Pool(4)
        for i in range(5):
            p.apply_async(long_time_task, args=(i,))
        print('Waiting for all subprocesses done...')
        p.close()
        p.join()
        print('All subprocesses done.')
    
    case2: https://blog.csdn.net/jinping_shi/article/details/52433867
    import multiprocessing
    import time
    
    def func(msg):
        print multiprocessing.current_process().name + '-' + msg
    
    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=4) # 创建4个进程
        for i in xrange(10):
            msg = "hello %d" %(i)
            pool.apply_async(func, (msg, ))
        pool.close() # 关闭进程池,表示不能在往进程池中添加进程
        pool.join() # 等待进程池中的所有进程执行完毕,必须在close()之后调用
        print "Sub-process(es) done."
    
    case3:https://blog.csdn.net/weixin_42898819/article/details/81811514
    from multiprocessing import Process,Pool  #导入进程池
    import time,os
    
    def Foo(i):
        time.sleep(2)
        print('到了2s')
        return i+100
    def Bar(arg):
        print('结果:',arg)
    
    if __name__  == '__main__':  
        pool=Pool(processes= 5)  #允许进程池同时放入5个进程
    
        for i in range(10):  #10个进程都启动 但是一次只能运行5个
            #pool.apply(func= Foo,args=(i,))  #串行执行进程,一次执行1个进程
            pool.apply_async(func= Foo,args=(i,),callback= Bar) #并行执行进程,一次5个,callback回调 Foo执行完就会执行Bar
        print('end')
        pool.close()
        pool.join() #等待进程池中的进程执行完毕  必须先close()  在join()
    

    什么时候用进程池Pool

    当我们需要的进程数量不多的时候,我们可以使用multiprocessing的Process类来创建进程。但是如果我们需要的进程特别多的时候,手动创建工作量太大了,所以Python也为我们提供了Pool(池)的方式来创建大量进程。

    from multiprocessing import Pool
    import os,time
    
    def run(msg):
        print("开始一个子线程运行了……")
        time.sleep(1)
        print("开始一个子线程运行结束了……")
    
    if __name__ == "__main__":
        pool = Pool(3)  # 表示初始化一个进程池,最大进程数为5
        for x in range(10):
            pool.apply_async(run, args=("hello pool",))
        print("------start----")
        pool.close() # 关闭池
        pool.join() # 等待所有的子进程完成,必须放在close后面
        print("-------end------")
        
    '''
    注意:一般我们使用apply_async这个方法,表示非阻塞的运行,一旦使用了apply方法表示阻塞式执行任务,此时就是单任务执行了(一般不会使用,特殊场景才会使用)
    '''    
    

    Pool.map多参数任务

    map的多参数解决办法

    #也可以用List将多个参数拼接成一个argList元组,然后多个argList再组合为pool.map要求的可迭代对象
    
    def job(x ,y):
    	return x * y
    
    def job1(z):
        return job(z[0], z[1])
    
    if __name__ == "__main__":
    	pool = multiprocessing.Pool()
    	res = pool.map(job1, [(2, 3), (3, 4)])
    	print res
    
    # 将多个输入变量打包到一个参数
    x = [1,2,3,4,5,6]
    y = [1,1,1,1,1,1]
    x_y = zip(x, y)
    results = pool.map(work, x_y)
    
    #使用pathos包下的multiprocessing
    这个包是使用dill的multiprocessing的一个fork,允许多参数输入:
    
    from pathos.multiprocessing import ProcessingPoll as Pool
    pool = Pool(4)
    results = pool.map(work, x, y)
    

    Pool.apply_async()有序输出多个迭代结果

    在使用apply_async()方法接收多个参数的方法时,在任务方法中正常定义多个参数,参数以元组形式传入即可
    但是给apply_async()方法传入多个值获取多个迭代结果时就会报错,因为该方法只能接收一个值,所以可以将该方法放入一个列表生成式中,如下

    def job(x):
        return x * x
    
    if __name__ == "__main__":
        pool multiprocessing.Pool()
        res = [pool.apply_async(target=job, (i,)) for i in range(3)]
        print [r.get() for r in res]
    

    进程池 apply_async, map方法示例

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。就是固定有几个进程可以使用。

    进程池中有两个方法:

    apply:同步,一般不使用

    apply_async:异步

    from  multiprocessing import Process,Pool
    import os, time, random
    
    def fun1(name):
        print('Run task %s (%s)...' % (name, os.getpid()))
        start = time.time()
        time.sleep(random.random() * 3)
        end = time.time()
        print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    
    if __name__=='__main__':
        pool = Pool(5) #创建一个5个进程的进程池
    
        for i in range(10):
            pool.apply_async(func=fun1, args=(i,))
    
        pool.close()
        pool.join()
        print('结束测试')
    

    结果

    Run task 0 (37476)...
    Run task 1 (4044)...
    Task 0 runs 0.03 seconds.
    Run task 2 (37476)...
    Run task 3 (17252)...
    Run task 4 (16448)...
    Run task 5 (24804)...
    Task 2 runs 0.27 seconds.
    Run task 6 (37476)...
    Task 1 runs 0.58 seconds.
    Run task 7 (4044)...
    Task 3 runs 0.98 seconds.
    Run task 8 (17252)...
    Task 5 runs 1.13 seconds.
    Run task 9 (24804)...
    Task 6 runs 1.46 seconds.
    Task 4 runs 2.73 seconds.
    Task 8 runs 2.18 seconds.
    Task 7 runs 2.93 seconds.
    Task 9 runs 2.93 seconds.
    结束测试
    

    Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

    进程池map方法

    因为网上看到这个例子觉得不错,所以这里就不自己写案例,这个案例比较有说服力

    import os 
    import PIL 
    
    from multiprocessing import Pool 
    from PIL import Image
    
    SIZE = (75,75)
    SAVE_DIRECTORY = \'thumbs\'
    
    def get_image_paths(folder):
        return (os.path.join(folder, f) 
                for f in os.listdir(folder) 
                if \'jpeg\' in f)
    
    def create_thumbnail(filename): 
        im = Image.open(filename)
        im.thumbnail(SIZE, Image.ANTIALIAS)
        base, fname = os.path.split(filename) 
        save_path = os.path.join(base, SAVE_DIRECTORY, fname)
        im.save(save_path)
    
    if __name__ == \'__main__\':
        folder = os.path.abspath(
            \'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\')
        os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
    
        images = get_image_paths(folder)
    
        pool = Pool()
        pool.map(creat_thumbnail, images) #关键点,images是一个可迭代对象
        pool.close()
        pool.join()
    
    复制

    上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

    map在爬虫的领域里也可以使用,比如多个URL的内容爬取,可以把URL放入元祖里,然后传给执行函数。

    技术链接
  • 相关阅读:
    JAVA 问题
    WebStrom配置多个项目的Dweployment时,设置默认的启动配置
    C#中有关数组和string引用类型或值类型的判断
    Delegate(代理)异常:该委托必须有一个目标
    RMAN BACKUP
    Oracle ORA-01033: 错误解决办法
    微信公众号开发 接口配置信息 配置失败
    使用JAVA开发微信公众平台(一)——环境搭建与开发接入
    微信开发准备(四)--nat123内网地址公网映射实现
    nat123安装启动教程帮助
  • 原文地址:https://www.cnblogs.com/liujiacai/p/15732679.html
Copyright © 2011-2022 走看看