zoukankan      html  css  js  c++  java
  • python进程池:multiprocessing.pool

    本文转至http://www.cnblogs.com/kaituorensheng/p/4465768.html,在其基础上进行了一些小小改动。

    在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
    Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

    例1:使用进程池

    from multiprocessing import freeze_support,Pool
    import time
    
    def Foo(i):
        time.sleep(2)
        print('___time---',time.ctime())
        return i+100
    
    def Bar(arg):
        print('----exec done:',arg,time.ctime())
    
    if __name__ == '__main__':
        freeze_support()
        pool = Pool(3) #线程池中的同时执行的进程数为3
    
        for i in range(4):
            pool.apply_async(func=Foo,args=(i,),callback=Bar) #线程池中的同时执行的进程数为3,当一个进程执行完毕后,如果还有新进程等待执行,则会将其添加进去
            # pool.apply(func=Foo,args=(i,))
    
        print('end')
        pool.close()
        pool.join()#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

    执行结果:

    复制代码
    end
    ___time--- Thu Jun 16 15:11:45 2016
    ----exec done: 100 Thu Jun 16 15:11:45 2016
    ___time--- Thu Jun 16 15:11:45 2016
    ----exec done: 101 Thu Jun 16 15:11:45 2016
    ___time--- Thu Jun 16 15:11:45 2016
    ----exec done: 102 Thu Jun 16 15:11:45 2016
    ___time--- Thu Jun 16 15:11:47 2016
    ----exec done: 103 Thu Jun 16 15:11:47 2016
    复制代码

    函数解释

    • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
    • close()    关闭pool,使其不在接受新的任务。
    • terminate()    结束工作进程,不在处理未完成的任务。
    • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

    执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。

    例2:使用进程池(阻塞)

    from multiprocessing import freeze_support,Pool
    import time
    
    def Foo(i):
        time.sleep(2)
        print('___time---',time.ctime())
        return i+100
    
    def Bar(arg):
        print('----exec done:',arg,time.ctime())
    
    if __name__ == '__main__':
        freeze_support()
        pool = Pool(3) #线程池中的同时执行的进程数为3
    
        for i in range(4):
            pool.apply(func=Foo,args=(i,))
    
        print('end')
        pool.close()
        pool.join()#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

    执行结果

    ___time--- Thu Jun 16 15:15:16 2016
    ___time--- Thu Jun 16 15:15:18 2016
    ___time--- Thu Jun 16 15:15:20 2016
    ___time--- Thu Jun 16 15:15:22 2016
    end

    例3:使用进程池,并关注结果

    import multiprocessing
    import time
    
    def func(msg):
        print('hello :',msg,time.ctime())
        time.sleep(2)
        print('end',time.ctime())
        return 'done' + msg
    
    if __name__=='__main__':
        pool = multiprocessing.Pool(2)
        result = []
        for i in range(3):
            msg = 'hello %s' %i
            result.append(pool.apply_async(func=func,args=(msg,)))
    
        pool.close()
        pool.join()
    
        for res in result:
            print('***:',res.get())
    
        print('AAAAAAAAll end--')

    执行结果

    复制代码
    
    

    hello : hello 0 Thu Jun 16 15:26:33 2016
    hello : hello 1 Thu Jun 16 15:26:33 2016
    end Thu Jun 16 15:26:35 2016
    hello : hello 2 Thu Jun 16 15:26:35 2016
    end Thu Jun 16 15:26:35 2016
    end Thu Jun 16 15:26:37 2016
    ***: donehello 0
    ***: donehello 1
    ***: donehello 2
    AAAAAAAAll end--

    复制代码

     :get()函数得出每个返回结果的值

    例4:使用多个进程池

    import multiprocessing
    import time,os,random
    
    def Lee():
        print('
    Run task Lee--%s******ppid:%s'%(os.getpid(),os.getppid()),'~~~~',time.ctime())
        start = time.time()
        time.sleep(random.randrange(10))
        end = time.time()
        print('Task Lee,runs %0.2f seconds.'%(end-start),'~~~~',time.ctime())
    
    def Marlon():
        print("
    Run task Marlon-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
        start = time.time()
        time.sleep(random.random() * 40)
        end=time.time()
        print( 'Task Marlon runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())
    
    def Allen():
        print( "
    Run task Allen-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
        start = time.time()
        time.sleep(random.random() * 30)
        end = time.time()
        print( 'Task Allen runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())
    
    def Frank():
        print( "
    Run task Frank-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
        start = time.time()
        time.sleep(random.random() * 20)
        end = time.time()
        print( 'Task Frank runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())
    
    if __name__ == '__main__':
        func_list = [Lee,Marlon,Allen,Frank]
        print('parent process id %s'%os.getpid())
    
        pool = multiprocessing.Pool(4)
        for func in func_list:
            pool.apply_async(func)  #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
    
        print( 'Waiting for all subprocesses done...')
        pool.close()
        pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
        print ('All subprocesses done.')

    执行结果

    复制代码
    parent process id 98552
    Waiting for all subprocesses done...
    
    Run task Lee--97316******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
    
    Run task Marlon-95536******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
    
    Run task Allen-95720******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
    
    Run task Frank-98784******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
    Task Allen runs 0.31 seconds. ~~~~ Thu Jun 16 15:20:51 2016
    Task Lee,runs 7.00 seconds. ~~~~ Thu Jun 16 15:20:57 2016
    Task Frank runs 14.48 seconds. ~~~~ Thu Jun 16 15:21:05 2016
    Task Marlon runs 31.72 seconds. ~~~~ Thu Jun 16 15:21:22 2016
    All subprocesses done.
    复制代码

    multiprocessing pool map

    复制代码
    复制代码
    #coding: utf-8
    import multiprocessing 
    
    def m1(x): 
        print x * x 
    
    if __name__ == '__main__': 
        pool = multiprocessing.Pool(multiprocessing.cpu_count()) 
        i_list = range(8)
        pool.map(m1, i_list)
    复制代码
    复制代码

    一次执行结果

    0
    1
    4
    9
    16
    25
    36
    49

    参考:http://www.dotblogs.com.tw/rickyteng/archive/2012/02/20/69635.aspx 

    问题:http://bbs.chinaunix.net/thread-4111379-1-1.html

    复制代码
    复制代码
    #coding: utf-8
    import multiprocessing
    import logging
    
    def create_logger(i):
        print i
    
    class CreateLogger(object):
        def __init__(self, func):
            self.func = func
    
    if __name__ == '__main__':
        ilist = range(10)
    
        cl = CreateLogger(create_logger)
        pool = multiprocessing.Pool(multiprocessing.cpu_count())
        pool.map(cl.func, ilist)
    
        print "hello------------>"
    复制代码
    复制代码

    一次执行结果

    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    hello------------>

  • 相关阅读:
    navicat 连接Oracle 报错 ORA-12514 TNS:listener does not currently know of service requested in connect descriptor
    centos 安装配置 redis
    ubuntu安装keras
    VB类似的InputBox为MFC
    WPF颜色选择器
    窗口样式
    IP地址、端口号、子网掩码提交表单库
    添加自定义对话框,您的应用程序
    在c#中使用异步等待构建响应式UI
    一个通用对话框
  • 原文地址:https://www.cnblogs.com/hankleo/p/10575965.html
Copyright © 2011-2022 走看看