ReentrantReadWriteLock 内部维护了 ReadLock 跟 WriteLock类,state状态高16bit代表读锁状态 低16bit代表写锁状态,Sync内部封装了,WriteLock -> acquire(1) -> tryAcquire() ; ReadLock -> acquireShared -> tryAcqurieShare()方法。
同事也支持公平锁/非公平锁 分别控制读写锁不同的操作。如果熟悉AQS源码,这个类其实还是蛮简单的。
WriteLock: 关键方法
// writeLock: 尝试获取锁 protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); // 获取独占线程数 int w = exclusiveCount(c); if (c != 0) { // 存在读锁 或者写锁 // (Note: if c != 0 and w == 0 then shared count != 0) // 如果 写锁为空 读锁肯定不为空 此时不允许获取写锁 如果w!=0 有写锁获取到锁 此时需要等待 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 判断是否超过写锁最大数 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 设置状态+acquires (写锁是低16位 直接+acquires) setState(c + acquires); return true; } // 公平锁: writerShouldBlock() 如果队列有node 直接加入队列 没有直接获取锁 // 非公平锁: writerShouldBlock() 返回false 竞争锁失败加入队列 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
//writeLock: 尝试释放锁 protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
WriteLock:关键代码
//readLock 共享锁 尝试获取锁 protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 存在写锁 切不是重入锁 return -1 加入队列 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //没有写锁 获取读锁 int r = sharedCount(c); // 公平锁 判断是否队列中有前置节点 非公平锁 判断next节点是否是独占节点 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 成功获取到读锁 增加 c+SHARED_UNIT if (r == 0) { // 没有读锁 firstReader = current; firstReaderHoldCount = 1;// 计数 } else if (firstReader == current) { // 记录重入锁number firstReaderHoldCount++ firstReaderHoldCount++; } else {// 初始化count rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } //获取读锁失败,放到循环里重试 return fullTryAcquireShared(current); }
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 开始循环 for (;;) { int c = getState(); // 存在读锁 切 不是重入锁 写锁会降级成读锁 直接返回-1 加入队列 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { //公平锁: 判断是否有next节点 非公平锁 判断next节点是否是独占节点 // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // 是否是第一个读线程 // assert firstReaderHoldCount > 0; } else { // 更新 readHolds中 HoldCounter 计数 if (rh == null) { rh = cachedHoldCounter; // 获取线程计数器 if (rh == null || rh.tid != getThreadId(current)) { // 比较线程id rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) // 是否超过读最大限制 throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // state + 1<<16 if (sharedCount(c) == 0) { // 读锁为0 firstReader = current; // 设置第一个读 避免查找 readHolds firstReaderHoldCount = 1; // 设置计数 } else if (firstReader == current) { // 是第一个 增加计数 firstReaderHoldCount++; } else { // 更新 readHolds中 HoldCounter 计数 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 是否是第一个读锁 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) // 计数是否为1 是的话设置为空 gc firstReader = null; else firstReaderHoldCount--; // 减少计数 } else { // 判断是够计数为0 为0从 threadLocal中移除 否则 --rh.count; HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // 设置state int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
对于ReadLock 跟 WriteLock 支持的 tryLock() 分别调用 sync 中的 tryWriteLock() 跟 tryReadLock();
final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); // 存在读写锁 if (c != 0) { // 获取写锁 int w = exclusiveCount(c); // case 1:存在读锁 case 2: 存在写锁 切不是重入锁 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } // 竞争锁 if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true;
final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); // 存在写锁 且 不是重入锁 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false; // 获取获取读锁线程数量 int r = sharedCount(c); if (r == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 竞争锁 if (compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return true; } } }