zoukankan      html  css  js  c++  java
  • [转]Python多进程并发操作中进程池Pool的应用

    Pool类

    在使用Python进行系统管理时,特别是同时操作多个文件目录或者远程控制多台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process类动态的生成多个进程,十几个还好,但是如果上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时进程池就派上用场了。 
    Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
    下面介绍一下multiprocessing 模块下的Pool类下的几个方法

    apply()

    函数原型:

    apply(func[, args=()[, kwds={}]])

    该函数用于传递不定参数,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不在出现)。

    apply_async()

    函数原型:

    apply_async(func[, args=()[, kwds={}[, callback=None]]])

    与apply用法一样,但它是非阻塞且支持结果返回进行回调。

    map()

    函数原型:

    map(func, iterable[, chunksize=None])

    Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。 
    注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

    close()

    关闭进程池(pool),使其不在接受新的任务。

    terminate()

    结束工作进程,不在处理未处理的任务。

    join()

    主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。

    multiprocessing.Pool类的实例:

    复制代码
    import time
    from multiprocessing import Pool
    def run(fn):
      #fn: 函数参数是数据列表的一个元素
      time.sleep(1)
      return fn*fn
    
    if __name__ == "__main__":
      testFL = [1,2,3,4,5,6]  
      print 'shunxu:' #顺序执行(也就是串行执行,单进程)
      s = time.time()
      for fn in testFL:
        run(fn)
    
      e1 = time.time()
      print "顺序执行时间:", int(e1 - s)
    
      print 'concurrent:' #创建多个进程,并行执行
      pool = Pool(5)  #创建拥有5个进程数量的进程池
      #testFL:要处理的数据列表,run:处理testFL列表中数据的函数
      rl =pool.map(run, testFL) 
      pool.close()#关闭进程池,不再接受新的进程
      pool.join()#主进程阻塞等待子进程的退出
      e2 = time.time()
      print "并行执行时间:", int(e2-e1)
      print rl
    复制代码

    执行结果:

    shunxu:
    顺序执行时间: 6
    concurrent:
    并行执行时间: 2
    [1, 4, 9, 16, 25, 36]

    上例是一个创建多个进程并发处理与顺序执行处理同一数据,所用时间的差别。从结果可以看出,并发执行的时间明显比顺序执行要快很多,但是进程是要耗资源的,所以平时工作中,进程数也不能开太大。 
    程序中的r1表示全部进程执行结束后全局的返回结果集,run函数有返回值,所以一个进程对应一个返回结果,这个结果存在一个列表中,也就是一个结果堆中,实际上是用了队列的原理,等待所有进程都执行完毕,就返回这个列表(列表的顺序不定)。 
    对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),让其不再接受新的Process了。

    再看一个实例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    import time
    from multiprocessing import Pool
    def run(fn) :
      time.sleep(2)
      print fn
    if __name__ == "__main__" :
      startTime = time.time()
      testFL = [1,2,3,4,5]
      pool = Pool(10)#可以同时跑10个进程
      pool.map(run,testFL)
      pool.close()
      pool.join()   
      endTime = time.time()
      print "time :", endTime - startTime

      

    执行结果:

    21
    
    3
    4
    5
    time : 2.51999998093

    再次执行结果如下:

    1
    34
    
    2
    5
    time : 2.48600006104

    结果中为什么还有空行和没有折行的数据呢?其实这跟进程调度有关,当有多个进程并行执行时,每个进程得到的时间片时间不一样,哪个进程接受哪个请求以及执行完成时间都是不定的,所以会出现输出乱序的情况。那为什么又会有没这行和空行的情况呢?因为有可能在执行第一个进程时,刚要打印换行符时,切换到另一个进程,这样就极有可能两个数字打印到同一行,并且再次切换回第一个进程时会打印一个换行符,所以就会出现空行的情况。

     

     

     在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,10几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。
          Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。这里有一个简单的例子:

    1. #!/usr/bin/env python
    2. #coding=utf-8
    3. """
    4. Author: Squall
    5. Last modified: 2011-10-18 16:50
    6. Filename: pool.py
    7. Description: a simple sample for pool class
    8. """
    9.  
    10. from multiprocessing import Pool
    11. from time import sleep

    1. def f(x):
    2.     for i in range(10):
    3.         print '%s --- %s ' (i, x)
    4.         sleep(1)
    5. def main():
    6.     pool = Pool(processes=3)    # set the processes max number 3
    7.     for i in range(11,20):
    8.         result = pool.apply_async(f(i,))
    9.     pool.close()
    10.     pool.join()
    11.     if result.successful():
    12.         print 'successful'
    13. if __name__ ="__main__":
    14.     main()

           先创建容量为3的进程池,然后将f(i)依次传递给它,运行脚本后利用ps aux | grep pool.py查看进程情况,会发现最多只会有三个进程执行。pool.apply_async()用来向进程池提交目标请求,pool.join()是用来等待进程池中的worker进程执行完毕,防止主进程在worker进程结束前结束。但必pool.join()必须使用在pool.close()或者pool.terminate()之后。其中close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。result.successful()表示整个调用执行的状态,如果还有worker没有执行完,则会抛出AssertionError异常。
        利用multiprocessing下的Pool可以很方便的同时自动处理几百或者上千个并行操作,脚本的复杂性也大大降低。

    转载自:http://www.cnblogs.com/huanxiyun/articles/5826902.html

  • 相关阅读:
    为什么有时候程序出问题会打印出“烫烫烫烫...
    VC++共享数据段实现进程之间共享数据
    IEEE浮点数float、double的存储结构
    前端智勇大闯关
    Python:高级主题之(属性取值和赋值过程、属性描述符、装饰器)
    来认识下less css
    Koala Framework
    在使用Kettle的集群排序中 Carte的设定——(基于Windows)
    标准库类型
    iOS多线程的初步研究1
  • 原文地址:https://www.cnblogs.com/themost/p/6883370.html
Copyright © 2011-2022 走看看