主要内容:
- 1.进程同步
- 2.进程间通信
1.进程同步
(1)锁
引出:虽然我们实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题:进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

import time import json import random from multiprocessing import Process def get_ticket(i): print("我已到达,要开始抢票了") with open("ticket","r") as f: last_ticket_info = json.load(f) last_ticket = last_ticket_info["count"] if last_ticket > 0: time.sleep(random.random()) last_ticket = last_ticket-1 last_ticket_info["count"] = last_ticket with open("ticket","w") as f: json.dump(last_ticket_info,f) print("%s号抢到了,Ynb" % i) else: print("%s傻缺没有抢到票,明年再来吧" %i) if __name__ =="__main__": for i in range(10): p = Process(target=get_ticket,args=(i,)) p.start()

import time import json import random from multiprocessing import Process,Lock def get_ticket(i,ticket_lock): print("我已到达,要开始抢票了") ticket_lock.acquire() #加锁,保证每次只有一个进程在执行里边的程序,这一段程序对于所有写上这个锁的进程,大家都变程了串行 with open("ticket","r") as f: last_ticket_info = json.load(f) last_ticket = last_ticket_info["count"] if last_ticket > 0: time.sleep(random.random()) last_ticket = last_ticket-1 last_ticket_info["count"] = last_ticket with open("ticket","w") as f: json.dump(last_ticket_info,f) print("%s号抢到了,Ynb" % i) else: print("%s傻缺没有抢到票,明年再来吧" %i) ticket_lock.release() #解锁,解锁后才能去执行自己的额其他程序 if __name__ =="__main__": ticket_lock = Lock() #在创建进程之前先创建一个锁 for i in range(10): p = Process(target=get_ticket,args=(i,ticket_lock)) #模拟10个人来进行抢票,此时并将锁作为参数传给要执行的函数 p.start()
可以看出:加锁后只有一个人能抢到最后一张票,符合常理.,此时虽然创建了多个进程,但是,由并发变为了串行.牺牲了运行效率,但是避免了竞争.
(2) 信号量

互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据 。
假设商场里有4个迷你唱吧,所以同时可以进去4个人,如果来了第五个人就要在外面等待,等到有人出来才能再进去玩。
实现:
信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

import time import random from multiprocessing import Process,Semaphore def dbj(i,s): s.acquire() # 此时进来的人获取房间 print("%s号男同志来洗脚"%i) time.sleep(random.randrange(3,6)) #模拟他们的洗脚时间 s.release() #出房间,此时下个人可以进入 print("_____________________") if __name__ =="__main__": s = Semaphore(4) # 创建一个信号量(此时容量为4) for i in range (10): p = Process(target=dbj,args = (i,s)) #将信号量作为参数传给进程要执行的函数 p.start()
(3)事件

from multiprocessing import Process,Event e = Event() #创建一个事件对象 print(e.is_set()) #is_set()查看一个事件状态,默认为 False print("111111") e.set() # 将is_set()的状态改为True print(e.is_set()) #查看此时的事件状态 True e.clear() #将事件的状态修改为 False(清空,回到初始的状态) print(e.is_set()) #False e.wait() #根据is_set的状态来决定是否在此处阻塞住 ,当is_set() = False 就阻塞住,如果是True就不阻塞 print("00000") #set和clear 修改事件的状态 set-->True clear-->False #is_set 用来查看一个事件的状态 #wait 依据事件的状态来决定是否阻塞 False-->阻塞 True-->不阻塞

import time import random from multiprocessing import Process,Event def traffic_light(e): while 1: print("红灯啦") time.sleep(5) e.set() #修改为True print("绿灯亮") time.sleep(4) e.clear() #修改为True def car(i,e): if not e.is_set(): print("我们在等") e.wait() #判断此时是否要阻塞 print("走你") else: print("可以走了") if __name__ =="__main__": e = Event() #创建一个事件对象 hld = Process(target=traffic_light,args=(e,)) hld.start() while 1: time.sleep(2) for i in range(3): time.sleep(random.random()) p1= Process(target=car,args=(i,e)) p1.start()
(4)进程锁小结

#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然可以用文件共享数据实现进程间通信,但问题是: 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 2.需要自己加锁处理 #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。 队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。 IPC通信机制(了解):IPC是intent-Process Communication的缩写,含义为进程间通信或者跨进程通信,是指两个进程之间进行数据交换的过程。IPC不是某个系统所独有的,任何一个操作系统都需要有相应的IPC机制, 比如Windows上可以通过剪贴板、管道和邮槽等来进行进程间通信,而Linux上可以通过命名共享内容、信号量等来进行进程间通信。Android它也有自己的进程间通信方式,Android建构在Linux基础上,继承了一 部分Linux的通信方式。
2.进程之间的通信
(1)队列(重要)
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。队列就像一个特殊的列表,但是可以设置固定长度,并且从前面插入数据,从后面取出数据,先进先出
(1)Queue方法介绍

