进程池
- 方便创建,管理进程,单独进程的Process创建,需要手动开启,维护任务函数,以及释放回收
-
进程池不需要这么麻烦,进程提前创建好,未来在使用的时候,可以直接给与任务函数
-
某个进程池中的任务结束了,占用的进程会自己释放刚才工作的事情,以便接收下一个
-
P = Pool(num) #创建一个包含有num个空闲进程的池子
-
p.apply() 填充任务,任务如果结束,会自动释放掉当前占用的进程
- 创建大规模任务,Pool(100)
- 1,创建进程池:进程池中的进程是可以复用的
-
-
from mutliprocessing import Pool
-
p = Pool(num)
-
num:指明当前多少空闲进程创建出来
-
-
p.apply(func,args,)
-
阻塞行为
- func:指明填充功能函数名
- args:对应的参数
- 阻塞行为相当于(lock)加锁的进程池工作方式,有序的,第一个执行完才会执行第二个
-
-
p.apply_async(func,args,)
- 非阻塞行为,并发的,无序的
- p.close()
- 在整个业务结束之后,进程池要首先关闭
- 关闭之后进程池里的旧任务会继续执行但是没有办法填充新的任务
- 进程池关闭了就无法打开
- p.join()
- 进程回收,把关闭了的进程池中的每个进程join() 释放回收掉
- p.terminate()
- 直接关闭进程池,并且终止所欲偶进程
-
- 2,进程池的工作的返回值:
- res = p.apply(func,)
-
res就是进程池的工作结果
- 立竿见影就可以看到结果,就因为apply填充任务是阻塞行为
-
- res = p.apply_aysnc(func,)
-
非阻塞的结果,可以立即拿到,但是不是结果,只是一个抽象虚拟的值
- 这个值代表进程结束后的返回值
- res.get() #使用要谨慎
- 当前非阻塞执行的进程,有优先级先结束
- 强制要求立即这个结果,但是会影响进程之间的并发效果
-
- res = p.apply(func,)
-
3,进程池中的通信队列是特殊的
-
from multiprocessing import Manager
-
q = Manager().Queue() #进程共享队列
- 无法使用管道(Pipe)
-
-
#进程池创建 from multiprocessing import Pool import sys def work_a(): for var in range(1,5): print(var) sys.stdout.flush() def work_b(): for var in range(5,10): print(var) sys.stdout.flush() def work_c(): for var in range(10,15): print(var) sys.stdout.flush() def main(): p = Pool(2) #参数:可以最多同时执行任务个数,并不是填充的最大任务个数 #p.apply_async(func=work,args=(a,b),) 非阻塞行为 p.apply(func=work_a)#阻塞行为 p.apply(func=work_b) p.apply(func=work_c) #?: 是否是阻塞行为执行完这三个任务 #阻塞的话:1个等一个,同步 #非阻塞:异步 p.close() p.join() if __name__ == '__main__': main()
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
-
#获取阻塞进程池返回结果 from multiprocessing import Pool import sys def work_a(): for var in range(1,5): print(var) sys.stdout.flush() return 'a' def work_b(): for var in range(5,10): print(var) sys.stdout.flush() return 'b' def work_c(): for var in range(10,15): print(var) sys.stdout.flush() return 'c' def main(): p = Pool(2) #参数:可以最多同时执行任务个数,并不是填充的最大任务个数 res1 = p.apply(func=work_a) #阻塞的一个等一个,res1执行完才会执行res2 res2 = p.apply(func=work_b) res3 = p.apply(func=work_c) print('res1进程返回结果:%s' % res1),print('res2进程返回结果:%s' % res2),print('res3进程返回结果:%s' % res3) p.close(),p.join() if __name__ == '__main__': main()
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 res1进程返回结果:a res2进程返回结果:b res3进程返回结果:c
-
#获取非阻塞进程池返回结果 from multiprocessing import Pool import sys def work_a(): for var in range(1,5): # print(var) sys.stdout.flush() return 'a' def work_b(): for var in range(5,10): # print(var) sys.stdout.flush() return 'b' def work_c(): for var in range(10,15): # print(var) sys.stdout.flush() return 'c' def main(): p = Pool(2) #参数:可以最多同时执行任务个数,并不是填充的最大任务个数 res1 = p.apply_async(func=work_a) #非阻塞的会返回一个抽象的数据 res2 = p.apply_async(func=work_b) res3 = p.apply_async(func=work_c) print('res1进程返回结果:%s' % res1.get()),print('res2进程返回结果:%s' % res2.get()),print('res3进程返回结果:%s' % res3.get()) p.close(),p.join() if __name__ == '__main__': main()
运行结果:
res1进程返回结果:a res2进程返回结果:b res3进程返回结果:c
-
#进程池通讯--Queue from multiprocessing import Pool,Manager,Queue from time import sleep import sys def work_a(q): #生产者 放十次 for var in range(10): print('生产者:',var) sys.stdout.flush() q.put(var) sleep(1) def work_b(q): #消费者,拿十次 for var in range(10): res = q.get() #阻塞行为 print('消费者:',var) sys.stdout.flush() def main(): q = Manager().Queue() #进程共享队列 p = Pool(5) #进程可以复用 p.apply_async(func=work_a,args={q,q}) p.apply_async(func=work_b,args={q,q}) p.close() p.join() if __name__ == '__main__': main()
运行结果:
生产者: 0 消费者: 0 生产者: 1 消费者: 1 生产者: 2 消费者: 2 生产者: 3 消费者: 3 生产者: 4 消费者: 4 生产者: 5 消费者: 5 生产者: 6 消费者: 6 生产者: 7 消费者: 7 生产者: 8 消费者: 8 生产者: 9 消费者: 9
-