实现的关键点:volatile,cas,AQS
lock()方法:
lock()方法由NonfairSync和FairSync2个类实现了非公平锁和公平锁;
static final class NonfairSync extends Sync { /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } } static final class FairSync extends Sync { final void lock() { acquire(1); } }
看下NonfairSync这个类里面的lock()方法
final void lock() { //调用cas尝试拿到锁 if (compareAndSetState(0, 1)) //设置当前的线程为持锁线程 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
lock方法首先执行 compareAndSetState,而该方法实际上就是AQS中的一个方法,这个方法最终会调用 unsafe的一个CAS操作,线程安全的改变 state为1,独占锁。compareAndSetState,方法则是判断 AbstractQueuedSynchronizers中的 state值是否为0,如果为0,就修改为1,井返回true。 state'初始值为0,修改成功调用AQS父类AbstractOwnableSynchronizer的方法setexclusiveownerthread(Thread. currentThread)方法将当前独占锁线程设置为当前线程。线程抢锁成功。如果此时其他线程也调用了lock方法,执行 compareAndSetState方法失败,因为此时的 state.不为0,于是执行 acquire方法。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 获取state,尝试获取锁 if (c == 0) { //成功拿到锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果当前线程就是持锁的 else if (current == getExclusiveOwnerThread()) { // 重入 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
进入 acquire后执行tryAcquire方法,该方法是AQS中的一个抽象方法,具体实现由子类 NonfairSync实现,最终调用的是父类sync的nonfairTryAcquire方法。这段代码说明 Reentrantlock是重入锁。这个方法的意思就是在取尝试重试获取一次锁,因为有可能其他占有锁的线程很快就释放锁了,这里在次尝试了次,如果获取到了,就直接返回true。如果失败继续会判断当前独占锁的线程和当前线程是否为同一个线程(重入锁的实现),如果是,将 state设置为 state+1,并且反回rue,而这里他们不相等,当前独占锁的线程为线程A,当前线程为B所以结构会返回alse. !tryAcquire(arg)则为true,所以会继续执行同步器的addWaiter方法。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 尾节点不为空,把节点加到尾部 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //尾节点空 enq(node); return node; }
首先是创建一个 EXCLUSIVE独占模式的node节点,并且将当前线程保存在node中,如果尾节点不为空,追加当前节点为尾节点并返回当前节点。如果尾节点tail是null执行 enq(node),建立新的链表
private Node enq(final Node node) { for (;;) { Node t = tail; //尾节点空 建立新节点为头 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 设置当前为尾节点 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 前一节点是头节点,并且持有锁,加锁成功 if (p == head && tryAcquire(arg)) { //设置当前为头节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //前一节点非头节点,继续等待,或者是头节点,但被其他线程占用锁 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
继续分析 acquireQueued方法的实现,同样又是个死循环,首先执行node.predecessor0方法返回传入的node节点的prev节点。如果前一节点是头节点,则继续执行tryAcquire方法,如果其他线程占据着锁,代码会执行shouldParkAfterFailedAcquire。如果头节点持有锁,设置当前新节点为头节点。 p. next=null标记GC。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
shouldParkAfterFailedAcquire方法有三种执行条件
1、若pred.waitStatus状态位大于0,说明这个节点已经取消了获取锁的操作, dowhile循环会递归删除掉这些放弃获取锁的节点。
2、若状态位不为 Node.SIGNAL,且没有取消操作,则会尝试将状态位修改为Node.SIGNAL。
3、状态位是Node.SIGNAL,表明线程是否巳经准备好被阻塞并等待唤醒。正常状况会执行compareAndSetWaitStatus方法,将head节点的 waitstatus设置为-1。shouldParkAfterFailedAcquire返回的 False,继续会循环执行到这个方法,而此时的 waitStatus=-1,所有直接返回true,继续执行parkAndCheckInterrupt方法。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
该方法阻塞当前线程并且下次被唤醒的时候返回是否被中断标识。等到被唤醒,会再次执行 acquireQueued中的循环,可能直接获得锁成功,或者再次被阻塞。
unlock()
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head;
// 释放锁成功,并且头部为空 status!=0 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {
//表示没有线程持有锁 free = true; setExclusiveOwnerThread(null); }
//可重入,更新state setState(c); return free; } private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
release方法,通过调用 Reentrantlocki类中的 tryRelease方法尝试释放锁。如果释放的线程跟独占线程不是同一个,抛异常;如果当前 state值减1等于0的话表示锁释放完成,把当前独占线程设置为nu井把 state值设置为0,返回true。 state可能不为0,因为这是重入锁,同一个线程可以lock多次,所以必须得释放多次才是可以完全释放锁。
释放锁成功,执行 unparkSuccessor方法,如果 waitstatus<0,会继续执行compareAndSetWaitStatus方法将waitStatus改为0。然后找到头节点的下一个节点,继续执行 LockSupport.unpark(s.thread)唤醒下节点之前被阻塞的线程