1 // 创建一个 ReentrantLock ,默认是“非公平锁”。 2 ReentrantLock() 3 // 创建策略是fair的 ReentrantLock。fair为true表示是公平锁,fair为false表示是非公平锁。 4 ReentrantLock(boolean fair) 5 // 查询当前线程保持此锁的次数。 6 int getHoldCount() 7 // 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。 8 protected Thread getOwner() 9 // 返回一个 collection,它包含可能正等待获取此锁的线程。 10 protected Collection<Thread> getQueuedThreads() 11 // 返回正等待获取此锁的线程估计数。 12 int getQueueLength() 13 // 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。 14 protected Collection<Thread> getWaitingThreads(Condition condition) 15 // 返回等待与此锁相关的给定条件的线程估计数。 16 int getWaitQueueLength(Condition condition) 17 // 查询给定线程是否正在等待获取此锁。 18 boolean hasQueuedThread(Thread thread) 19 // 查询是否有些线程正在等待获取此锁。 20 boolean hasQueuedThreads() 21 // 查询是否有些线程正在等待与此锁有关的给定条件。 22 boolean hasWaiters(Condition condition) 23 // 如果是“公平锁”返回true,否则返回false。 24 boolean isFair() 25 // 查询当前线程是否保持此锁。 26 boolean isHeldByCurrentThread() 27 // 查询此锁是否由任意线程保持。 28 boolean isLocked() 29 // 获取锁。 30 void lock() 31 // 如果当前线程未被中断,则获取锁。 32 void lockInterruptibly() 33 // 返回用来与此 Lock 实例一起使用的 Condition 实例。 34 Condition newCondition() 35 // 仅在调用时锁未被另一个线程保持的情况下,才获取该锁。 36 boolean tryLock() 37 // 如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。 38 boolean tryLock(long timeout, TimeUnit unit) 39 // 试图释放此锁。 40 void unlock()
二、基本概念
1. AQS -- 指AbstractQueuedSynchronizer类。
AQS是java中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现。AQS是独占锁(例如,ReentrantLock)和共享锁(例如,Semaphore)的公共父类。
2. AQS锁的类别 -- 分为“独占锁”和“共享锁”两种。
(01) 独占锁 -- 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。公平锁,是按照通过CLH等待线程按照先来先得的规则,公平的获取锁;而非公平锁,则当线程要获取锁时,它会无视CLH等待队列而直接获取锁。独占锁的典型实例子是ReentrantLock,此外,ReentrantReadWriteLock.WriteLock也是独占锁。
(02) 共享锁 -- 能被多个线程同时拥有,能被共享的锁。JUC包中的ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch和Semaphore都是共享锁。
3. CLH队列
CLH队列是AQS中“等待锁”的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而起来错误,我们常常需要通过锁来保护这些资源。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。CLH就是管理这些“等待锁”的线程的队列。
CLH是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。
4. CAS函数 -- Compare And Swap
CAS函数,是比较并交换函数,它是原子操作函数;即,通过CAS操作的数据都是以原子方式进行的。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函数。它们共同的特点是,这些函数所执行的动作是以原子的方式进行的。
三、JUC-公平锁-获取锁
获取锁
public void lock() { //调用FairSync的lock方法 sync.lock(); } //继承Sync Sync继承AbstractQueuedSynchronizer类 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { //调用AbstractQueuedSynchronizer的acquire方法 acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
AQS 的acquire()实现
public final void acquire(int arg) { //tryAcquire尝试获取锁 //addWaiter(Node.EXCLUSIVE), arg)如果失败新增等待节点 //acquireQueued根据队列获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
(02) “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。CLH队列就是线程等待队列。
(03) 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。
FairSync.tryAcquire()
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */
protected final boolean tryAcquire(int acquires) {
// 获取“当前线程” final Thread current = Thread.currentThread(); // 获取“独占锁”的状态 int c = getState(); // c=0意味着“锁没有被任何线程锁拥有”, if (c == 0) { // 若“锁没有被任何线程锁拥有”, // 则判断“当前线程”是不是CLH队列中的第一个线程线程, // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 如果“独占锁”的拥有者已经为“当前线程”, // 则将更新锁的状态。 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
tryAcquire()的作用就是尝试去获取锁,尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。
hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
hasQueuedPredecessors() 是通过判断"当前线程"是不是在CLH队列的队首,来返回AQS中是不是有比“当前线程”等待更久的线程。
Node
private transient volatile Node head; // CLH队列的队首 private transient volatile Node tail; // CLH队列的队尾 // CLH队列的节点 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // 线程已被取消,对应的waitStatus的值 static final int CANCELLED = 1; // “当前线程的后继线程需要被unpark(唤醒)”,对应的waitStatus的值。 // 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。 static final int SIGNAL = -1; // 线程(处在Condition休眠状态)在等待Condition唤醒,对应的waitStatus的值 static final int CONDITION = -2; // (共享锁)其它线程获取到“共享锁”,对应的waitStatus的值 static final int PROPAGATE = -3;
// waitStatus为“CANCELLED, SIGNAL, CONDITION, PROPAGATE”时分别表示不同状态, // 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。 volatile int waitStatus; // 前一节点 volatile Node prev; // 后一节点 volatile Node next; // 节点所对应的线程 volatile Thread thread; // nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记” // 若nextWaiter=SHARED,则CLH队列是“独占锁”队列; // 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。 Node nextWaiter; // “共享锁”则返回true,“独占锁”则返回false。 final boolean isShared() { return nextWaiter == SHARED; } // 返回前一节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } // 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } // 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
addWaiter()
private Node addWaiter(Node mode) { // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。 enq(node); return node; }
addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { // interrupted表示在CLH队列的调度中, // “当前线程”在休眠时,有没有被中断过。 boolean interrupted = false; for (;;) { // 获取上一个节点。 // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued()的目的是从队列中获取锁
shouldParkAfterFailedAcquire
// 返回“当前线程是否应该阻塞” private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前继节点的状态 int ws = pred.waitStatus; // 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。 if (ws == Node.SIGNAL) return true; // 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点’的前继节点”。 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
selfInterrupt
private static void selfInterrupt() { Thread.currentThread().interrupt(); }
如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。
在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
四、JUC-公平锁-释放锁
unlock()
public void unlock() { sync.release(1); }
“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。
release()
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
release()会先调用tryRelease()来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。
protected final boolean tryRelease(int releases) { // c是本次释放锁之后的状态 int c = getState() - releases; // 如果“当前线程”不是“锁的持有者”,则抛出异常! if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 设置当前线程的锁的状态。 setState(c); return free; }
tryRelease()的作用是尝试释放锁。
(01) 如果“当前线程”不是“锁的持有者”,则抛出异常。
(02) 如果“当前线程”在本次释放锁操作之后,对锁的拥有状态是0(即,当前线程彻底释放该“锁”),则设置“锁”的持有者为null,即锁是可获取状态。同时,更新当前线程的锁的状态为0。
unparkSuccessor()
private void unparkSuccessor(Node node) { // 获取当前线程的状态 int ws = node.waitStatus; // 如果状态<0,则设置状态=0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。 // 这里的有效,是指“后继节点对应的线程状态<=0” Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 唤醒“后继节点对应的线程” if (s != null) LockSupport.unpark(s.thread); }
在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。
根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。