ReentrantReadWriteLock 源码解析,欢迎指正。
读写锁同样依赖AQS抽象队列同步器来实现同步功能,而读写状态就是其同步器的同步状态。回想ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,
而读写锁的自定义同步器需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
如果在一个整形变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁讲变量切割分成了两部分,高16位表示读,低16位表示写。
假设当前同步状态值为c ,写状态等于 c & 0x0000FFFF (将高16位全部抹去),读状态等于c >>>16 (无符号补0右移16位)。
当写状态增加1时,等于c+1 ,当读状态增加1时,等于 c + (1<<<16) ,也就是c + 0x00010000 .
根据状态的划分能得出一个推论:c不等于0时,当写状态 c & 0x0000FFFF 等于0 ,则读状态 c >>>16 大于0 ,即 读锁已被获取。此时,不能获取写锁。
主要分析一下几个方法
package com.study.authority; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * @Description: * @Auther: BacHe * @Date: 2019/9/17 10:59 */ public class myTest2 { public static void main(String[] args) { //新建读写锁 ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); //写锁 ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); //读锁 ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); //获取写锁 writeLock.lock(); //尝试 获取写锁 writeLock.tryLock(); //获取读锁 readLock.lock(); //尝试 获取读锁 readLock.tryLock(); //释放写锁 writeLock.unlock(); //释放读锁 readLock.unlock(); } }
1.读写状态的设计。第252行。
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; //把锁状态变量state按位切割使用。 //高16位表示读,低16位表示写。 static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //变量c 是当前锁的同步状态值。 //读状态等于c>>>16 ,无符号补0右边移16位 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //写状态等于c & 0x0000FFFF ,将高16位全部抹去 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
2.尝试获取写锁。第555行。
final boolean tryWriteLock() { //获取当前线程 Thread current = Thread.currentThread(); //获取当前状态state锁标志变量 int c = getState(); if (c != 0) { //获取写锁的次数。返回count中表示的独占保留的数量 int w = exclusiveCount(c); //写锁次数w为0,说明当前有读锁占用。 //当前线程不是获得写锁的线程。 if (w == 0 || current != getExclusiveOwnerThread()) return false; //写锁次数达到最大次数。因为只能使用16位,所以做了最大次数限制。 if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } //CAS修改当前锁标志,写锁状态+1 if (!compareAndSetState(c, c + 1)) return false; //设置当前线程,为得到写锁的线程。 setExclusiveOwnerThread(current); return true; }
3.尝试获取读锁。第576行。
final boolean tryReadLock() { //获取当前线程 Thread current = Thread.currentThread(); //死循环 for (;;) { //获取当前锁标志 int c = getState(); //写锁次数不为0 并且 写锁线程不是当前线程。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) //返回尝试获取读锁失败 return false; //获取读锁被获取的次数 int r = sharedCount(c); //达到最大次数 65535 要抛异常 if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //CAS修改让读标志加1,也就是高16位加1,也就是c加2的16次方。 if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { //读锁没有被获取过,则设置当前线程为第一个得到读锁的线程 firstReader = current; //第一个得到读锁的线程,设置获取到读锁的次数为1 firstReaderHoldCount = 1; //如果当前线程是第一个得到读锁的线程 } else if (firstReader == current) { //第一个得到读锁的线程,设置获取到读锁的次数加1 firstReaderHoldCount++; } else { //缓存的获取读锁次数的计数器(第二个读线程过来,rh == null ,然后从readHolds.get()初始化。) HoldCounter rh = cachedHoldCounter; //如果计数器为空,或者 计数器的线程id不是当前线程的id (第三个线程过来,tid会和第二个线程不同。也会初始化一个计数器) if (rh == null || rh.tid != getThreadId(current)) //从ThreadLocal获取当前线程的持有计数器(继承ThreadLocal,重写initialValue方法,新建计数器new HoldCounter()返回)。计数器保存到 ThreadLocalMap cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) //把持有计数器设置给本地线程 readHolds.set(rh); //*****持有计数器数量加1,(重点:rh 指针指向了HoldCounter对象的同一个内存地址,rh.count++操作会维护在当前线程的 ThreadLocalMap中) rh.count++; } //返回获取读锁成功 return true; } } }
4. 获取写锁 writeLock.lock();方法下的sync.acquire(1);方法下的tryAcquire(int acquires)
protected final boolean tryAcquire(int acquires) { /* 演练: 1.如果读取计数非零或写入计数非零且所有者是另一个线程,则失败。 2.如果计数饱和,则失败。 (只有在count已经不为零时,才可能发生这种情况。) 3.否则,如果该线程是可重入获取或队列策略允许的话,则有资格进行锁定。 如果是这样,请更新状态并设置所有者 */ Thread current = Thread.currentThread(); int c = getState(); //获取写锁次数 int w = exclusiveCount(c); if (c != 0) { //如果锁状态不为0,并且写锁次数等于0 ,这则读锁次数不为零。有读锁不能加写锁 //写锁次数不等于0,但是获得写锁的线程不是当前线程,也不能加写锁。 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; //最大加写锁次数不能大于 65535 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire 重入获取 //加写锁次数,更改状态,重入获取,因为当前线程就是获取到写锁的线程,所以不用CAS 操作 setState(c + acquires); return true; } //当前锁状态为0,还没有读锁和写锁 //写锁应该阻塞?根据公平锁和非公平锁,决定是否要判断当前排队队列 //CAS操作更新锁状态 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //CAS加锁成功,设置当前线程为获得锁的线程 setExclusiveOwnerThread(current); return true; }
下一步操作,加入等待队列。
public final void acquire(int arg) { //尝试获取锁,成功,然后加入队列 if (!tryAcquire(arg) && //addWaiter用当前线程创建Node加入等待队列 //acquireQueued 方法在后面分析 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //获取锁不成功,中断当前线程 selfInterrupt(); }
分析 acquireQueued(final Node node, int arg) 方法
//以排他的不间断模式获取已在队列中的线程。 用于条件等待方法以及获取。 // 如果在等待期间被打断,则返回{@code true} final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取当前节点的上一个节点 final Node p = node.predecessor(); //如果上一个节点是头节点,尝试加锁 if (p == head && tryAcquire(arg)) { //加锁成功,设置当前节点是头节点 setHead(node); //上一个节点的next置空,因为已经不需要标记了。当前节点已经得到了锁 p.next = null; // help GC //加锁失败标记的变量 failed = false; //返回中断标记 return interrupted; } //失败获取后应该停放, if (shouldParkAfterFailedAcquire(p, node) && //停顿并检查中断 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //取消获取 cancelAcquire(node); } }