前言:
在Java的锁中很多锁都是同一时刻只允许一个线程访问,今天就来看看一个特殊的锁——读写锁。它的特殊之处就在于同一时刻可以运行多个读线程访问或者有一个写线程在访问。能够大大的提高并发性和吞吐量
ReentrantReadWriteLock介绍
读写锁是一种特殊的自旋锁。在读写锁的世界里,访问共享资源的线程被划分为两类:一类是只对共享资源进行访问而不更改,暂且认为他是读者;一类是改变共享资源,即写操作,是写者。这个锁的核心要求是在没有任何线程获得读锁和写锁的情况才能获得写锁。
ReentrantReadWriteLock特性
公平性:读写锁支持非公平和公平的锁获取方式,非公平锁的吞吐量优于公平锁的吞吐量,默认构造的是非公平锁
可重入:在线程获取读锁之后能够再次获取读锁,但是不能获取写锁,而线程在获取写锁之后能够再次获取写锁,同时也能获取读锁
锁降级:线程获取写锁之后获取读锁,再释放写锁,这样实现了写锁变为读锁,也叫锁降级
ReentrantReadWriteLock的内部类Sync核心源码解析
1、类的属性
1 abstract static class Sync extends AbstractQueuedSynchronizer { 2 // 版本序列号 3 private static final long serialVersionUID = 6317671515068378041L; 4 // 通过将整形变量state切成两部分,高16位表示读,低16位表示写,来区分读写状态。 5 static final int SHARED_SHIFT = 16; 6 7 static final int SHARED_UNIT = (1 << SHARED_SHIFT); 8 9 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; 10 11 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; 12 // 本地线程计数器 13 private transient ThreadLocalHoldCounter readHolds; 14 // 缓存的计数器 15 private transient HoldCounter cachedHoldCounter; 16 // 第一个读线程 17 private transient Thread firstReader = null; 18 // 第一个读线程的计数 19 private transient int firstReaderHoldCount; 20 }
在ReentrantReadWriteLock中,读写状态的区分是其同步容器Sync的同步状态。而在Sync中应该有读线程的两种和写线程的两种状态。但是通常整形变量只能表示两种状态,这里需要四种所以就将整形变量分成高16位和低16位
上图的同步状态表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。这个状态如何快速获取呢?
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
上面这个方法能快速获得读状态,通过当前状态的无符号补0右移,能够将高16的状态移到低16位从而得到正常的数值状态。
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
上面这个方法则是快速获取写状态,直接将高16位抹去(相当于c&0x0000FFFF)
2、写锁的释放和获取
1 protected final boolean tryRelease(int releases) { 2 if (!isHeldExclusively()) 3 throw new IllegalMonitorStateException(); 4 int nextc = getState() - releases; 5 boolean free = exclusiveCount(nextc) == 0; 6 if (free) 7 setExclusiveOwnerThread(null); 8 setState(nextc); 9 return free; 10 }
tryRelease:先判断是否拿到的是独占的资源,不是就表示可能线程拿的是读锁或者没有锁,这时候直接抛异常。如果是写锁那么就判断释放的锁的个数是否是当前锁的数量,是就释放所有写锁并且设置状态,否则就只设置状态。
1 protected final boolean tryAcquire(int acquires) { 2 /* 3 * Walkthrough: 4 * 1. If read count nonzero or write count nonzero 5 * and owner is a different thread, fail. 6 * 2. If count would saturate, fail. (This can only 7 * happen if count is already nonzero.) 8 * 3. Otherwise, this thread is eligible for lock if 9 * it is either a reentrant acquire or 10 * queue policy allows it. If so, update state 11 * and set owner. 12 */ 13 Thread current = Thread.currentThread(); 14 int c = getState(); 15 int w = exclusiveCount(c); 16 if (c != 0) { 17 // (Note: if c != 0 and w == 0 then shared count != 0) 18 if (w == 0 || current != getExclusiveOwnerThread()) 19 return false; 20 if (w + exclusiveCount(acquires) > MAX_COUNT) 21 throw new Error("Maximum lock count exceeded"); 22 // Reentrant acquire 23 setState(c + acquires); 24 return true; 25 } 26 if (writerShouldBlock() || 27 !compareAndSetState(c, c + acquires)) 28 return false; 29 setExclusiveOwnerThread(current); 30 return true; 31 }
tryAcquire:首先这个方法会先获取state和线程的写锁个数,如果当前线程拿到了锁那么就判断拿的是否是写锁,如果ReentrantReadWriteLock存在读锁或者当先线程不是获取写锁的线程(即不是获取独占资源的线程),那么直接返回false,否则,根据可重入原则是可以获取锁的。或者另外一种情况就是当前线程没有拿到锁,然后在公平和非公平的情况下判断当前线程是否该被阻塞(公平情况下是看队列头部的直接后继,非公平就竞争),如果线程不该被阻塞就可以获取到锁。这时候获取的是写锁。
3、读锁的释放和获取
1 protected final boolean tryReleaseShared(int unused) { 2 Thread current = Thread.currentThread(); 3 if (firstReader == current) { 4 // assert firstReaderHoldCount > 0; 5 if (firstReaderHoldCount == 1) 6 firstReader = null; 7 else 8 firstReaderHoldCount--; 9 } else { 10 HoldCounter rh = cachedHoldCounter; 11 if (rh == null || rh.tid != getThreadId(current)) 12 rh = readHolds.get(); 13 int count = rh.count; 14 if (count <= 1) { 15 readHolds.remove(); 16 if (count <= 0) 17 throw unmatchedUnlockException(); 18 } 19 --rh.count; 20 } 21 for (;;) { 22 int c = getState(); 23 int nextc = c - SHARED_UNIT; 24 if (compareAndSetState(c, nextc)) 25 // Releasing the read lock has no effect on readers, 26 // but it may allow waiting writers to proceed if 27 // both read and write locks are now free. 28 return nextc == 0; 29 } 30 }
tryReleaseShared:主要是表示读锁线程释放锁。如果当前线程是第一个读线程且占有资源数firstReaderHoldCount为1,则可以直接释放该读线程,否则就firstReaderHoldCount减1。如果当前线程不是第一个读线程就会拿缓存计数器,用缓存计数器判断当前线程是否被缓存了或者是否是当前线程。如果计数器的值在缓存中,并且是当前的线程,那么就获取计数器中的计数count,否则还需要获取当前线程的计数器。获取完计数count之后计数值小于等于1就直接移除当前线程的计数器,而后count小于等于0就抛异常,否则就减少计数即可。之后就会进入无限循环,循环能够确保成功设置state。上述函数就是释放读锁,只不过对于当前线程是否是第一个读线程分两种情况来释放,最终能够获得state。
1 protected final int tryAcquireShared(int unused) { 2 /* 3 * Walkthrough: 4 * 1. If write lock held by another thread, fail. 5 * 2. Otherwise, this thread is eligible for 6 * lock wrt state, so ask if it should block 7 * because of queue policy. If not, try 8 * to grant by CASing state and updating count. 9 * Note that step does not check for reentrant 10 * acquires, which is postponed to full version 11 * to avoid having to check hold count in 12 * the more typical non-reentrant case. 13 * 3. If step 2 fails either because thread 14 * apparently not eligible or CAS fails or count 15 * saturated, chain to version with full retry loop. 16 */ 17 Thread current = Thread.currentThread(); 18 int c = getState(); 19 if (exclusiveCount(c) != 0 && 20 getExclusiveOwnerThread() != current) 21 return -1; 22 int r = sharedCount(c); 23 if (!readerShouldBlock() && 24 r < MAX_COUNT && 25 compareAndSetState(c, c + SHARED_UNIT)) { 26 if (r == 0) { 27 firstReader = current; 28 firstReaderHoldCount = 1; 29 } else if (firstReader == current) { 30 firstReaderHoldCount++; 31 } else { 32 HoldCounter rh = cachedHoldCounter; 33 if (rh == null || rh.tid != getThreadId(current)) 34 cachedHoldCounter = rh = readHolds.get(); 35 else if (rh.count == 0) 36 readHolds.set(rh); 37 rh.count++; 38 } 39 return 1; 40 } 41 return fullTryAcquireShared(current); 42 }
tryAcquireShared:获取读锁。首先就是当前线程为写线程或是独占锁的线程那么就会直接返回-1,当前线程不是写线程也不是独占的线程就获取ReentrantReadWriteLock的读锁数量判断读线程不应该被阻塞(分公平和不公平)并且小于读锁数量的最大值,并且设置状态成功。如果当前没有线程获取读锁,则这个线程就是第一个读线程,并且设置第一个读线程firstReader和firstReaderHoldCount;。如果当前线程为第一个读线程,只需要增加firstReaderHoldCount。上述两种情况不满足就设置当前线程对应的计数器HoldCounter。
1 final int fullTryAcquireShared(Thread current) { 2 /* 3 * This code is in part redundant with that in 4 * tryAcquireShared but is simpler overall by not 5 * complicating tryAcquireShared with interactions between 6 * retries and lazily reading hold counts. 7 */ 8 HoldCounter rh = null; 9 for (;;) { 10 int c = getState(); 11 if (exclusiveCount(c) != 0) { 12 if (getExclusiveOwnerThread() != current) 13 return -1; 14 // else we hold the exclusive lock; blocking here 15 // would cause deadlock. 16 } else if (readerShouldBlock()) { 17 // Make sure we're not acquiring read lock reentrantly 18 if (firstReader == current) { 19 // assert firstReaderHoldCount > 0; 20 } else { 21 if (rh == null) { 22 rh = cachedHoldCounter; 23 if (rh == null || rh.tid != getThreadId(current)) { 24 rh = readHolds.get(); 25 if (rh.count == 0) 26 readHolds.remove(); 27 } 28 } 29 if (rh.count == 0) 30 return -1; 31 } 32 } 33 if (sharedCount(c) == MAX_COUNT) 34 throw new Error("Maximum lock count exceeded"); 35 if (compareAndSetState(c, c + SHARED_UNIT)) { 36 if (sharedCount(c) == 0) { 37 firstReader = current; 38 firstReaderHoldCount = 1; 39 } else if (firstReader == current) { 40 firstReaderHoldCount++; 41 } else { 42 if (rh == null) 43 rh = cachedHoldCounter; 44 if (rh == null || rh.tid != getThreadId(current)) 45 rh = readHolds.get(); 46 else if (rh.count == 0) 47 readHolds.set(rh); 48 rh.count++; 49 cachedHoldCounter = rh; // cache for release 50 } 51 return 1; 52 } 53 } 54 }
fullTryAcquireShared:如果读线程被阻塞或者大于最大值,或者比较设置没成功。那么就会无限循环tryAcquireShared使得线程一定能获取读锁,如果是大于最大值则会抛异常。过程与tryAcquireShared类似。
4、直接竞争锁
1 final boolean tryWriteLock() { 2 Thread current = Thread.currentThread(); 3 int c = getState(); 4 if (c != 0) { 5 int w = exclusiveCount(c); 6 if (w == 0 || current != getExclusiveOwnerThread()) 7 return false; 8 if (w == MAX_COUNT) 9 throw new Error("Maximum lock count exceeded"); 10 } 11 if (!compareAndSetState(c, c + 1)) 12 return false; 13 setExclusiveOwnerThread(current); 14 return true; 15 }
tryWriteLock:实现机制与tryAcquire类似,但是区别在于不会去检查是否该线程获取写锁是否会被阻塞,直接让线程尝试竞争锁。
1 final boolean tryReadLock() { 2 Thread current = Thread.currentThread(); 3 for (;;) { 4 int c = getState(); 5 if (exclusiveCount(c) != 0 && 6 getExclusiveOwnerThread() != current) 7 return false; 8 int r = sharedCount(c); 9 if (r == MAX_COUNT) 10 throw new Error("Maximum lock count exceeded"); 11 if (compareAndSetState(c, c + SHARED_UNIT)) { 12 if (r == 0) { 13 firstReader = current; 14 firstReaderHoldCount = 1; 15 } else if (firstReader == current) { 16 firstReaderHoldCount++; 17 } else { 18 HoldCounter rh = cachedHoldCounter; 19 if (rh == null || rh.tid != getThreadId(current)) 20 cachedHoldCounter = rh = readHolds.get(); 21 else if (rh.count == 0) 22 readHolds.set(rh); 23 rh.count++; 24 } 25 return true; 26 } 27 } 28 }
tryReadLock:与实现机制tryAcquireShared,同样是直接让线程去竞争读锁
总结
再提醒一下就是读写锁的核心要求是在没有任何线程获得读锁和写锁的情况才能获得写锁。可以实现锁的降级(当前线程拥有写锁且还没释放的情况拿到读锁,再释放写锁,实现了写锁到读锁的转换),但无法实现锁的升级,同样支持可重入和公平锁与非公平锁。相比于ReentrantLock提高了线程访问资源的吞吐量。