复习
# 信号量 from multiprocessing import Semaphore # 用锁的原理实现的,内置了一个计数器 # 在同一个事件,只能有指定数量的进程执行某一段被控制住的代码 # 事件 # wait阻塞受到事件控制的同步组件 # 状态 True Flase is_set # true--》false 用clear() # false --->true 用set() # wait方法 状态为true不阻塞 状态为false的时候阻塞 # 队列 # Queue # put 当队列满的时候阻塞等待队列有空位置 # get 当队列空的时候阻塞等待队列有数据 # full empty 不完全准确 # JoinableQuere # task_done 与get连用 # join 与put连用
管道
from multiprocessing import Pipe,Process def func(conn1,conn2): conn2.close() while True: try: msg = conn1.recv() print(msg) except EOFError: conn1.close() break if __name__ == '__main__': conn1,conn2 = Pipe() Process(target=func,args=(conn1,conn2)).start() conn1.close() for i in range(20): conn2.send('吃了吗') conn2.close()
from multiprocessing import Pipe,Process import time,random def producer(con,pro,name,food): con.close() for i in range(4): time.sleep(random.randint(1,3)) f = '%s生产%s%s'%(name,food,i) print(f) pro.send(f) pro.close() def consumer(con,pro,name): pro.close() while True: try: food = con.recv() print('%s吃了%s'%(name,food)) time.sleep(random.randint(1,3)) except EOFError: con.close() break if __name__ == '__main__': con,pro = Pipe() p = Process(target=producer,args = (con,pro,'egon','泔水')) p.start() c = Process(target=consumer,args = (con,pro,'alex')) c.start() con.close() pro.close()
进程之间的数据共享
from multiprocessing import Manager,Process,Lock def main(dic,lock): lock.acquire() dic['count'] -= 1 lock.release() if __name__ == '__main__': m = Manager() l = Lock() dic = m.dict({'count':100}) p_list = [] for i in range(50): p = Process(target=main,args=(dic,l)) p.start() p_list.append(p) for i in p_list: i.join() print('主进程:',dic)
进程池
# 为什么有进程池
# 效率
# 每开启进程,开启属于这个进程的内存空间
# 寄存器 堆栈 文件
# 进程过多,操作系统调度进程
# 进程池
# python中的先创建一个属于进程的池子
# 这个池子指定能存放多少个进程
# 先将这些进程创建好
from multiprocessing import Pool import os,time def func(n): print('start func%s'%n,os.getpid()) time.sleep(1) print('end func%s'%n,os.getpid()) if __name__ == '__main__': p = Pool(5) for i in range(10): p.apply_async(func,args=(i,)) p.close() #结束进程池接受任务 p.join() #感知进程池中的任务执行结束
socket_server-进程池
#server import socket from multiprocessing import Pool def func(conn): conn.send(b'hello') print(conn.recv(1024).decode('utf-8')) conn.close() if __name__ == '__main__': p = Pool(5) sk = socket.socket() sk.bind(('127.0.0.1',8080)) sk.listen() while True: conn,addr = sk.accept() p.apply_async(func,args=(conn,)) sk.close() #client import socket sk = socket.socket() sk.connect(('127.0.0.1',8080)) ret = sk.recv(1024).decode('utf-8') print(ret) msg = input('>>>').encode('utf-8') sk.send(msg) sk.close()
进程池返回值
# p.map(funcname,iterable) 默认异步的执行任务,自带close和join
# p.apply 同步调用
# p.apply_async 异步调用 和主进程完全异步 需要手动close和join
from multiprocessing import Pool def func(i): return i*i if __name__ == '__main__': p = Pool(5) for i in range(10): res = p.apply(func,args=(i,)) #apply的结果就是func的返回值 print(res)
import time from multiprocessing import Pool def func(i): time.sleep(0.5) return i*i if __name__ == '__main__': p = Pool(5) res_list = [] for i in range(10): res = p.apply_async(func,args=(i,)) # res_list.append(res) for res in res_list:print(res.get())
#map import time from multiprocessing import Pool def func(i): time.sleep(0.5) return i*i if __name__ == '__main__': p = Pool(5) ret = p.map(func,range(10)) print(ret) [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
进程池的回调函数
from multiprocessing import Pool def func1(n): print('in func1') return n*n def func2(nn): print('in func2') print(nn) if __name__ == '__main__': p = Pool(5) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join()
in func1
in func2
100
from multiprocessing import Pool import os def func1(n): print('in func1',os.getpid()) return n*n def func2(nn): #参数只能是func1的返回值 print('in func2',os.getpid()) print(nn) if __name__ == '__main__': print('主进程: ',os.getpid()) p = Pool(5) p.apply_async(func1,args=(10,),callback=func2) p.close() p.join() 主进程: 11172 in func1 11760 in func2 11172 100