zoukankan      html  css  js  c++  java
  • 41.进程池--Pool

    进程池


    • 方便创建,管理进程,单独进程的Process创建,需要手动开启,维护任务函数,以及释放回收
    • 进程池不需要这么麻烦,进程提前创建好,未来在使用的时候,可以直接给与任务函数
    • 某个进程池中的任务结束了,占用的进程会自己释放刚才工作的事情,以便接收下一个
    • P = Pool(num) #创建一个包含有num个空闲进程的池子
    • p.apply() 填充任务,任务如果结束,会自动释放掉当前占用的进程
    • 创建大规模任务,Pool(100)
    • 1,创建进程池:进程池中的进程是可以复用的
      • from mutliprocessing import Pool
      • p = Pool(num)
        • num:指明当前多少空闲进程创建出来
      • p.apply(func,args,)
        • 阻塞行为
        • func:指明填充功能函数名
        • args:对应的参数
        • 阻塞行为相当于(lock)加锁的进程池工作方式,有序的,第一个执行完才会执行第二个
      • p.apply_async(func,args,)
        • 非阻塞行为,并发的,无序的
      • p.close()
        • 在整个业务结束之后,进程池要首先关闭
        • 关闭之后进程池里的旧任务会继续执行但是没有办法填充新的任务
        • 进程池关闭了就无法打开
      • p.join()
        • 进程回收,把关闭了的进程池中的每个进程join() 释放回收掉
      • p.terminate()
        • 直接关闭进程池,并且终止所欲偶进程
    • 2,进程池的工作的返回值:
      • res = p.apply(func,)
        • res就是进程池的工作结果
        • 立竿见影就可以看到结果,就因为apply填充任务是阻塞行为
      • res = p.apply_aysnc(func,)
        • 非阻塞的结果,可以立即拿到,但是不是结果,只是一个抽象虚拟的值
          • 这个值代表进程结束后的返回值
        • res.get() #使用要谨慎
          • 当前非阻塞执行的进程,有优先级先结束
          • 强制要求立即这个结果,但是会影响进程之间的并发效果
    • 3,进程池中的通信队列是特殊的
      • from multiprocessing import Manager
      • q = Manager().Queue()   #进程共享队列
      • 无法使用管道(Pipe)
    • #进程池创建
      from multiprocessing import Pool
      import sys
      def work_a():
      	for var in range(1,5):
      		print(var) 
      		sys.stdout.flush()
      def work_b():
      	for var in range(5,10):
      		print(var)
      		sys.stdout.flush()
      def work_c():
      	for var in range(10,15):
      		print(var)
      		sys.stdout.flush()
      def main():
      	p = Pool(2) #参数:可以最多同时执行任务个数,并不是填充的最大任务个数
      	#p.apply_async(func=work,args=(a,b),) 非阻塞行为
      	p.apply(func=work_a)#阻塞行为
      	p.apply(func=work_b)
      	p.apply(func=work_c)
      	#?: 是否是阻塞行为执行完这三个任务
      		#阻塞的话:1个等一个,同步
      		#非阻塞:异步
      	p.close()
      	p.join()
      if __name__ == '__main__':
      	main()
      

      运行结果:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      
    • #获取阻塞进程池返回结果
      from multiprocessing import Pool
      import sys
      def work_a():
      	for var in range(1,5):
      		print(var) 
      		sys.stdout.flush()
      	return 'a'
      def work_b():
      	for var in range(5,10):
      		print(var)
      		sys.stdout.flush()
      	return 'b'
      def work_c():
      	for var in range(10,15):
      		print(var)
      		sys.stdout.flush()
      	return 'c'
      def main():
      	p = Pool(2) #参数:可以最多同时执行任务个数,并不是填充的最大任务个数
      	res1 = p.apply(func=work_a) #阻塞的一个等一个,res1执行完才会执行res2
      	res2 = p.apply(func=work_b)
      	res3 = p.apply(func=work_c)
      	print('res1进程返回结果:%s' % res1),print('res2进程返回结果:%s' % res2),print('res3进程返回结果:%s' % res3)
      	p.close(),p.join()
      if __name__ == '__main__':
      	main()
      

      运行结果:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      res1进程返回结果:a
      res2进程返回结果:b
      res3进程返回结果:c
      
    • #获取非阻塞进程池返回结果
      from multiprocessing import Pool
      import sys
      def work_a():
      	for var in range(1,5):
      #		print(var) 
      		sys.stdout.flush()
      	return 'a'
      def work_b():
      	for var in range(5,10):
      #		print(var)
      		sys.stdout.flush()
      	return 'b'
      def work_c():
      	for var in range(10,15):
      #		print(var)
      		sys.stdout.flush()
      	return 'c'
      def main():
      	p = Pool(2) #参数:可以最多同时执行任务个数,并不是填充的最大任务个数
      	res1 = p.apply_async(func=work_a) #非阻塞的会返回一个抽象的数据
      	res2 = p.apply_async(func=work_b)
      	res3 = p.apply_async(func=work_c)
      	print('res1进程返回结果:%s' % res1.get()),print('res2进程返回结果:%s' % res2.get()),print('res3进程返回结果:%s' % res3.get())
      	p.close(),p.join()
      if __name__ == '__main__':
      	main()
      

      运行结果:

      res1进程返回结果:a
      res2进程返回结果:b
      res3进程返回结果:c
      
    • #进程池通讯--Queue
      from multiprocessing import Pool,Manager,Queue
      from time import sleep
      import sys
      def work_a(q):
      	#生产者 放十次
      	for var in range(10):
      		print('生产者:',var) 
      		sys.stdout.flush()
      		q.put(var)
      		sleep(1)
      def work_b(q):
      	#消费者,拿十次
      	for var in range(10):
      		res = q.get() #阻塞行为
      		print('消费者:',var)
      		sys.stdout.flush()
      def main():
      	q = Manager().Queue() #进程共享队列
      	p = Pool(5) #进程可以复用
      	p.apply_async(func=work_a,args={q,q})
      	p.apply_async(func=work_b,args={q,q})
      	p.close()
      	p.join()
      if __name__ == '__main__':
      	main()
      

      运行结果:

      生产者: 0
      消费者: 0
      生产者: 1
      消费者: 1
      生产者: 2
      消费者: 2
      生产者: 3
      消费者: 3
      生产者: 4
      消费者: 4
      生产者: 5
      消费者: 5
      生产者: 6
      消费者: 6
      生产者: 7
      消费者: 7
      生产者: 8
      消费者: 8
      生产者: 9
      消费者: 9  
    •   

        

          

  • 相关阅读:
    System Idle Process SYSTEM占用CPU
    apache和nginx的rewrite的区别
    解决file_get_contents failed to open stream: HTTP request failed! 错误
    个人总结大型高并发高负载网站的系统架构(转)
    代码的抽象三原则
    mysqldump导入某库某表的数据
    mysql中insert into和replace into以及insert ignore用法区别
    【原创】学习日记4:nginx负载均衡(二)2012.01.08
    【原创】学习日记1:redis安装以及lnmp环境搭建2012.01.06
    mysql优化 mysql.ini优化
  • 原文地址:https://www.cnblogs.com/zhangan/p/10277730.html
Copyright © 2011-2022 走看看