一. 锁
我们实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间运行没有先后顺序,一旦开启我们不能控制.但是这样会出现新的问题 :
当多个进程使用同一份数据资源时,就会引发数据安全或者顺序的混乱问题.
#文件a中内容 : 1 from multiprocessing import Process import time def cha(i): with open('a') as f: time.sleep(0.5)#模拟网络延迟 s = f.read() print('第%s个人查到还有%s张余票'% (i,s)) def mai(n): with open('a') as f: s = int(f.read()) time.sleep(0.5) if s > 0: print('33[32m第%s个人购票成功33[0m'% n) s = s-1 else: print('第%s个人没有买到票'% n) with open('a',mode='w') as f1: time.sleep(0.5) f1.write(str(s)) if __name__ == '__main__': for i in range(10): p= Process(target=cha,args=(i+1,)) p.start() for n in range(10): p1 = Process(target=mai,args=(n+1)) p1.start()
由以上结果来看,进程开启后不可控制,因为某一个人买完票后要讲库存票数-1重新写入文件,但是这个过程会有些许延迟,此延迟也许就会让很多进程查到库存中还未更改的数据,但其实库存已经没有票了.
这个时候就需要引进锁的概念 : 将一次购票过程封闭在一个空间内,在空间入口有一把带着钥匙的锁,先抢到钥匙的进程,先执行空间内的过程,其他进程在空间外等待,直到上一个进入空间内的进程执行完毕,再进入下一个进程. 充分保证了数据的安全性.
#上锁的购票 from multiprocessing import Process,Lock import time def cha(i): with open('a') as f: time.sleep(0.1)#模拟网络延迟 s = f.read() print('第%s个人查到还有%s张余票'% (i,s)) def mai(n,l): l.acquire() #上锁 with open('a') as f: s = int(f.read()) time.sleep(0.1) if s > 0: print('33[32m第%s个人购票成功33[0m'% n) s = s-1 else: print('第%s个人没有买到票'% n) with open('a',mode='w') as f1: time.sleep(0.1) f1.write(str(s)) l.release() #解锁 if __name__ == '__main__': l = Lock() for i in range(10): p= Process(target=cha,args=(i+1,)) p.start() for n in range(10): p1 = Process(target=mai,args=(n+1,l)) p1.start()
1. 死锁与递归锁
所谓死锁: 就是指两个或者两个以上的额进程或者线程在执行的过程中, 因争夺资源而造成的一种相互等待的现象. 如果没有外力作用, 他们将无法推进下去. 此时成系统处于死锁状态或者系统产生了死锁. 这些永远在互相等待的进程成为死锁进程.
####示例 from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
解决办法: 递归锁, 在Python中为了支持在同一线程中多次请求统一资源, python提供了可重入所RLock. 这个PLock内部维护着一个Lock和一个counter变量, counter记录了acquire的次数, 从而使得资源可以被多次release. 直到一个线程所有的acquire都被release, 其他的线程才能获得资源.
###示例 from threading import RLock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
二. 信号量(multiprocess.Semaphore)
锁只允许一个进程在某一时间进行操作,而信号量Semaphore是可以允许多个进程同时进行操作.信号量内部基于内部计数器 : 每调用一次acquire()计数器就会减1;每调用一次release()计数器就会相应加1;当计数器为0时,acquire()调用就会被阻塞
from multiprocessing import Semaphore,Process def ktv(i,s): s.acquire() #加锁 print('第%s个人进入了ktv'% i) if __name__ == '__main__': s = Semaphore(4) #控制允许4个进程可以执行 for i in range(10): p = Process(target=ktv,args=(i,s)) p.start()
三. 事件(multiprocess.Event)
进程的事件用于主进程控制子进程,事件主要提供了三个方法: set,wait,clear
e = Event() #实例化一个事件的对象
事件是通过 is_set()的布尔值, 去标识e.wait()的阻塞状态. is_set()默认为False状态
当is_set()的布尔值为True时,e.wait()处为阻塞状态
当is_set()的布尔值为False时,e.wait()处为非阻塞状态
当使用e.set()时, 是把is_set()的布尔值变为True
当使用e.clear()时, 是把is_set()的布尔值变为False
from multiprocessing import Event,Process import time def deng(e): while 1:#红绿灯交替循环 if e.is_set():#此时is_set()状态为True,应该是非阻塞,绿灯亮 time.sleep(5)#给绿灯亮停留一段时间 print('红灯亮了')#绿灯亮过后红灯开始亮 e.clear()#将is_set()的布尔值给为False else:#此时is_set()状态为False,应该是阻塞状态,红灯亮 time.sleep(5)#给红灯亮一段时间 print('绿灯亮了')#红灯过后绿灯开启 e.set()#将is_set()状态修改为True def che(i,e): e.wait() print('第%s辆车过去了'% i) if __name__ == '__main__': e= Event() p = Process(target=deng,args=(e,)) p.start() for i in range(10): che1 = Process(target=che,args=(i,e)) che1.start()
以上代码只展示部分结果