#进程池Pool:apply apply_async-close-join-get map callback
#1、进程池Pool:执行下面代码发现任务012先执行,345后执行,因为进程池只有3个进程,6480进程先处理1,接着处理3和5。
# 可以设置线程的数量去轮流执行它的任务:例如下面例子设置3个进程去轮流执行6个任务。
#map 创建和开启子进程:
from multiprocessing import Pool
import time
import os
import random
def func(i): #i接收0-5
print(i,os.getpid())
time.sleep(random.randint(1,3))
if __name__ == '__main__':
p = Pool(3) #开启3个进程来处理6个任务。3个3个的并发:一个进程在同一时间只能执行一个任务。
p.map(func,range(6)) #6个任务。把0-5导入到func去执行。
# 0 16156
# 1 6480
# 2 15412
# 3 6480
# 4 15412
# 5 6480
#2、
#2.1进程池出现的原因:1、开启过多进程会浪费时间,譬如下面的例子,我们只需要开启三个进程来处理六个任务,而不需要开启六个进程。
# 2、操作系统调度过多进程会影响效率。
#2.2开启进程池:池子里有几个进程,有任务来了,就用这个池子里的进程去处理任务,任务处理完之后,
# 再把进程放回池子里,池子里的进程就可以去处理别的任务了。当所有的任务处理完之后,
# 进程池关闭,回收所有的进程。
#2.3开启进程的数量最好是cpu的数量加1。
#3、apply相当于target,是同步调用,代码是按顺序执行的,没有并发的效果,一般不用,
# 因为是同步的话,一个进程就可以了,没必要利用主进程开启子进程的方式来实现同步。
from multiprocessing import Pool
import time
def func(i):
time.sleep(1)
i += 1
print(i)
if __name__ == '__main__':
p = Pool(3)
for i in range(6):
p.apply(func,args=(i,))
# 1
# 2
# 3
# 4
# 5
# 6
#4、apply_async是异步调用,创建和开启子进程:
# p.close()和p.join()上面的主进程代码执行之后,主进程就结束了。但是进程池的进程还没结束,
# 会让内存产生一大堆没有被回收的进程。所以必须写上这两句代码。使用get拿到结果。
from multiprocessing import Pool
import time
def func(i):
time.sleep(1)
i += 1
return i
if __name__ == '__main__':
p = Pool(3)
ret_l = []
for i in range(6):
ret = p.apply_async(func,args=(i,)) #子进程返回结果i给func,结果i = ret.get()
ret_l.append(ret)
p.close() #进程池关闭,不能再往进程池添加新的任务。
p.join() #阻塞等待,主进程等待子进程结束之后才结束。
[print(ret.get()) for ret in ret_l] #异步调用;把子进程返回的结果都放在列表里面,最后才一次性拿取,实现并发。
# #下面代码没有并发效果:
# if __name__ == '__main__':
# p = Pool(3)
# for i in range(6):
# ret = p.apply_async(func,args=(i,))
# print(ret.get()) #子进程每返回一个结果之后才能拿到一个结果,相当于同步,没有并发的效果。
# p.close()
# p.join()
#5、map可接收返回值:
from multiprocessing import Pool
def func(i):
i += 1
return i
if __name__ == '__main__':
p = Pool(3)
ret = p.map(func,range(6))#map的最后一个参数是可迭代的,例如range(6)
print(ret)
# [1, 2, 3, 4, 5, 6]
# 6、回调函数:把func的返回值传给call做进一步处理。从pid可以看出,call函数是在主进程当中执行:
from multiprocessing import Pool
import os
def func(i):
print('子进程func%s %s'%(i,os.getpid()))
i += 1
return i
def call(arg):
print('call %s' % os.getpid())
arg += 1
print(arg)
if __name__ == '__main__':
print('主进程%s' % os.getpid())
p = Pool(3)
for i in range(6):
p.apply_async(func,args=(i,),callback=call)
p.close()
p.join()
# 2
# 3
# 4
# 5
# 6
# 7