锁机制学习笔记
目录:
JUC的并发包功能强大,但也不容易理解,大神果然是用来膜拜的。经过一段时间的研究和理解,我把自己所了解的关于JUC中锁的相关知识整理下来,一方面给自己做个备忘,另一方面也给各位朋友做个参考。
文中源码的关键部分都做了注释,希望对大家有所帮助。另外这只是学习笔记,建议大家先去了解一些基础知识再来看其中的源码,大家有疑问的可以再参考其他文章,谢谢!
CAS的意义
锁的一些基本原理
ReentrantLock的相关代码结构
两个重要的状态
I.AQS的state(int类型,32位)
II.Node的waitStatus
对队列中节点的操作(锁定线程或释放线程)则是基于节点的waitStatus的
CANCELLED = 1:
节点操作因为超时或者对应的线程被interrupt。节点不应该不留在此状态,一旦达到此状态将从CHL队列中踢出。
SIGNAL =
-1:
节点的继任节点是(或者将要成为)BLOCKED状态(例如通过LockSupport.park()操作),因此一个节点一旦被释放(解锁)或者取消就需要唤醒(LockSupport.unpack())它的继任节点。
CONDITION = -2:
表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。
正常状态 =
0:
新生的非CONDITION节点都是此状态。
对于处在阻塞队列中的节点,当前节点之前的节点:
waitStatus >
0的是取消的节点,在处理中应该剔除
waitStatus = 0的,则需要将其改成-1
因此整个阻塞节点链的waitStatus应该为:-1,-1,-1,0
获取锁(AQS)的流程
锁的获取和释放都是基于上述2个状态来的,首先能不能获取锁是由AQS.state来控制,因此tryAcquire()和tryRelease()都是对state的控制,如果不能获取锁则需要加入到等待队列,此时线程的等待与释放则是由Node的waitStatus控制的。
下图演示了一个线程获取独占锁的过程:
I.获取锁总操作
1
2 3 4 |
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } |
2. addWaiter(Node.EXCLUSIVE):
3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg):
4. selfInterrupt():
II.tryAcquire(尝试获取锁)
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 0,代表当前锁无其他线程持有 if (isFirst(current) && compareAndSetState(0, acquires)) { // isFirst是公平锁和非公平锁在tryAcquire的唯一区别 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 非0,代表有线程持有锁,判断持有者是否为当前线程 // 这里修改为旧值+1呢?这是因为ReentrantLock是可重入锁,同一个线程每持有一次就+1 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } |
III.添加到等待队列
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 这一段是为提高性能而设的,没有也不影响功能 // 如果tail不为空,则设置新tail并返回 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); // 如果tail为空,则执行enq(),创建新head,插入队列,否则逻辑和上面一样 return node; } |
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
private Node enq(final Node node) {
for (;;) { Node t = tail; if (t == null) { // tail == null,创建新的节点加入到尾部 Node h = new Node(); // dummy header,傀儡节点 h.next = node; node.prev = h; if (compareAndSetHead(h)) { // CAS设置头部 tail = node; return h; } } else { // tail != null,则和addWaiter()中那段一样 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } |
IIII.自旋请求锁
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
final boolean acquireQueued(final Node node, int arg) {
try { boolean interrupted = false; // 循环操作 for (;;) { final Node p = node.predecessor(); // 如果第一个节点是Dummy Header也就是傀儡节点,那么第二个节点实际上就是头结点了,此时则尝试获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } // acquire失败,判断是否应该park,并检查线程是否interrupt if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } } // CANCELLED = 1 // SIGNAL = -1 // CONDITION = -2 // NORMAL = 0 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 前一个节点的状态(注意:不是当前节点) int ws = pred.waitStatus; if (ws < 0) // waitStatus<0,也就是前面的节点还没有获得到锁,那么返回true,表示当前节点(线程)就应该park()了。 return true; if (ws > 0) { // waitStatus>0,也就是前一个节点被CANCELLED了,那么就将前一个节点去掉,递归此操作直到所有前一个节点的waitStatus<=0,进行4 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // waitStatus=0,修改前一个节点状态位为SINGAL,表示后面有节点等待你处理,需要根据它的等待状态来决定是否该park() compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // ws<0才需要park(),ws>=0都返回false,表示线程不应该park() return false; } |
IIIII.释放锁
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
public final boolean release(int arg) {
// tryRelease 成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // unpark节点链的head后续节点的线程 unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { // state-1 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // state-1 == 0,则说明此时没有其他线程持有锁,release成功,unpark节点链的head节点的后续线程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 设置新state setState(c); return free; } private void unparkSuccessor(Node node) { // 此时node是需要释放锁的头节点 // 清空头结点的waitStatus,也就是不需要锁了,这里修改成功失败无所谓 compareAndSetWaitStatus(node, ws, 0); // 从头结点的下一个节点开始寻找继任节点,当且仅当继任节点的waitStatus<=0才是有效继任节点,否则将这些waitStatus>0(也就是CANCELLED的节点)从AQS队列中剔除 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } |
原创文章,请注明引用来源:CM4J
参考文章:http://www.blogjava.net/xylz/archive/2010/07/05/325274.html