zoukankan      html  css  js  c++  java
  • Python多进程并发操作中进程池Pool的应用

      在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,10几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。
          Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,

    那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,

    那么该请求就会等待,直到池中有进程结束,

    才会创建新的进程来它。这里有一个简单的例子:

    #!/usr/bin/env python
    #coding=utf-8
    
    
    from multiprocessing import Pool
    from time import sleep
    
    def f(x):
        for i in range(10):
            print '%s --- %s ' % (i, x)
            sleep(1)
    
    
    def main():
        pool = Pool(processes=3)    # set the processes max number 3
        for i in range(11,20):
            result = pool.apply_async(f, (i,))
        pool.close()
        pool.join()
        if result.successful():
            print 'successful'
    
    
    if __name__ == "__main__":
        main()
    

    先创建容量为3的进程池,然后将f(i)依次传递给它,运行脚本后利用ps aux | grep pool.py查看进程情况,会发现最多只会有三个进程执行。pool.apply_async()用来向进程池提交目标请求,pool.join()是用来等待进程池中的worker进程执行完毕,防止主进程在worker进程结束前结束。但必pool.join()必须使用在pool.close()或者pool.terminate()之后。其中close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。result.successful()表示整个调用执行的状态,如果还有worker没有执行完,则会抛出AssertionError异常。
        利用multiprocessing下的Pool可以很方便的同时自动处理几百或者上千个并行操作,脚本的复杂性也大大降低.

    python中multiprocessing.pool函数介绍

    apply(func[, args[, kwds]])
       apply用于传递不定参数,同python中的apply函数一致(不过内置的apply函数从2.3以后就不建议使用了),主进程会阻塞于函数。
    for x in gen_list(l):
        result = pool.apply(pool_test, (x,))
        print 'main process'
    这个时候主进程的执行流程同单进程一致
    apply_async(func[, args[, kwds[, callback]]])
       与apply用法一致,但它是非阻塞的且支持结果返回后进行回调。
    for x in gen_list(l):
        result = pool.apply_async(pool_test, (x,))
        print 'main process'
       这个时候主进程循环运行过程中不等待apply_async的返回结果,在主进程结束后,即使子进程还未返回整个程序也会就退出。虽然 apply_async是非阻塞的,但其返回结果的get方法却是阻塞的,在本例中result.get()会阻塞主进程。因此可以这样来处理返回结果:
        [x.get() for x in [pool.apply_async(pool_test, (x,)) for x in gen_list(l)]]
    如果我们对返回结果不感兴趣, 那么可以在主进程中使用pool.close与pool.join来防止主进程退出。注意join方法一定要在close或terminate之后调用。
        for x in gen_list(l):
        pool.apply_async(pool_test, (x, ))
        print 'main_process'
        pool.close()
        pool.join()
    map(func, iterable[, chunksize])
       map方法与内置的map函数行为基本一致,在它会使进程阻塞与此直到结果返回。
       但需注意的是其第二个参数虽然描述的为iterable, 但在实际使用中发现只有在整个队列全部就绪后,程序才会运行子进程。
    map_async(func, iterable[, chunksize[, callback]])
       与map用法一致,但是它是非阻塞的。其有关事项见apply_async。
    imap(func, iterable[, chunksize])
       与map不同的是, imap的返回结果为iter,需要在主进程中主动使用next来驱动子进程的调用。即使子进程没有返回结果,主进程对于gen_list(l)的 iter还是会继续进行, 另外根据python2.6文档的描述,对于大数据量的iterable而言,将chunksize设置大一些比默认的1要好。
       for x in pool.imap(pool_test, gen_list(l)):
           pass
    imap_unordered(func, iterable[, chunksize])
       同imap一致,只不过其并不保证返回结果与迭代传入的顺序一致。
    close()
       关闭pool,使其不在接受新的任务。
    terminate()
       结束工作进程,不在处理未处理的任务。
    join()
       主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用。

    l = range(10)
    def gen_list(l):
        for x in l:
            print 'yield', x
            yield x

    def pool_test(x):
        print 'f2', x
        time.sleep(1)

  • 相关阅读:
    springmvc log4j 配置
    intellij idea maven springmvc 环境搭建
    spring,property not found on type
    intellij idea maven 工程生成可执行的jar
    device eth0 does not seem to be present, delaying initialization
    macos ssh host配置及免密登陆
    centos7 搭建 docker 环境
    通过rest接口获取自增id (twitter snowflake算法)
    微信小程序开发体验
    gitbook 制作 beego 参考手册
  • 原文地址:https://www.cnblogs.com/alan-babyblog/p/5351031.html
Copyright © 2011-2022 走看看