from multiprocessing import Pipe, Process


def func(conn1, conn2):
    """Drain and print every message arriving on conn1 until the sender hangs up."""
    # Drop our copy of the send end first: EOFError can only be raised once
    # every handle to the sending side has been closed.
    conn2.close()
    while True:
        try:
            print(conn1.recv())
        except EOFError:
            # All writers are gone — clean up and stop.
            conn1.close()
            break


if __name__ == '__main__':
    recv_end, send_end = Pipe()
    Process(target=func, args=(recv_end, send_end)).start()
    # Parent keeps only the send end; the child owns the receive end.
    recv_end.close()
    for _ in range(20):
        send_end.send('吃了么')
    # Closing the last send handle lets the child's recv() raise EOFError.
    send_end.close()
# from multiprocessing import Lock,Pipe,Process # def producer(con,pro,name,food): # con.close() # for i in range(100): # f = '%s生产%s%s'%(name,food,i) # print(f) # pro.send(f) # pro.send(None) # pro.send(None) # pro.send(None) # pro.close() # # def consumer(con,pro,name,lock): # pro.close() # while True: # lock.acquire() # food = con.recv() # lock.release() # if food is None: # con.close() # break # print('%s吃了%s' % (name, food)) # if __name__ == '__main__': # con,pro = Pipe() # lock= Lock() # p = Process(target=producer,args=(con,pro,'egon','泔水')) # c1 = Process(target=consumer, args=(con, pro, 'alex',lock)) # c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock)) # c3 = Process(target=consumer, args=(con, pro, 'wusir',lock)) # c1.start() # c2.start() # c3.start() # p.start() # con.close() # pro.close() # from multiprocessing import Process,Pipe,Lock # # def consumer(produce, consume,name,lock): # produce.close() # while True: # lock.acquire() # baozi=consume.recv() # lock.release() # if baozi: # print('%s 收到包子:%s' %(name,baozi)) # else: # consume.close() # break # # def producer(produce, consume,n): # consume.close() # for i in range(n): # produce.send(i) # produce.send(None) # produce.send(None) # produce.close() # # if __name__ == '__main__': # produce,consume=Pipe() # lock = Lock() # c1=Process(target=consumer,args=(produce,consume,'c1',lock)) # c2=Process(target=consumer,args=(produce,consume,'c2',lock)) # p1=Process(target=producer,args=(produce,consume,30)) # c1.start() # c2.start() # p1.start() # produce.close() # consume.close() # pipe 数据不安全性 # IPC # 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象 # 队列 进程之间数据安全的 # 管道 + 锁
# Manager 实现进程间数据共享 (shared dict via Manager, guarded by a Lock)
from multiprocessing import Manager, Process, Lock


def main(dic, lock):
    """Atomically decrement the shared 'count' entry of a Manager dict.

    ``dic['count'] -= 1`` on a Manager proxy is a read followed by a write,
    so two processes can read the same value and one update is lost.
    BUGFIX: the lock was passed in but never acquired; wrap the
    read-modify-write in it so the decrement is atomic across processes.
    """
    with lock:
        dic['count'] -= 1


if __name__ == '__main__':
    m = Manager()
    l = Lock()
    dic = m.dict({'count': 100})
    p_lst = []
    for i in range(50):
        p = Process(target=main, args=(dic, l))
        p.start()
        p_lst.append(p)
    for i in p_lst:
        i.join()
    # With the lock in place this reliably prints {'count': 50}.
    print('主进程', dic)
# 非常重要的进程池概念: 用更少的进程解决更多的事情
## 为什么有进程池的概念?
## 效率问题: 每次开启进程都要为它分配独立的资源 (寄存器、堆栈、打开的文件等)
## 进程过多时, 操作系统的调度开销也会随之变大
##
## 什么是进程池?
## 预先创建一组进程放入"池子"中复用, 进程数有下限 n、上限 m, 相当于出水口
## 这样多进程不会占用系统过多的内存空间, 操作系统也更容易调度进程
##
## 高级的进程池还可以根据负载自动调整出水口(池中进程)的数量
##
## python 中使用 multiprocessing 模块的 Pool 类
import time
from multiprocessing import Process, Pool


def func(n):
    """Dummy workload: print n+1 ten times."""
    for _ in range(10):
        print(n + 1)


def func2(n):
    """Second dummy workload: print n+2 ten times."""
    for _ in range(10):
        print(n + 2)


if __name__ == "__main__":
    # Time the workload run through a 5-worker pool ...
    start = time.time()
    pool = Pool(5)
    pool.map(func, range(100))
    pool.map(func2, range(100))
    t1 = time.time() - start

    # ... versus spawning one dedicated Process per task.
    start = time.time()
    workers = []
    for n in range(100):
        w = Process(target=func, args=(n,))
        workers.append(w)
        w.start()
    for w in workers:
        w.join()
    t2 = time.time() - start

    # The pool is typically much faster: it reuses 5 processes instead of
    # paying the creation/teardown cost 100 times.
    print(t1, t2)
import os
import time
from multiprocessing import Pool  # Pool() with no argument defaults to os.cpu_count() workers


def func(n):
    """Simulate a one-second task, logging the worker pid at start and end."""
    print('start func%s' % n, os.getpid())
    time.sleep(1)
    print('end func%s' % n, os.getpid())


if __name__ == '__main__':
    pool = Pool(5)
    for task_no in range(10):
        pool.apply(func, args=(task_no,))  # 同步提交任务: blocks until the task returns
        # pool.apply_async(func, args=(task_no,))  # 异步提交任务
    pool.close()  # 结束进程池接收任务 — stop accepting new submissions
    pool.join()   # 感知进程池中的任务执行结束 — wait for all submitted tasks
# p = Pool() # p.map(funcname,iterable) 默认异步的执行任务,且自带close和join # p.apply 同步调用的 # p.apply_async 异步调用 和主进程完全异步 需要手动close 和 join # from multiprocessing import Pool # def func(i): # return i*i # # if __name__ == '__main__': # p = Pool(5) # for i in range(10): # res = p.apply(func,args=(i,)) # apply的结果就是func的返回值 # print(res) # import time # from multiprocessing import Pool # def func(i): # time.sleep(0.5) # return i*i # # if __name__ == '__main__': # p = Pool(5) # res_l = [] # for i in range(10): # res = p.apply_async(func,args=(i,)) # apply的结果就是func的返回值 # res_l.append(res) # for res in res_l:print(res.get())# 等着 func的计算结果 # import time # from multiprocessing import Pool # def func(i): # time.sleep(0.5) # return i*i # # if __name__ == '__main__': # p = Pool(5) # ret = p.map(func,range(100)) # print(ret)
# 回调函数 (callback): runs in the MAIN process with the worker's return value.
import os
from multiprocessing import Pool


def func1(n):
    """Worker task: executed in a pool process; returns n squared."""
    print('in func1', os.getpid())
    return n * n


def func2(nn):
    """Callback: invoked in the main process with func1's return value."""
    print('in func2', os.getpid())
    print(nn)


if __name__ == '__main__':
    print('主进程 :', os.getpid())
    p = Pool(5)
    for i in range(10):
        # BUGFIX: was args=(10,), which ignored the loop variable and squared
        # the same constant ten times; pass i so each task gets its own input.
        p.apply_async(func1, args=(i,), callback=func2)
    p.close()
    p.join()