1、进程池
当有成千上万个任务需要被执行的时候,有了进程池我们就不必去创建大量的进程. 首先,创建进程需要消耗时间,销毁进程(空间,变量,文件信息等等的内容)也需要消耗时间, 第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,维护一个很大的进程列表的同时,调度的时候,还需要进行频繁切换并且记录每个进程的执行节点, 这样反而会影响程序的效率。
创建一个有固定数量的进程池, 执行任务的时候就拿池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务, 可以减少创建进程的开支. 这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果.
使用进程池来实现并发效果, 减少创建进程的开支, 提高效率.
map()方法: 异步调用进程, map自带join()的功能.
1 import time 2 from multiprocessing import Pool,Process 3 4 5 def func1(i): 6 numb = 0 7 for j in range(5): 8 numb += i 9 10 11 if __name__ == '__main__': 12 13 p_lst = [] 14 s_time = time.time() 15 for i in range(500): 16 p = Process(target=func1,args=(i,)) 17 p.start() 18 p_lst.append(p) 19 [pp.join() for pp in p_lst] 20 e_time = time.time() 21 dis_time = e_time - s_time 22 print("非进程池",dis_time) # 9.458896160125732 处理时间 23 24 # --------------------------------------------------------------------------------- 25 26 pool = Pool(4) 27 ps_time = time.time() 28 pool.map(func1, range(100)) # 把可迭代对象的每一个元素都作为参数扔给func1 29 pe_time = time.time() 30 dis_time = pe_time - ps_time 31 print("进程池",dis_time) # 0.07204794883728027 处理时间
apply()方法: 提供一个同步串行的方法.
1 import time 2 from multiprocessing import Pool,Process 3 4 5 def func1(i): 6 numb = 0 7 for j in range(5): 8 numb += i 9 time.sleep(0.5) 10 return numb 11 12 13 14 15 if __name__ == '__main__': 16 17 pool = Pool(4) 18 19 for i in range(10): 20 print(i) 21 ret = pool.apply(func1, args=(i,)) # apply提供的是一个同步串行的执行方法. 22 # 进程1 执行完任务后, ret 获取到数据, 第二个进程才开始执行任务. 23 24 print(ret)
apply_async()方法: 进程池异步调用方法.
注意: 在使用进程池异步调用时, 主进程结束时, 所有的子进程也跟着一起结束了(后台全部关闭), 所以主程序必须得先 join() , 等待子进程结束.
使用apply_async()异步调用时, 主程序必须使用 join() 方法, 等待进程池内的任务都处理完, 才能用get()获取结果.
使用map()异步调用时, 不用写 join() 方法, map()会自动 join().
1 import time 2 from multiprocessing import Pool,Process 3 4 5 def func1(i): 6 numb = 0 7 for j in range(5): 8 numb += i 9 time.sleep(1) 10 return numb 11 12 13 14 if __name__ == '__main__': 15 16 pool = Pool(4) 17 ret_lst = [] 18 for i in range(10): # 相当于发布10个任务, 4个进程都过来拿任务 19 print(i) 20 res = pool.apply_async(func1,args=(i,)) # 各进程都是异步状态 21 ret_lst.append(res) # 这一步, 是把所有的res的执行对象都先放进列表里(包括那些没有结果的对象, 22 # 即使后面6个任务都没有执行,但是都是先把执行对象放进列表里) 23 24 pool.close() # 不是关闭进程池, 而是结束进程池接受任务, 确保没有任务再传过来 25 pool.join() # 感知进程池中的任务已经结束, 只有进程池结束接收任务, 才能感知进程池中的任务结束, 所以必须加 close(). 26 27 for res in ret_lst: 28 print(res.get()) # 前4个有结果, 用get()方法获取到结果后, 一直阻塞在后六个处,直到结果传进来执行对象中
2、回调函数
回调函数在主进程中被执行的, 子进程执行完相应的代码后, 返回主进程去执行回调函数, 它帮我们省略了主进程自身调用函数的这一步骤.
1 from multiprocessing import Process,Pool 2 3 4 def func1(n): 5 return n * n 6 7 def call_back_func(ret): # 这里的ret 传的是func1的结果. 8 with open("回调内容","w") as f: 9 f.write(str(ret)) 10 11 12 13 14 if __name__ == '__main__': 15 16 pool = Pool(4) 17 ret = pool.apply_async(func1,args=(25,),callback=call_back_func) 18 # callback后面跟回调函数, 即把func1的结果传进callback回调函数中去执行,因为调用者拿不到 19 # 回调函数的返回值, 所以只能将返回值写进文件或者数据库里. 20 21 print(ret.get())
3、进程间的通信
多进程共同去处理共享数据的时候,就和我们多进程同时去操作一个文件中的数据是一样的,不加锁就会出现错误的结果,进程不安全的,所以也需要加锁
数据共享----Manager模块
给Manager对象里面传入你要共享的数据, 然后操作数据时一样要上锁、解锁.
1 from multiprocessing import Process,Lock,Manager 2 3 def func1(dic,loc): 4 with loc: # with loc 做了两件事: loc.acquire() 和 loc.release(), 自动上锁和解锁 5 dic["numb"] -= 1 6 7 8 9 if __name__ == '__main__': 10 m = Manager() 11 loc = Lock() 12 dic = m.dict({"numb": 100}) 13 p_lst = [] 14 for i in range(100): 15 p = Process(target=func1, args=(dic,loc)) 16 p.start() 17 p_lst.append(p) 18 [pp.join() for pp in p_lst] 19 print(">>>>>>",dic["numb"])