一 UML类图
1.1、ReentrantLock
通过类图ReentrantLock是同步锁,同一时间只能有一个线程获取到锁,其他获取该锁的线程会被阻塞而被放入AQS阻塞队列中。ReentrantLock类继承Lock接口;内部抽象类Sync实现抽象队列同步器AbstractQueuedSynchronizer;Sync类有两个子类NonfairSync(非公平锁)和FairSync(公平锁),默认构造方法使用非公平锁,可以使用带布尔参数的构造方法指定使用公平锁;ReentrantLock可以创建多个条件进行绑定。
1.2、AbstractQueuedSynchronizer
AbstractQueuedSynchronizer:抽象队列同步器,维护一个volatile int state变量标识共享资源和一个FIFO线程阻塞队列(AQS队列)。
protected final void setState(int newState):设置state值
protected final int getState():获取state值
protected final boolean compareAndSetState(int expect, int update):CAS设置state值
AQS有两种共享资源类型:SHARED(共享)和EXCLUSIVE(独占),针对类型有不同的方法:
protected boolean tryAcquire(int arg):独占类型获取锁
protected boolean tryRelease(int arg):独占类型释放锁
protected int tryAcquireShared(int arg):共享类型获取锁
protected boolean tryReleaseShared(int arg):共享类型释放锁
protected boolean isHeldExclusively():是否是独占类型
1.3、线程节点类型waitStatus
AQS队列中节点的waitStatus枚举值(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node)含义:
static final int CANCELLED = 1; //线程被取消
static final int SIGNAL = -1; //成功的线程需要被唤醒
static final int CONDITION = -2; //线程在条件队列中等待
static final int PROPAGATE = -3; //释放锁是需要通知其他节点
二 原理分析
2.1 获取锁
2.1.1 void lock()方法
调用线程T调用该方法尝试获取当前锁。
①如果锁未被其他线程获取,则调用线程T成功获取到当前锁,然后设置当前锁的拥有者为调用线程T,并设置AQS的状态值state为1,然后直接返回。
②如果调用线程T之前已经获取当前锁,则只设置设置AQS的状态值state加1,然后返回。
③如果当前锁已被其他线程获取,则调用线程T放入AQS队列后阻塞挂起。
public void lock() { sync.lock();//委托内部公平锁和非公平锁获取锁 }
//非公平锁
final void lock() { if (compareAndSetState(0, 1))//设置AQS状态值为1 setExclusiveOwnerThread(Thread.currentThread());//成功设置锁的线程拥有者 else acquire(1);//失败加入到AQS队列阻塞挂起 } //公平锁 final void lock() { acquire(1); }
非公平锁分析:
//调用父类AbstractOwnableSynchronizer方法CAS设置state值,成功返回true,失败返回false protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } //调用父类AbstractOwnableSynchronizer方法,设置当前线程为锁的拥有者 protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
//调用AbstractQueuedSynchronizer父类方法,第一次获取锁失败 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//排它锁类型 selfInterrupt(); } //NonfairSync子类,重写父类方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
//Sync类非公平锁尝试获取锁 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//二次获取锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {//当前线程已获取锁,AQS状态值加1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
//AbstractQueuedSynchronizer类创建节点,添加到AQS队列后面 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//创建AQS队列的节点,节点类型排它锁 Node pred = tail;//尾结点 if (pred != null) { node.prev = pred;//新节点的前一个节点是尾结点 if (compareAndSetTail(pred, node)) {//CAS机制添加节点 pred.next = node;//尾结点执行新的节点 return node; } } enq(node); return node; }
//插入节点到队列中
private Node enq(final Node node) { for (;;) {//循环的方式,直至创建成功 Node t = tail;//尾结点 if (t == null) { //尾结点为空,初始化 if (compareAndSetHead(new Node()))//第一步:CAS创建头结点(哨兵节点)一个空节点 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) {//第二步:CAS设置尾结点 t.next = node; return t; } } } }
// final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//前向节点 if (p == head && tryAcquire(arg)) {//如果p节点是头结点,node作为队列第二个节点 setHead(node);//将头节点设置为node节点,node节点出队列 p.next = null; //原头结点设置为空,便于GC回收 failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node);//失败解锁 } }
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
//阻塞挂起当前线程
static void selfInterrupt() { Thread.currentThread().interrupt(); }
//
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0);//大于0代表线程被取消 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //线程阻塞,打断线程 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
公平锁分析:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {//与非公平锁相比,区别就在标红的位置 setExclusiveOwnerThread(current); return true; } }else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s;
//①h != t:表示AQS队列头结点和尾结点不相同,队列不为空;
//②(s = h.next) == null || s.thread != Thread.currentThread():头结点(哨兵节点)为空或者next节点不等于当前线程 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
2.1.2 void lockInterruptibly()方法
与2.2.1方法相似,不同之处在于:该方法对中断进行响应,其他线程调用当前线程中断方法,抛出InterruptedException。
2.1.3 boolean tryLock()方法
尝试获取锁。注意:该方法不会引起当前线程阻塞。
2.1.4 boolean tryLock(long timeout, TimeUnit unit)方法
与2.1.3方法相似,不同之处在于:设置了超时时间。
2.2 释放锁
尝试释放锁。
如果当前线程T已获取锁,则调用该方法更新AQS状态值减1。如果当前状态值为0,则释放锁;如果当前状态值部位0,则只是减1操作。
如果当前线程T未获取锁,则调用该方法是会抛出IllegalMonitorStateException异常。
2.2.1 void unlock()方法
public void unlock() { sync.release(1); } //调用AbstractQueuedSynchronizer方法 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒线程 return true; } return false; } //回调Sync类释放锁 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null);//设置锁的拥有线程为空 } setState(c); return free; } // private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus;//线程阻塞等待状态 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);//CAS设置状态 /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev)//遍历AQS队列 if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//唤醒线程 }
h != t