q = Queue([maxsize])
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.get_nowait( )
同q.get(False)方法。
q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
方法介绍

from multiprocessing import Queue q = Queue(3) #创建一个队列对象,队列长度为3 q.put(1) #往队列中添加数据 q.put(2) q.put(3) # q.put(4) #如果队列已满,程序就会停在这里,等待数据被别人取走,再将数据放入队列 #如果队列中的数据一直不被取走,程序就会永远停在这 try: q.put_nowait(3) #可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了就报错. except: #因此,我们用一个try语句来处理这个错误,这样程序不会一直阻塞下去,但是会丢掉这个消息 print("队列已经满了") #因此,我们在放入数据之前,可以先看一下队列的状态,如果已经满了就不在put了 print(q.full()) #查看是否满了,满了返回True,不满返回False print(q.get()) print(q.get()) print(q.get()) # print(q.get()) #同put方法一样,如果队列已经空了,那么读取就会阻塞 try: q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。 except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。 print('队列已经空了')
**关于q.empty()**

import time from multiprocessing import Queue q = Queue(3) q.put(1) q.put(2) print('----------',q.qsize()) #q.qsize()这个返回的是当前队列中已经占位的元素个数 q.put(3) q.get() q.get() q.get() print(q.empty()) #此时队列已经空了,因此结果为True q.put(4) # print('>>>>>',q.empty()) #结果为True # print(q.get()) # 4 time.sleep(0.1) #休眠一会儿 print('>>>>>',q.empty()) #结果为False print(q.get())
在空队列上放置对象之后,在队列的p.empty()方法返回False之前,可能有无限小的延迟,导致返回的结果是True
(2) 队列之间的通信

import time from multiprocessing import Process,Queue def girl(q): print("来自boy的消息",q.get()) print("来自校领导的凝视",q.get()) def boy(q): q.put("约吗") if __name__ == "__main__": q = Queue(5) girl_p = Process(target=girl, args=(q,)) boy_p = Process(target=boy, args=(q,)) girl_p.start() boy_p.start() time.sleep(1) q.put("好好工作,别捣乱")
(3)生产者与消费者模型

import time from multiprocessing import Process,Queue def producer(q): for i in range(1,11): time.sleep(1) print("生产了包子%s"%i) q.put(i) q.put(None) # 针对第三个版本的消费者,往队列里面加了一个结束信号 #版本一 # def consumer(q): # while 1: # time.sleep(2) # s = q.get() # print("消费者吃了包子%s" %s) #当队列空了时候会阻塞(等待包子) #版本二 # def consumer(q): # while 1: # time.sleep(0.5) # try : # s = q.get(False) # print("消费者吃了包子%s" %s) # except: # break #此时只会生产包子,不会吃包子(吃的太快,还没放运行就结束了) #版本三 def consumer(q): while 1: time.sleep(0.5) s = q.get() if s == None: break else: print('消费者吃了包子%s' %s) if __name__ == "__main__": q = Queue(20) pro_p = Process(target=producer,args=(q,)) pro_p.start() con_p = Process(target=consumer, args=(q,)) con_p.start()
(4) JoinableQueue([maxsize])

#JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 #参数介绍: maxsize是队列中允许最大项数,省略则无大小限制。 #方法介绍: JoinableQueue的实例p除了与Queue对象相同的方法之外还具有: q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常 q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止,也就是队列中的数据全部被get拿走了。

import time from multiprocessing import Process,JoinableQueue def prodeucer(q): for i in range(1,11): time.sleep(0.5) print("生产了包子%s号" %i) q.put(i) q.jion() #生产者调用此方法进行阻塞,知道队列中所有的项目均被处理,阻塞将持续到队列中每 # 个项目均调用到q.task_done 也即队列中所有数据都被get拿走了 print("在这里等你") def consumer(q): while 1: time.sleep(1) s = q.get() print("消费者吃了包子%s"%s) q.task_done() #给生产者发送了一个任务结束的信号 if __name__ =="__main__": q = JoinableQueue(10) #通过队列来模拟缓冲区,设置大小为10 #生产者进程 pro_p = Process(target= prodeucer,args= (q,)) pro_p.start() #消费者进程 con_p =Process(target= consumer,args = (q,)) con_p.daemon = True # #将消费者进程设置为保护进程 con_p.start() pro_p.join() #此时给生产者进程设置join,当生产者进程结束后,主进程才会结束, #而生产者进程需要等消费者进程发送了p.task_done信号完毕才会结束,将消费者进设置为保护进程, #而只有当主进程结束后消费者进程才能结束 print('主进程结束')