一、概述
可重入,公平或非公平,使用AQS的互斥锁
二、源码
1、Lock接口
//锁 void lock(); //在等待锁时被中断会抛异常 void lockInterruptibly() throws InterruptedException; //判断锁是否可用,可用就获取 boolean tryLock(); //如果没取到锁进入中断等待 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //解锁 void unlock(); //创建条件 Condition newCondition();
2、构造方法
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
非公平锁还是公平锁,默认非公平锁
3、内置同步器和2种锁实现
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; //抽象了lock方法 abstract void lock(); //为非公平锁特制的 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果锁未使用,就占用锁,然后设置当前线程为独占 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果是当前线程独占的,就独占+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } //非公平和公平锁共用的释放,实现AQS的tryRelease模板方法 protected final boolean tryRelease(int releases) { int c = getState() - releases; //如果不是当前线程释放的锁就抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //如果锁空出了,就清空当前线程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //给Condition用的,判断当前线程是否就是独占线程 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } //获得当前独占的线程 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } //查询当前线程锁定的次数 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { //如果没有竞争,就直接设置当前线程 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { //需要借助队列来保证获得锁的公平性,如果没有竞争也不能像非公平锁一样直接占用 acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //如果没有比当前线程更早等待的线程,同时状态置换成功,就占有锁,不然就返回false if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果当前线程已经占有锁,就+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
4、一般方法
public void lock() { sync.lock(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { return sync.nonfairTryAcquire(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public int getHoldCount() { return sync.getHoldCount(); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } public boolean isLocked() { return sync.isLocked(); } public final boolean isFair() { return sync instanceof FairSync; } protected Thread getOwner() { return sync.getOwner(); } public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); } public final int getQueueLength() { return sync.getQueueLength(); } protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } public boolean hasWaiters(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); } public int getWaitQueueLength(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); } protected Collection<Thread> getWaitingThreads(Condition condition) { if (condition == null) throw new NullPointerException(); if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) throw new IllegalArgumentException("not owner"); return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); } public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); }
很简单,实现了tryAcquire和tryRelease以后,剩下的都是调用AQS开放的接口实现锁的功能
三、解析
ReentrantLock和其他同步器一样,在内部有一个名叫Sync的对AQS的实现类,不过这个Sync是抽象类,它另外有NonfairSync、FairSync这2个实现类,表示公平锁和非公平锁,不过在实现类中代码区别不是特别大
ReentrantLock默认是创建非公平锁,公平锁和非公平锁区别在于非公平锁有2次插队行为,不像公平锁一样进来就去队伍尾部排队
下面完整走一遍lock和unlock的流程
1、线程执行到lock()
lock()是Sync中定义的抽象方法
final void lock() { acquire(1); }
这是公平锁的实现
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
这是非公平锁,出现了第一次插队行为,通过compareAndSetState(0, 1)对state尝试cas操作,看有没有阻塞,或者刚好空出的位置,不然再调用AQS的acquire(1)去获取
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
AQS的获取锁核心方法
tryAcquire()交给同步器去实现自己的尝试获取独占锁逻辑,后面说
如果获取成功,就不会执行后面的方法,就走完lock()了,这一点也说明如果还没有产生阻塞,AQS中的队列是未初始化的
如果未获取成功,则继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
先执行addWaiter()方法,加入当前线程节点到队列尾部
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
先创建一个当前线程的节点,有队列的话(tail != null)尝试将节点放入队列尾部,如果成功就返回了,如果失败说明有竞争情况,就进入enq()方法再自旋进行放入尾部操作
在enq中,先判断队列是否为空,为空的话创建一个队头节点,并让tail指向它,然后进行下一次循环
之后的放入尾部操作和addWaiter里的一样,如果失败的话就继续进入下次循环再次尝试
成功后返回之前的acquireQueued(addWaiter(Node.EXCLUSIVE), arg),继续执行acquireQueued(),将当前线程阻塞住(停在lock()方法里)
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
在for(;;)这个无限循环中,有这样的情况
情况一:当一个之前阻塞在if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())这行的线程,被它的前继线程唤醒了,它结束了当前循环开始下一个循环,进入if (p == head && tryAcquire(arg)) {这一行
如果是公平锁没有被插队,那当前节点的前继节点就是head,并且它tryAcquire()成功,进入里面的代码块,先通过setHead(node)把head指向自己成为新的head,然后p.next = null让之前的head等待被回收(可以发现AQS的release只释放了线程,没有对head操作,因为留到了这里做)
如果是非公平锁就有被插队的可能,如果被新来的线程插队了,就再次进入if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())这行阻塞吧
情况二:一个新的待阻塞线程进来,发现他的前继不是head,就直接进入下一步开始阻塞了
情况三:第二个待阻塞线程进来,他的前继是head,并且刚好第一个线程跑完了,tryAcquire()成功,就把自己变为新的head,然后返回走完lock()了
shouldParkAfterFailedAcquire()的作用是根据前继节点的状态来判断是否应该阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
如果前继节点是node.signal = -1,那么当前线程就是需要阻塞的
如果前继节点是1表示被取消了,就往后找到非取消状态的节点,重新整理队列后返回false,进入下次循环
如果前继节点是0就是第一个空head的情况,就把他设置为node.signal = -1,进入下次循环再判断(正常的话下一次就是-1了,然后需要阻塞,例外就是他新的循环里tryAcquire()碰巧得到了锁)
parkAndCheckInterrupt()方法就是稳稳的开始阻塞了
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
需要注意的是线程被唤醒时,依旧是从这里重新开始往下执行,顺便返回线程是否被中断了,返回后回到for(;;)这个无限循环中,开始新一轮的争用锁(之所以是争用因为有可能在非公平锁下被插队)
最后看一下tryAcquire()排它锁在ReentrantLock()中的实现代码
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平锁和非公平锁的实现几乎一样,只是公平锁多了一个!hasQueuedPredecessors()的判断,判断当前线程是否需要排队,当c == 0时说明锁空出了,公平锁需要判断是否要排队的情况(大概率是要排队的,不能直接cas争用state)
public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
如果h != t说明队列还未初始化,当前线程可能是进来的第2个线程,直接返回去compareAndSetState(0, acquires)争用锁,如果成功就顺利退出lock()继续往下走,如果失败说明被一起进来的第3个线程抢了先...争用失败
如果队列初始化了,head的后继节点不为空且等于自己(重入)时不需要排队,其他情况都要排队
非公平锁就不管这些情况,上来就去争用一下,这是非公平锁的第2次插队
else if (current == getExclusiveOwnerThread()) {这句就是判断当前线程重入的情况了,state往上加1
2、线程执行到unlock()
public void unlock() { sync.release(1); }
unlock就直接是sync的release()了,实际是直接调用了AQS的release()释放线程,也是核心方法
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
如果tryReleas()成功,如果head不为空存在队列,并且h.waitStatus !=0,说明存在需要唤醒的节点时,执行unparkSuccessor()
如果h.waitStatus == 0呢,说明那个节点刚执行完addWaiter(),把自己加到队列尾部,还没来得及通过shouldParkAfterFailedAcquire()去把头结点的waitStatus置为node.signal = -1,他还没有阻塞呢,所以不需要唤醒
再看tryRelease()排它锁的唤醒实现
rotected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
如果是重入锁的情况,state减1,然后没有激活下一个线程
如果是正常情况,c == 0了,清空AQS当前线程,设置state = 0后返回true释放锁成功
IllegalMonitorStateException异常表示只有在当前线程解锁才可以,不能在当前线程加锁,在其他线程解锁
最后看一下具体的激活线程操作unparkSuccessor()
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
如果head节点的waitStatus == -1,表示当前有节点等待唤醒,就先把waitStatus设置为0,因为马上节点就被唤醒了,就不存在需要唤醒的节点了
如果出现head的后继节点已经是取消状态,就从队尾开始遍历查找最近的可唤醒节点,之所以从队尾是因为prev是可靠的,next有可能断连着
最后排队的节点被唤醒了
需要注意的是它唤醒以后回到了之前acquireQueued()方法中的for(;;)循环中
for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; }
醒来第一件事就是找能用的锁,没有的话继续被阻塞...