一、昨日内容回顾:
1. 守护进程
1)、p.saemon,
2 )、p.terminate
3 )、p.join
2. 同步控制
1)、锁,Lock
互斥锁,解决数据安全、进程之间资源抢占问题。
2)、信号量,Semaphore
锁+计数器
3)、事件,Event
通过一个标志位flag来控制进程的阻塞和执行。
3. 多进程实现tcp协议的socket的sever端
1)子进程中不能使用input
2)允许端口的重用设置
3)妥善处理sk的close确保操作系统的资源能够被及时回收。
import socket from multiprocessing import Process def func(conn): conn.send(b'hello') data = conn.recv(1024) print(data.decode('utf-8')) conn.close() if __name__ == '__main__': sk = socket.socket() sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sk.bind(('127.0.0.1', 9000)) sk.listen(5) try: while True: con, addr = sk.accept() p = Process(target=func, args=(con,)) p.start() finally: sk.close()
import socket sk = socket.socket() sk.connect(('127.0.0.1', 9000)) data = sk.recv(1024) print(data) msg = input('>>>').encode('utf-8') sk.send(msg)
二、今日内容总结:
1、进程间的通信:
1)、队列 Queue:队列是加锁的,在多进程之间对数据的管理是安全的
维护了一个先进先出的顺序,且保证了数据在进程之间是安全的。
put,get,full,empty,get_nowait,put_nowait
生产者和消费者模型:
(1)、解决生产消费供需关系,生产的东西不够吃,就再开启一个进程生产。。。
(2)、解决消费者不能结束消费完物品的循环和阻塞问题,队列中引入None,
让消费者再取的时候判断是否遇到None,遇到则结束。有几个消费者就队列中就put几个None
(3)、解决生产者生产完成后主程序才结束问题,对生产者的进程进程join阻塞
JoinableQueue:
join和task_done方法:
join会阻塞队列,直至队列中的数据被取完,且执行了一个task_done,程序才会继续执行。
from multiprocessing import Process, Queue import time, random def consumer(name, q): while True: time.sleep(random.randint(1, 3)) food = q.get() if food is None: break print('%s吃了%s' % (name, food)) def producer(name, food, q): for i in range(10): time.sleep(random.randint(1, 5)) q.put('%s生产了%s%s' % (name, food, i)) print('%s生产了数据%s%s' % (name, food, i)) if __name__ == '__main__': q = Queue() p1 = Process(target=producer, args=('egon', '面包', q)) p2 = Process(target=producer, args=('taibai', '骨头', q)) p1.start() p2.start() c1 = Process(target=consumer, args=('alex', q)) c2 = Process(target=consumer, args=('firedragon', q)) c1.start() c2.start() p1.join() p2.join() q.put(None) q.put(None)
from multiprocessing import JoinableQueue,Process import time def consumer(name,q): while True: obj = q.get() time.sleep(0.3) print('%s吃了一个%s' % (name, obj)) q.task_done() if __name__ == '__main__': q = JoinableQueue() for i in range(10): q.put('food%s' % i) p1 = Process(target=consumer, args=('alex', q)) p1.daemon = True p1.start() q.join() # 阻塞队列,直至队列的数据被取完,且执行了一个task_done() # p1为守护进程,主程序代码执行完毕后,守护进程随之结束,里边的循环自然也结束了。
2)、管道 Pipe:底层实现是pickle,对数据的管理是不安全的,队列的实现机制就是管道+锁
双向通信:利用pickle实现的
收不到,就阻塞
# 管道的EOFError是怎么报出来的(同时关闭主进程的lp和子进程的lp就会报出EOFError)
# 管道在数据管理上是不安全的
# 队列的实现机制 就是 管道+锁
from multiprocessing import Pipe,Process # lp,rp = Pipe() # lp.send('hello') # print(rp.recv()) # # print(rp.recv()) # 没有数据在此阻塞进程 # # rp.send() # 不能发送空数据 # lp.send([1,2,3]) # print(rp.recv()) def consumer(lp,rp): lp.close() #1.这个发关闭 while True: print(rp.recv()) if __name__ == '__main__': lp, rp = Pipe() Process(target=consumer, args=(lp, rp)).start() Process(target=consumer, args=(lp, rp)).start() Process(target=consumer, args=(lp, rp)).start() Process(target=consumer, args=(lp, rp)).start() #rp.close() for i in range(100): lp.send('food%i' % i) lp.close() #2.这个关闭 这两关闭才会报错EOFError
2、进程之间的数据共享,Manager
Manager创建的数据(如字典等)可以在进程之间共享,涉及数据操作要加上锁,不然会出现数据错乱。
m = Manager() dic = m.dict({‘count’:100})
with Manager() as m: dic = m.dict({‘count’:100}) 但涉及dic的操作代码必须在with的缩进执行
from multiprocessing import Lock,Manager,Process def func(dic_tmp, lock_tmp): with lock_tmp: dic_tmp['count'] -= 1 if __name__ == '__main__': lock = Lock() with Manager() as m: dic = m.dict({'count': 50}) p_lst = [] for i in range(50): p = Process(target=func, args=(dic, lock)) p.start() p_lst.append(p) for i in p_lst: i.join() print(dic)
3、进程池
进程池使用场景PK多进程:
1.对于纯计算的代码,使用进程池更好(个人理解,高效利用cpu没有了节省了进程的开启和回收时间,也节省操作系统调度进程切换的时间)
2.对于高IO的代码,没有更好选择的情况下使用多进程。
总结:使用进程池比起多进程,节省了开启进程回收进程资源的时间,给操作系统调度进程降低了难度。
进程池apply(同步)添加入池方法和apply_async(异步)
使用进程池提交任务方法:
p = Pool(5)
p.appy(func=***,args=(,))) #同步提交任务 没有多进程的优势
p.apply_async(func=***,args=(,)) #异步提交任务
p.close() #关闭进程池,阻止向进程池添加新的任务
p.join() #依赖close,进程池必须先close后join(个人理解应该是要阻塞执行完进程池的任务,才进入非阻塞状态)
------代码-------
from multiprocessing import Pool,Process import time,random def wahaha(num): time.sleep(random.randint(1,3)) print('num:%s' % num**num) if __name__ == '__main__': # -------------------------适合高计算-------------------------------------------- p = Pool(5) # start = time.time() # for i in range(100): # p.apply_async(func=wahaha,args=(i,)) # # p.close() # p.join() # print(time.time()-start) start = time.time() p.map(func=wahaha,iterable=range(101)) print(time.time()-start) # -------------------------适合高IO-------------------------------------------- # start = time.time() # p_lst = [] # for i in range(101): # p = Process(target=wahaha,args=(i,)) # p.start() # p_lst.append(p) # for i in p_lst: # i.join() # print(time.time() - start)
使用map添加任务的方法以及它和普通(apply_async)方法的区别:
p.map(func=***,iterable=range(101))
优点:就是一个任务函数,个一个itetable,节省了for循环和close,join,是一种简便写法。
区别:apply_async和map相比,操作复杂,但是可以通过get方法获取返回值,而map不行。
def wahaha(num): print(num) return num*'*' if __name__ == '__main__': p = Pool(5) start = time.time() result_lst = [] for i in range(100): res = p.apply_async(func=wahaha,args=(i,)) result_lst.append(res) print(result_lst) for j in result_lst:print(j.get()) p.close() p.join() print(time.time()-start)
回调函数:可以接收func函数的返回值。但callback函数在主进程中运行。
p.apply_async(func=***,args=(*,),callback=回调函数))
from multiprocessing import Pool,Process import os def wahaha(num): print('子进程:',os.getpid()) return num**num def callb(argv): print(os.getpid()) print(argv) if __name__ == '__main__': print('主进程', os.getpid()) p = Pool(5) p.apply_async(func=wahaha,args=(1,),callback=callb) p.close() p.join()