我们都见过交通阻塞,一大堆汽车因为争夺行路权,互不相让而造成阻塞,又或者因为车辆发生故障抛锚或两辆车相撞而造成道路阻塞。在这种情况下,所有的车都停下来,谁也无法前行,这就是死锁。本篇就来了解一下什么是死锁,如何应对死锁。
一、死锁初窥
1.1 为何会发生死锁?
死锁的发生归根结底是因为对资源的竞争。因为大家都想要某种资源,但又不能随心所欲地得到所有资源,在争夺的僵局中,导致任何人无法继续推进。
在一个系统里存在多个线程,而这些线程共享该计算机系统里的资源。因为资源竞争而造成系统无法继续推进就难以避免了。这里的资源可以使硬件(CPU、内存、磁盘等),也可以是软件(例如锁、信号量等)。
1.2 死锁的定义与必要条件
(1)死锁的定义
如果有一组线程,每个线程都在等待一个事件的发生,而这个事件只能有该线程里面的另一线程发出,则称这组线程发生了死锁。这里的事件主要是资源的释放,在死锁状态中,没有线程可以执行、释放资源或被叫醒。
例如,有线程A和线程B如下:
如果线程A和线程B交替执行,那么线程A和线程B均会因为无法获得对应的资源而无法继续执行也不能释放锁,从而造成死锁,如下图所示:
(2)死锁的4个必要条件
① 资源有限:即一个系统里面的资源数量是有限的,以致于无法同时满足所有线程的资源需求。
② 持有等待:即一个线程在请求新的资源时,其已经获得的资源并不释放,而是继续持有。
③ 不可抢占:即如果可以抢占一个资源,则也不会发生死锁。(凡是可以抢占的资源,均不会称为死锁的原因)
④ 循环等待:即如果你等我、我等你,大家都这样等着对方,就产生了死锁。
二、应对死锁
2.1 引子:哲学家就餐问题
哲学家每天只做两件事:思考和吃饭。他们每天不停地思考人生的这里,比如人从什么地方来,人为什么是现在这个样子,人类往哪里去等深刻的问题。当然,思考久了就会感到饥饿,而饥饿了就要吃饭。但是,吃饭是有规矩的:
哲学家围坐在一个圆桌边,每个人的左右两边均放着一根筷子。如果要吃饭,需要获得左右两边的筷子(不能用一根筷子吃饭),如下图所示:
我们很自然地得到一个算法,对于每一个哲学家,执行以下的算法:
① 等待左边的筷子可用,然后拿起左边的筷子。
② 等待右边的筷子可用,然后拿起右边的筷子。
③ 吃饭。
④ 放下两根筷子。
显然,如果每个哲学家穿插着执行,将会出现每个哲学家都拿起左边筷子,而等待右边筷子的情况,即死锁将会发生。那么,有木有办法防止哲学家出现死锁呢?
2.2 死锁的应对方法
操作系统应对死锁的策略可以分为两大种、四小种。两大种是:允许死锁发生 和 不让死锁发生。四小种是:允许死锁发生有两个子对策,一是假装看不见不予理睬,二是死锁发生后想办法解决;不让死锁发生也有两个子对策,一是通过平时的仔细检点避免难题出现,二是通过将发生死锁的必要条件消除杜绝死锁的发生。
(1)顺其自然不予理睬
此种策略就是操作系统不做任何措施,任由死锁发生。老子曾说,无为而治,说的就是这个策略。但是,如果牵涉到高可靠性系统、实时控制系统,那就另当别论了,那绝对不允许死锁。
(2)死锁的检测与恢复
在死锁检测上,一般会利用到两个矩阵:一个是资源分配矩阵,另一个是资源等待矩阵,如下图所示:
资源分配矩阵
资源等待矩阵
此外,还维持两个矢量:一个是系统资源总量矢量(表示系统中所有资源的总数是多少),另一个是系统当前可用资源矢量(代表系统现在还有多少可用的资源),如下图所示:
有了上面的矩阵和矢量,我们就可以通过简单地矩阵运算来判断系统是否发生了死锁。例如,将上图中的资源可用数量矩阵与资源等待矩阵的每一行相减,都会出现负值,那么该系统将要发生死锁。
在死锁恢复上,首先可以抢占(即将某个线程所占有的资源强行拿走,分配给别的线程),其次可以将整个线程Kill杀掉(因为抢占一个线程的资源有可能造成该线程无法再正确运行了),最后则是Rollback回滚(即将整个系统回滚到过去的某个状态,大家从那个状态重新来过)
(3)死锁的动态避免
死锁的检测与恢复属于后发制人,这时死锁的消极后果已经产生,即使修复也已经浪费了时间,降低了效率,甚至造成了其他损失。因此,需要更加积极主动一点,不要等到死锁发生了再亡羊补牢,而是在运行中就小心翼翼,不让思索发生。
动态避免的原则在于:在每次进行资源分配时,必须经过仔细计算,确保该资源请求批准后系统不会进入死锁或潜在的死锁状态。例如,有一种资源的数量为10个,当前有3个线程正在运行。每个线程需要资源的最大数和当前已经占用的资源数如下表所示:
可以通过以下分配过程得知,存在一个资源分配顺序使得所有线程都能获得其需要的资源,从而得知当前状态是安全状态,不会产生死锁。相反,如果不存在这样一个顺序,那么就有可能产生死锁。
动态避免的优点就是无需等待死锁的发生,而是在死锁有可能发生的时候采取先发制人的措施,断然拒绝有可能进入死锁的资源请求。但是,计算一个状态是否安全并不是一件容易的事情。
(4)死锁的静态防止
该策略的中心思想是:清除死锁发生的土壤(即死锁的4个必要条件),只要清除4个必要条件中的一个,那么死锁将无法发生。
① 清除资源独占条件:一是增加资源到所有线程满足的资源需要,但这并不实际,因为资源是有限的;二是将资源变为共享,但并不适合与所有的资源(例如键盘输入就无法共享)。
② 清除保持和请求条件:一个线程必须一次请求其所需的所有资源,而不是一般情况下的请求一点资源做一点事情。由于一个线程一次就获得了其所需的所有资源,该线程就可以顺利执行,不会发生死锁。
③ 清除非抢占条件:允许抢占资源,也就是说可以从一个线程手上将资源抢夺过来。
④ 清除循环等待条件:出现循环等待是因为线程请求资源的顺序是随机的,所以只要约定线程对资源的使用顺序,那么死锁就不能发生。
2.3 银行家算法
顾名思义,银行家算法就是仿照银行发放贷款时采用的控制方式而设计的一种死锁避免算法,该算法的策略是实现动态避免死锁。
银行家算法的基本思想是:分配资源之前,判断系统是否是安全的;若是,才分配。每分配一次资源就测试一次是否安全,不是资源全部就位后才测试。我们可以把操作系统看作是银行家,操作系统管理的资源相当于银行家管理的资金,进程向操作系统请求分配资源相当于用户向银行家贷款。概括起来基本思想就是:
① 分配检测:Request < Need
Request < Available
② 安全序列检测算法
下面看一个在操作系统教科书中出现的例子:
某系统有R1,R2,R3共3中资源,在T0时刻P0,P1,P2,P3和P4这5个进程对资源的占用和需求情况如下表1,此时系统的可用资源向量为(3,3,2)。试问:
1、T0时刻系统是否存在安全序列?
2、P1请求资源:P1发出请求向量Request(1,0,2),系统是否接受该请求?请使用银行家算法检查
表1 T0时刻的资源分配表
MAX Allocation Need Available P0 7 5 3 0 1 0 7 4 3 3 3 2 P1 3 2 2 2 0 0 1 2 2 P2 9 0 2 3 0 2 6 0 0 P3 2 2 2 2 1 1 0 1 1 P4 4 3 3 0 0 2 4 3 1 1、T0时刻系统是否存在安全序列?
Available > Need1 ----> 可用资源分配给P1,直到P1进程执行完成,然后Available = Available + Allocation1 = (5,3,2)
Available > Need3 ----> 可用资源分配给P3,直到P3进程执行完成,然后Available = Available + Allocation3 = (7,4,3)
Available > Need4 依次类推
得到安全序列为:P1,P3,P4,P2,P0
2、P1请求资源:P1发出请求向量Request(1,0,2),系统是否接受该请求?请使用银行家算法检查
第一步(假分配检查):把Request分配给P1,必须满足Request要小于Available,Request要小于Need。
Request(1,0,2)< Available(3,3,2)
Request(1,0,2)< Need(1,2,2)
因为满足第一步检查,进入第二层检查(安全序列检查)。
第二步(安全序列检查):建立安全性检查表
Work Need Allocation Work+Allocation Finish P1 2 3 0 0 2 0 3 0 2 如果 Work > Need,那么执行Work+Allocation,得到:
Work Need Allocation Work+Allocation Finish P1 2 3 0 0 2 0 3 0 2 5 3 2 true 5 3 2 找到Need < Work的进程,如果没有找到这样的进程而进程集合没有执行,则算法返回,得到不存在安全序列结果,否则继续执行该算法。
这里我们找到了P3进程。修改安全序列检查表:
Work Need Allocation Work+Allocation Finish P1 2 3 0 0 2 0 3 0 2 5 3 2 true P3 5 3 2 0 1 1 2 1 1 7 4 3 true 7 4 3 这样一直执行到所有的进程到完成,以完成该安全序列检查表:
Work Need Allocation Work+Allocation Finish P1 2 3 0 0 2 0 3 0 2 5 3 2 true P3 5 3 2 0 1 1 2 1 1 7 4 3 true P4 7 4 3 4 3 1 0 0 2 7 4 5 true P0 7 4 5 7 4 3 0 1 0 7 5 5 true P2 7 5 5 6 0 0 3 0 2 10 5 7 true 这样就找到了整个安全序列为:P1,P3,P4,P0,P2
总的来说,银行家算法是一个动态避免死锁算法,通过对资源的仔细分配以避免死锁。其特点是可以超额批准客户的信用额度,即所有客户的信用额度之和可以超过银行的全部资本,这就是杠杆(Leverage)!
2.4 解决:哲学家就餐问题
这里使用C#语言,模拟信号量,以消除死锁的必要条件(消除保持并等待的必要条件)的方式来实现解决哲学家就餐问题。
(1)首先定义哲学家的三种状态:
/// <summary> /// 哲学家状态 /// </summary> public enum StateEnum { // 思考状态 THINKING = 0, // 饥饿状态 HUNGRY = 1, // 吃饭状态 EATING = 2 }
(2)然后定义一个临界区互斥用的信号量,给每个哲学家单独定义一个信号量。如果一个哲学家需要阻塞,则阻塞发生在该信号量上。
private const int NumOfPhilosopher = 5; // 哲学家人数 private static StateEnum[] states = new StateEnum[NumOfPhilosopher]; // 记录每个哲学家当前状态的数组 private static semaphore mutex = 1; // 模拟互斥信号量 private static semaphore[] s = new semaphore[NumOfPhilosopher]; // 每个哲学家等待一个单独的信号量
这里的semaphore其实就是int类型:
using semaphore = System.Int32;
要模拟互斥信号量,需要有两种基本原语操作Up 和 Down:
/// <summary> /// 互斥信号量Down /// </summary> private static void Down(semaphore mutex) { if (mutex == 1) { mutex--; } } /// <summary> /// 互斥信号量Down /// </summary> private static void Down(ref semaphore mutex) { // 阻塞操作 while (mutex < 1) { } } /// <summary> /// 互斥信号量Up /// </summary> private static void Up(semaphore mutex) { if (mutex == 0) { mutex++; } } /// <summary> /// 互斥信号量Up /// </summary> private static void Up(ref semaphore mutex) { if (mutex == 0) { mutex++; } }
(3)哲学家的两种生活状态:Think 和 Eat
/// <summary> /// 思考 /// </summary> /// <param name="philosoper">哲学家编号</param> private static void Think(int philosopher) { Console.WriteLine("Philosopher:{0} IS THINKING.", philosopher + 1); System.Diagnostics.Debug.WriteLine("Philosopher:{0} IS THINKING.", philosopher + 1); } /// <summary> /// 吃饭 /// </summary> /// <param name="philosoper">哲学家编号</param> private static void Eat(int philosopher) { Console.WriteLine("Philosopher:{0} IS EATING.", philosopher + 1); System.Diagnostics.Debug.WriteLine("Philosopher:{0} IS EATING.", philosopher + 1); }
(4)哲学家的日常生活:思考,拿筷子,吃饭,放下筷子,继续思考......
/// <summary> /// 哲学家程序 /// </summary> /// <param name="philosopher">哲学家编号</param> private static void PhilosopherRoutine(object number) { int philosopher = (semaphore)number; while (true) { Think(philosopher); TakeChopsticks(philosopher); // 同时获得两根筷子,否则阻塞等待 Eat(philosopher); PutChopsticks(philosopher); // 同时放下两根筷子 } } /// <summary> /// 获取筷子 /// </summary> /// <param name="philosoper">哲学家编号</param> private static void TakeChopsticks(int philosopher) { Down(mutex); states[philosopher] = StateEnum.HUNGRY; Test(philosopher); // 试图拿起两根筷子 Up(mutex); Down(ref s[philosopher]); // 如果没有拿到筷子,则继续阻塞等待 } /// <summary> /// 放下筷子 /// </summary> /// <param name="philosoper">哲学家编号</param> private static void PutChopsticks(int philosopher) { Down(mutex); states[philosopher] = StateEnum.THINKING; int left = (philosopher + NumOfPhilosopher - 1) % NumOfPhilosopher; int right = (philosopher + 1) % NumOfPhilosopher; // 测试左面的哲学家是否可以吃饭 Test(left); // 测试右面的哲学家是否可以吃饭 Test(right); Up(mutex); } /// <summary> /// 测试是否可以同时拿起两根筷子 /// </summary> /// <param name="philosopher">哲学家编号</param> private static void Test(int philosopher) { int left = (philosopher + NumOfPhilosopher - 1) % NumOfPhilosopher; int right = (philosopher + 1) % NumOfPhilosopher; if (states[philosopher] == StateEnum.HUNGRY && states[left] != StateEnum.EATING && states[right] != StateEnum.EATING) { // 可以拿起两根筷子,改变哲学家状态到吃饭状态 states[philosopher] = StateEnum.EATING; // 发出叫醒信号 Up(ref s[philosopher]); } }
Run之后的结果如下图所示:
这样看不清楚,截一段结果出来看看:
Philosopher:2 IS THINKING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:1 IS THINKING. Philosopher:5 IS EATING. Philosopher:4 IS THINKING. Philosopher:3 IS EATING. Philosopher:5 IS THINKING. Philosopher:2 IS EATING. Philosopher:1 IS EATING. Philosopher:4 IS EATING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:1 IS THINKING. Philosopher:4 IS THINKING. Philosopher:5 IS THINKING. Philosopher:3 IS EATING. Philosopher:1 IS EATING. Philosopher:2 IS THINKING. Philosopher:4 IS EATING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:1 IS THINKING. Philosopher:3 IS EATING. Philosopher:4 IS EATING. Philosopher:2 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:4 IS EATING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS THINKING. Philosopher:3 IS EATING. Philosopher:5 IS EATING. Philosopher:4 IS EATING. Philosopher:2 IS EATING. Philosopher:1 IS EATING. Philosopher:5 IS THINKING. Philosopher:3 IS THINKING. Philosopher:5 IS EATING. Philosopher:3 IS EATING. Philosopher:4 IS THINKING. Philosopher:2 IS THINKING. Philosopher:4 IS EATING. Philosopher:5 IS THINKING. Philosopher:3 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:5 IS EATING. Philosopher:4 IS EATING. Philosopher:2 IS THINKING. Philosopher:5 IS THINKING. Philosopher:3 IS EATING. Philosopher:5 IS EATING. Philosopher:1 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:5 IS THINKING. Philosopher:4 IS EATING. Philosopher:2 IS THINKING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:1 IS EATING. Philosopher:5 IS EATING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:4 IS EATING. Philosopher:5 IS THINKING. Philosopher:3 IS EATING. Philosopher:4 IS THINKING. Philosopher:2 IS THINKING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:3 IS EATING. Philosopher:4 IS EATING. Philosopher:2 IS EATING. Philosopher:5 IS EATING. Philosopher:3 IS THINKING. Philosopher:5 IS THINKING. Philosopher:1 IS THINKING. Philosopher:4 IS THINKING. Philosopher:2 IS THINKING. Philosopher:5 IS EATING. Philosopher:3 IS EATING. Philosopher:5 IS THINKING. Philosopher:4 IS EATING. Philosopher:3 IS THINKING. Philosopher:1 IS EATING. Philosopher:4 IS THINKING. Philosopher:2 IS EATING. Philosopher:5 IS EATING. Philosopher:3 IS EATING. Philosopher:1 IS THINKING. Philosopher:4 IS EATING. Philosopher:2 IS THINKING. Philosopher:5 IS THINKING. Philosopher:5 IS EATING.
可以看到,哲学家们交替着思考吃饭,井然有序,没有发生死锁。这里没有使用.NET中现有的Mutex、Semaphore等类型,而采用了一个int类型来模拟最简单的互斥信号量。
完整的代码如下:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using semaphore = System.Int32; namespace PhilosopherDemo { public class Program { private const int NumOfPhilosopher = 5; // 哲学家人数 private static StateEnum[] states = new StateEnum[NumOfPhilosopher]; // 记录每个哲学家当前状态的数组 private static semaphore mutex = 1; // 模拟互斥信号量 private static semaphore[] s = new semaphore[NumOfPhilosopher]; // 每个哲学家等待一个单独的信号量 /// <summary> /// 初始化哲学家状态 /// </summary> private static void InitializePhilosopher() { for (int i = 0; i < NumOfPhilosopher; i++) { states[i] = StateEnum.THINKING; s[i] = 1; } } /// <summary> /// 哲学家程序 /// </summary> /// <param name="philosopher">哲学家编号</param> private static void PhilosopherRoutine(object number) { int philosopher = (semaphore)number; while (true) { Think(philosopher); TakeChopsticks(philosopher); // 同时获得两根筷子,否则阻塞等待 Eat(philosopher); PutChopsticks(philosopher); // 同时放下两根筷子 } } /// <summary> /// 获取筷子 /// </summary> /// <param name="philosoper">哲学家编号</param> private static void TakeChopsticks(int philosopher) { Down(mutex); states[philosopher] = StateEnum.HUNGRY; Test(philosopher); // 试图拿起两根筷子 Up(mutex); Down(ref s[philosopher]); // 如果没有拿到筷子,则继续阻塞等待 } /// <summary> /// 放下筷子 /// </summary> /// <param name="philosoper">哲学家编号</param> private static void PutChopsticks(int philosopher) { Down(mutex); states[philosopher] = StateEnum.THINKING; int left = (philosopher + NumOfPhilosopher - 1) % NumOfPhilosopher; int right = (philosopher + 1) % NumOfPhilosopher; // 测试左面的哲学家是否可以吃饭 Test(left); // 测试右面的哲学家是否可以吃饭 Test(right); Up(mutex); } /// <summary> /// 测试是否可以同时拿起两根筷子 /// </summary> /// <param name="philosopher">哲学家编号</param> private static void Test(int philosopher) { int left = (philosopher + NumOfPhilosopher - 1) % NumOfPhilosopher; int right = (philosopher + 1) % NumOfPhilosopher; if (states[philosopher] == StateEnum.HUNGRY && states[left] != StateEnum.EATING && states[right] != StateEnum.EATING) { // 可以拿起两根筷子,改变哲学家状态到吃饭状态 states[philosopher] = StateEnum.EATING; // 发出叫醒信号 Up(ref s[philosopher]); } } /// <summary> /// 思考 /// </summary> /// <param name="philosoper">哲学家编号</param> private static void Think(int philosopher) { Console.WriteLine("Philosopher:{0} IS THINKING.", philosopher + 1); System.Diagnostics.Debug.WriteLine("Philosopher:{0} IS THINKING.", philosopher + 1); } /// <summary> /// 吃饭 /// </summary> /// <param name="philosoper">哲学家编号</param> private static void Eat(int philosopher) { Console.WriteLine("Philosopher:{0} IS EATING.", philosopher + 1); System.Diagnostics.Debug.WriteLine("Philosopher:{0} IS EATING.", philosopher + 1); } /// <summary> /// 互斥信号量Down /// </summary> private static void Down(semaphore mutex) { if (mutex == 1) { mutex--; } } /// <summary> /// 互斥信号量Down /// </summary> private static void Down(ref semaphore mutex) { // 阻塞操作 while (mutex < 1) { } } /// <summary> /// 互斥信号量Up /// </summary> private static void Up(semaphore mutex) { if (mutex == 0) { mutex++; } } /// <summary> /// 互斥信号量Up /// </summary> private static void Up(ref semaphore mutex) { if (mutex == 0) { mutex++; } } public static void Main(string[] args) { InitializePhilosopher(); for (int i = 0; i < NumOfPhilosopher; i++) { Thread thread = new Thread(PhilosopherRoutine); thread.Start(i); } Console.ReadKey(); } } }
参考资料
邹恒明,《操作系统之哲学原理》,机械工业出版社