ReentrantLock的功能是实现代码段的并发访问控制,也就是通常意义上所说的锁,java中实现锁有两种方式,一种是本文所提的ReentrantLock,另一种是synchronized。ReentrantLock相比synchronized 使用可以更灵活,这次就来看看ReentrantLock的内部实现。
我们首先看下ReentrantLock的锁是如何实现的
其实就一行代码 ,看起来很简单,那么这里的sync是什么呢?
这是ReentrantLock内部的一个抽象类,继承了AbstractQueuedSynchronizer(AQS),ReentrantLock所有的功能都和这个类有关
用过ReentrantLock的人都知道,ReentrantLock是分为公平锁和非公平锁,这在ReentrantLock内部是两种实现
公平锁:每个线程抢占锁的顺序为先后调用lock方法的顺序依次获取锁。
非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁,和调用lock方法的先后顺序无关。
首先看下公平锁的内部实现
公平锁 加锁:
调用到了AQS的acquire方法:
再看下获取锁的代码
原来这里子类重写了
1 protected final boolean tryAcquire(int acquires) { 2 final Thread current = Thread.currentThread(); 3 //获取状态位 4 int c = getState(); 5 //0,锁还没被拿走 6 if (c == 0) { 7 //如果队列中没有其他线程 说明没有线程正在占有锁 8 if (!hasQueuedPredecessors() && 9 //修改状态为1 10 compareAndSetState(0, acquires)) { 11 //如果通过CAS操作将状态为更新成功则代表当前线程获取锁, 12 //因此,将当前线程设置到AQS的一个变量中,说明这个线程拿走了锁。 13 setExclusiveOwnerThread(current); 14 return true; 15 } 16 } 17 //锁被拿走了,由于ReentrantLock是可重入锁,所以判断下持有锁的是否是同一个线程 18 else if (current == getExclusiveOwnerThread()) { 19 //如果是的话累加在state字段上就可以了 20 int nextc = c + acquires; 21 if (nextc < 0) 22 throw new Error("Maximum lock count exceeded"); 23 setState(nextc); 24 return true; 25 } 26 return false; 27 } 28 } 29 30 31 protected final int getState() { 32 return state; 33 }
再回到acquire方法中
我们看下addWaiter方法
1 private Node addWaiter(Node mode) { 2 //用当前线程构造一个node,mode是一个表示Node类型的字段, 3 //仅仅表示这个节点是独占的,还是共享的 4 Node node = new Node(Thread.currentThread(), mode); 5 // Try the fast path of enq; backup to full enq on failure 6 Node pred = tail; 7 //将节点插入到尾部 8 if (pred != null) { 9 node.prev = pred; 10 if (compareAndSetTail(pred, node)) { 11 pred.next = node; 12 return node; 13 } 14 } 15 //节点插入尾部失败,进入enq的死循环,知道插入成功 16 enq(node); 17 return node; 18 }
将线程的节点添加队里中后,还需要做一件事:将当前线程挂起!这个事,由acquireQueued来做。
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) { 6 final Node p = node.predecessor(); 7 //如果当前的前一个节点是head说明他是队列中第一个“有效的”节点,因此尝试获取,这个方法子类重写了。 8 if (p == head && tryAcquire(arg)) { 9 setHead(node);//获取成功之后设为将该节点头结点 10 p.next = null; // help GC 将原先的头结点的联系去除 这样原先头结点就可以被GC 11 failed = false; 12 return interrupted; 13 } 14 ////否则,检查前一个节点的状态为,看当前获取锁失败的线程是否需要挂起。 15 if (shouldParkAfterFailedAcquire(p, node) && 16 //如果需要,借助JUC包下的LockSopport类的静态方法Park挂起当前线程。直到被唤醒。 17 parkAndCheckInterrupt()) 18 interrupted = true; 19 } 20 } finally { 21 if (failed) 22 cancelAcquire(node); 23 } 24 }
1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this); 3 return Thread.interrupted(); 4 }
这里需要在介绍下AQS中的Node节点的状态
黄色节点是默认head节点,默认是一个空节点,可以理解成代表当前持有锁的线程,每当有线程竞争失败,都是插入到队列的尾节点,tail节点始终指向队列中的最后一个元素。默认只有当前节点的pre节点是头结点才能去acquire
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus;
CANCELLED 取消状态
SIGNAL 等待触发状态
CONDITION 等待条件状态
PROPAGATE 状态需要向后传播
到此为止,一个线程对于锁的一次竞争才告于段落,结果有两种,要么成功获取到锁(不用进入到AQS队列中),要么,获取失败,被挂起,等待下次唤醒后继续循环尝试获取锁,值得注意的是,AQS的队列为FIFO队列,所以,每次被CPU假唤醒,且当前线程不是出在头节点的位置,也是会被挂起的
非公平锁 加锁:
非公平锁相对于公平锁,只有加锁的环节是不同的
非公平锁首先先去尝试修改AQS状态,如果成功了,当前线程就持有了锁,失败了,再像公平锁一样,进入队列,静静的等待
锁释放:
还是一行代码,再看看sync的内部实现
代码很直观明了,我们再看下是如何唤醒头结点的
至此一个完整的流程就结束了。我们再来总结下整个流程
以公平锁为例
加锁:
1.尝试获取锁,判断state是否是0,是0的话CAS操作把state改为1,获取到锁 2.state不是0,判断下是否存在重入锁的情况 3.如果1和2都失败了,那就要把节点插入到链表中 4.再把这个线程挂起,挂起的时候还会尝试一次1和2的操作
释放锁:
1.释放锁
2.唤醒AQS节点