写这篇确实挺伤脑筋的,是按部就班一行一行读,但是我想这么写估计很多没有接触过的可能就劝退了,很容易出现的一种现象就是看了后面忘了前面,而且很容易看了一行代码就一层层往下钻,这样不仅容易打击看源码的积极性,而且效率贼低。doug lea大神的代码设计的那么精妙,浪费时间在这上面太可惜了。
在讲doug lea大神的设计之前,我们考虑一下,如果是我们设计一个锁该怎么实现,要满足同一时间只有一个线程可以进入临界区,我想如果是我的话,我设置一个状态标识当前是否有线程在使用,在使用cas原子操作改变这个值,保证在多线程情况下不会出现有并发修改状态的情况,类似这样的伪代码:
if(cas success){
执行临界区代码
cas out
}
return;
仅此肯定是不够的,因为我还要保证我的锁释放了,其他线程可以立刻过来抢锁,所以我可以在现有基础上加上一段逻辑,就是如果我竞争失败了,我不断轮询尝试改变状态,类似:
while(!cas success){
空转
}
执行临界区代码
cas out
return;
但是有没有发现这种方式呢,太耗资源了,如果很多线程竞争,线程空转肯定会导致我们的服务cpu负荷变大,那么怎么解决这个问题呢?
既然是cpu空转导致的,那就让没拿到锁的线程不要转了,先回去歇歇吧,等到我锁释放了你再来。可是这就出现了一个问题,我没拿到锁,我回去歇着可以,但是你都不知道我是谁咋叫我呢。既然这样我们就得解决拿到锁的线程不认识没拿到锁的线程,我们可以把等待的线程放在一个集合或者队列里面,然后释放锁就去把所有的线程一个个叫醒,或者就一次叫醒一个,叫醒的移出队列,相比之下,肯定是一次叫醒一个性能高,我想一个一个叫醒,很明显队列的优势比较大,性能也高。
我们自己设计差不多也就这样了,带着这些思考,我们再去看看doug lea大神的reentrantLock是如何设计的:
可以看到reentrantLock包含了三个内部类:
abstract static class Sync extends AbstractQueuedSynchronizer(我叫它大娃)
static final class FairSync extends Sync(二娃)
static final class NonfairSync extends Sync(三娃)
很明显二娃三娃是大娃衍生出来的,毕竟长兄如父嘛,二娃比较公正,也叫公平锁,三娃比较机灵,他才不管公平不公平,也叫非公平锁,我们再看看他们到底有啥区别:
三娃想要拿锁会先去尝试一遍,拿到直接结束,拿不到在走常规流程,二娃就没有前面一步,不管能不能抢到,都按规矩去排队,再看acquire方法,两者都是一样的:
看到上图的时候,就看到tryAcquire方法不一样,我们分析一下两者的区别:
首先看二娃的实现:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //锁是否被占用 if (!hasQueuedPredecessors() && //此行用于判断当前队列是否被初始化,没有被初始化就不用排队了直接抢锁 compareAndSetState(0, acquires)) { //这个很简单,cas修改锁状态 setExclusiveOwnerThread(current); //设置当前占用线程为自己 return true; } } else if (current == getExclusiveOwnerThread()) { //这里判断线程是否是自己,用于重入锁 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; //抢锁失败,乖乖排队去 }
上面cas操作可以点进去看可以发现其实就是修改的state:
下面重入锁拿到锁setState方法也就是对state进行操作:
这里其实就可以看得出来,和我们一开始想的是一样的嘛,用一个状态控制,只不过他这里更加牛逼的是可以重入,这是我们没想到的。
看完二娃的,调皮的三娃也来秀一波:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //拿到当前线程 int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { //拿锁 setExclusiveOwnerThread(current); //拿到锁设置为自己 return true; } } else if (current == getExclusiveOwnerThread()) { //重入校验 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
你以为三娃之前抢过一次就算了,这里如果不确定已经被拿到锁了,还是会去先抢一次,抢到就是赚到,抢不到再去排队呗,后面其他的基本都是一样的。
看到这里是不是觉得咱们之前的设计和doug lea大神的也差不多嘛,咱也是大牛啊,哈哈。沾沾自喜的同时咱们接着往下看,拿到锁是结束了,可是要是拿不到锁,咱还要排队呢,再看下是否和我们想的一样,队列睡眠,逐个唤醒:
接着看 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ,发现里面还嵌套着一个方法addWaiter,那我们就先看这个:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //1.新建一个节点 // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 2.拿到队列最后一个节点 if (pred != null) { node.prev = pred; //3.将新建节点前置节点设为当前队尾节点 if (compareAndSetTail(pred, node)) { //4.cas设置最后一个节点为新建节点 pred.next = node; // 5.设置之前拿到的最后节点的下一个为自己,到这就已经完成了在队列尾部加上自己了 return node; } } enq(node); // 如果上面没成功,那就要进到这里面无限轮询,直到设置成功 return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter的前5步如果都执行成功了,那么也就在队尾加成功了,流程如下图:
这一步其实还是蛮简单的,但是如果没成功呢,再看enq方法,方法首先判断队尾是否为空,啥时候队尾为空呢,很明显是这个队列里面还没有其他线程在等,那么就会传一个新的节点给自己(这里也是没有加锁的,虽然cas能保证原子性,但不能保证cas前后的操作也是,所以前面二娃在判断队列是否初始化时,前置条件就是头尾节点不相等,而且头结点的下一个节点不为null),也就是说,队列只有在有线程竞争锁的时候,才会初始化,而设置完头尾节点还没结束,会接着轮询,直到能够入队。虽然能看懂,但是扪心自问,确实想不出来还能这样处理。
上面已经完成了入队,但是就按我们之前分析的光入队没用啊,还得去竞争锁啊,那我们接着看acquireQueued方法,
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //获取前置节点 if (p == head && tryAcquire(arg)) { //前置节点为头节点时,尝试获取锁 setHead(node); //获取成功设置当前节点为头节点 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //判断当前节点是否需要阻塞 parkAndCheckInterrupt()) // 阻塞当前线程 interrupted = true; } } finally { if (failed) //这个字段是当取消锁竞争时会触发的 cancelAcquire(node); } }
我们可以看到这个方法核心的两件事:1.尝试拿锁,2.阻塞当前线程,而拿锁的前提便是前一个节点是头节点,拿锁成功便设置自己为头节点,我们再看setHead方法做了什么:
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
就是将自己设置为头节点,然后将线程属性和前置属性清空,其实我觉得这个时候前置节点清空是肯定要做的,但线程不需要,不过这也是大神们的厉害之处,细节处理得非常好。
拿到锁这里就结束了,没拿到锁那就要阻塞了,首先看shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
乍一看,看不懂,这啥玩意,node节点waitStatus又和Node.SIGNAL对比,又是和0相比,点开Node:
一共有四种不同状态:
SINGAL:表示当前节点很快被阻塞,因此节点再释放或取消后,需要唤醒后续节点,唤醒后竞争失败,则需要继续阻塞 状态值:-1
CANCELLED:表示当前线程超时或者中断可,这个状态的节点不会再次阻塞,也不会切换其他状态 状态值:1
CONDITION: 这个表示一个等待队列,后面会讲到 状态值:-2
PROPAGATE:传播属性,表示这个节点是共享锁,这个标识通常后面可以继续唤醒 状态值-3
介绍完这几种状态,我们在回到上面代码,那就很简单了,首先判断当前节点是不是SINGAL,是表示可以被阻塞,直接返回,然后判断大于0,也就是取消状态,一直往队列前找,直到节点不为取消,返回false,表示不能被阻塞,进入下一次轮询,都不是这两种则尝试将前一个节点的状态设置为SINGAL(这里有可能会发生其他线程已经将状态改了,那就要进入下一次尝试),如果修改成功则进入轮询。总结一下就是后一个节点第一次过来,前面两个条件正常情况都不满足(先不考虑取消),那么就会尝试将前一个结点状态修改为可阻塞。这块设计的确实很精妙,虽然同样是if else,doug lea大神几行代码完成了我们几十行都不一定能写的很好的功能。
上面如果返回true则会执行parkAndCheckInterrupt方法,这个就很简单:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
阻塞当前线程,返回中断标志位状态(这个是给Condition队列用的)。
独占锁的枷锁过程到这里也就结束了,除了在一些细节方面大神做的确实牛逼,目前为止和我们一开始的设计思路还是差不多的,写到这刚好看到响应中断的加锁有啥区别:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
这里多了一个中断响应,再往下,tryAcquire是一样的,加锁失败的流程:
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
这里也就标红的地方不一样吧,普通的就是把interrupted字段设为true,而这个会抛出异常,而为了防止已经被中断执行冤枉代码,所以在一开始进来的地方也加了个中断响应,厉害了我的道哥。
看完上面这些,我们先总结一下,java当中锁可不止ReentrantLock一个,Lock的实现有很多,:
基本都是使用或者参考aqs来实现的,写了这么长,才写完了加锁过程,然而ReentrantLock里面其实没有多少代码,这些代码基本上都是aqs里面的,比如上面的acquire方法,基本上加锁所有的逻辑都是在这里完成的,再比如acquireInterruptibly方法,共享锁的acquireShared方法,控制加锁的状态state,Node等都是aqs中的,可以说aqs设计了一整套,而具体到需求的实现,由具体的类来完成。
小插曲说完,本来是想只写独占锁的,既然聊到了传播属性,还是聊一聊共享锁的加锁吧:
入口:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
这里tryAcquireShared方法是aqs的一个钩子,必须要子类来实现,比如CountDownLatch实现就很简单,而类似读写锁这些设计就复杂得多,这里就不做过多赘述,我们主要讲没拿到锁,aqs里面做了什么:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); //新增队列至队尾 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //拿到前一个节点 if (p == head) { //判断前一个是否是头节点 int r = tryAcquireShared(arg); //尝试拿锁 if (r >= 0) { setHeadAndPropagate(node, r); //设置头节点和传播属性,很重要 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
其他操作基本和独占锁一样的,主要看setHeadAndPropagate方法:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); //设置当前节点为头节点 /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 //表示拿锁成功 || h == null //表示头节点为空 || h.waitStatus < 0 //表示不为取消状态 || (h = head) == null //这一步和下一步看起来是一样的,但是其实这里head已经变成新加的节点 || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) //判断是否是null或者共享节点 doReleaseShared(); //尝试释放锁 } }
这里主要的变化就是加了一个如果头结点的下个节点为为空SHARED属性则尝试释放锁:
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
这里首先判断队列是否为空,不为空则判断头结点的wait属性是否为SINGAL,并尝试将状态改为0并唤醒(唤醒后会继续抢锁,会再次来到这里,则会走下一步,而此时这个唤醒的节点前一个节点又是头结点,又会尝试拿锁,直到拿不到锁,park),然后尝试将waitStatus改为PROPAGATE,这才结束。设计可谓是一环套一环,我们接下来再看解锁过程:
首先看独占锁:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
首先尝试解锁,这里也是供不同需求实现的,如ReentrantLock便是修改state至0表示解锁成功,解锁成功后判断头结点不为空(防止其他线程先来),unparkSuccessor:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
这边操作也不算复杂,cas设置waitstatus为0,校验下一个节点是否取消并排除掉,唤醒下一个节点线程。
接下来看共享锁解锁:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
一样的看doReleaseShared方法:
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
有没有发现就是刚才讲的释放锁过程,共享锁也就比独占锁多一个传播机制,但却实现了两种不同的锁。
讲到这里aqs关于锁方面的知识也就结束了,接下来就要聊到等待队列Condition了,Condition的使用很简单,可以参考我上一篇文章:https://www.cnblogs.com/gmt-hao/p/14110722.html,最核心的方法也就是await , singal,至于可控时间或者中断处理,就不细聊了。
首先介绍一下等待队列吧,我们前面已经看了用双端队列实现锁,等待队列和之前的是一样的,前面waitStatus就聊到过一个CONDITION属性,也就是对应着我们这个CONDITION,有了前面的基础,我们也不多扯了,直接看代码:
首先看await方法:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //新增节点 int savedState = fullyRelease(node); //释放当前线程锁 int interruptMode = 0; while (!isOnSyncQueue(node)) { //判断当前是否CONDITION节点或者第一个节点 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //响应中断 break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //尝试拿锁,拿不到park interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled //去除掉取消的 unlinkCancelledWaiters(); if (interruptMode != 0) // reportInterruptAfterWait(interruptMode); }
首先新增一个CONDITION节点,然后释放当前线程锁,判断是否CONDITION节点,按道理这里肯定是CONDITION节点,还要判断吗,具体原因我们下面再聊,后面会尝试拿锁,和之前逻辑一样,拿不到park阻塞,我们再看一下singal方法:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
主要看transferForSignal方法,首先将CONDITION状态改为0,这也是为啥上面要判断是不是CONDITION,改成功入队(可以抢锁了),如果已经被取消或者修改失败,则重新唤醒同步。总结一下就是一个等待队列,被唤醒后会加入到抢锁队列队尾,然后等机会拿锁。
大致就是这样的一个模型,一个抢锁队列,多个等待队列。
总结:
最重要的是理解上面的抢锁是如何设计与实现的,首先我们碰到问题是想想自己是否有思路能做到这样,然后看看大牛的实现是否和自己有想通之初,最后再想想大牛的做法比自己牛在什么地方。看懂了源码只是第一步,doug lea的aqs 中基本上没有多余的代码,比如共享释放锁,入队,抢锁等功能虽然多个地方用到了,但是其实现始终只有一个,后面我也要考虑考虑在系统设计及代码实现的时候如何做到功能原子化,而在设计的时候更多的去考虑我们做的东西是为了解决什么样的一类型的问题。
java实现的锁算是讲完了,虽然像读写锁,CountDownLatch之类的其实都是用了aqs,其核心原理都是一样的,后面有时间我将写一篇synchronized的源码分析,并总结一下锁在我心里是啥样的,其实很多东西我虽然看懂了但是有些地方还是掌握不好,比如像doug lea大神在很多地方都用到了cas,而且大部分抢锁,入队都没有做线程安全处理,只在一些核心的地方加cas便可以做到并发安全的控制,这是我现阶段还无法做到的,希望有一天也可以做到这么牛逼吧。