说明
我们都知道ArrayList不是线程安全的,在读的时候同时并发在写,在写的时候同时在读,会出现索引越界问题,解决这个问题醉简单的方式在写和读的地方都加上锁。都加上锁的话并发读也会产生互斥
但是为了性能 读的频繁写的时候比较少。我们都会允许读读共享,读写互斥,写写互斥,ReentrantReadWriteLock 就是读写锁 读读共享,读写互斥,写写互
为什么ArrayList线程不安全可以参考《ArrayList在多线程的线程不安全的几种体现》
ReentrantReadWriteLock 是基于AQS实现的 AQS源码阅读参考《JUC锁框架源码阅读-AbstractQueuedSynchronizer》
关于位运算可以参考:《java位运算符,&,|,>>>,>>,<<,<<<的区别》
初始化
main
//<1> ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
1.通过位运算符& 通过AQS state&EXCLUSIVE_MASK 进行运算,如果大于0则表示写锁以及被持有,则获取读锁(AQS共享锁)失败 返回-1加入CLH队列
2.如果小于0则表示没有写锁。读读共享(共享锁可以多线程持有)则尝试获取,通过state从高位右移可以获取到写锁数量。
3.先因为int是32位 16位是临界,则65536是最大支持个线程可获取读锁数量(一般够用了),需要校验是否超过65536如果超过了直接获取失败 加入CLH队列
4.没有超过如果限制没有任何线程获取读锁,则直接CAS尝试获取
5.获取失败则自旋获取,每次获取都需要判断是否写锁被持有了,自旋直到获取成功或者因为写锁被持有,或者超出最大可获取数量返回对应结果(注获取失败都是返回-1 AQS将阻塞线程并加入CLH队列)
6.获取成功 因为可重入将更新ThreadLocal的HoldCounter count+1
<1>
public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { //可以sync是ReentrantReadWriteLock 的2个内部类。可以发现ReentrantReadWriteLock 也是支持公平和非公平的 sync = fair ? new ReentrantReadWriteLock.FairSync() : new ReentrantReadWriteLock.NonfairSync(); //读锁也是内部内 内部保存了ReentrantReadWriteLock的引用 意味着可以拿到sync readerLock = new ReentrantReadWriteLock.ReadLock(this); //写锁也是内部内 内部保存了ReentrantReadWriteLock的引用 意味着可以拿到sync writerLock = new ReentrantReadWriteLock.WriteLock(this); }
Sync
ReentrantReadWriteLock 的公平锁和非公平锁都继承了Sync
继续看类结构 他们只重写了部分方法 主要是为了实现公平和非公平锁,具体加锁都在父类Sync
/** * Nonfair version of Sync */ static final class NonfairSync extends ReentrantReadWriteLock.Sync { private static final long serialVersionUID = -8159625535654395037L; //重写了writerShouldBlock方法 final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } } /** * Fair version of Sync */ static final class FairSync extends ReentrantReadWriteLock.Sync { private static final long serialVersionUID = -2274990926593161451L; //重写了writerShouldBlock final boolean writerShouldBlock() { return hasQueuedPredecessors(); } //重写了readerShouldBlock final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
ReentrantReadWriteLock 是通过将AQS state int 32转2进制通过高16位表示读锁 低16位表示写锁
//因为int 是32位,通过16将他们一分为二 高位加读锁 低位加写锁 static final int SHARED_SHIFT = 16; //1转为二进制向左边移动16位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); //1转为二进制向左边移动16位 -1表示高的最小值 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //1转为二进制向左边移动16位 -1表示低的最大值 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; //将state向右移动16位 则可以得到持有锁的数量 我的理解 就是高位的移动到后面当成16进制处理 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** * 位运算 将&左右2边转为二进制 同时为1 则为1否则为0 得出写锁数量 * 00000000 00000001 00000000 00000000 高位读锁与EXCLUSIVE_MASK计算后 =0 * 00000000 00000001 00000000 00000001 低位写锁与EXCLUSIVE_MASK计算后 =1 * 00000000 00000000 11111111 11111111 EXCLUSIVE_MASK * @param c * @return */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
HoldCounter
//Sync内部类 锁是可重入的,保存每个线程持有数量 static final class HoldCounter { //计数器 int count = 0; //线程id // Use id, not reference, to avoid garbage retention final long tid = getThreadId(Thread.currentThread()); } /** *Sync内部类 重新了ThreadLocal的 initialValue方法,为ThreadLocal设置默认值 */ static final class ThreadLocalHoldCounter extends ThreadLocal<ReentrantReadWriteLock.Sync.HoldCounter> { public ReentrantReadWriteLock.Sync.HoldCounter initialValue() { return new ReentrantReadWriteLock.Sync.HoldCounter(); } } //sync成员变量 private transient ReentrantReadWriteLock.Sync.ThreadLocalHoldCounter readHolds;
读锁加锁
main
public static void main(String[] args) { //<L>初始化 ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); //获得读锁 内部return readLock Lock rlock = readWriteLock.readLock(); //<1>加锁 rlock.lock(); }
<1>lock
public void lock() { //<2>调用的继承父类的AQS方法 sync.acquireShared(1); }
<2>acquireShared
可以参考:《AQS源码阅读》
// 获取共享锁 public final void acquireShared(int arg) { //<3>模板模式 抽象方法 由子类实现 尝试去获取共享锁,如果返回值小于0表示获取共享锁失败 这里调用的是Snyc if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
<3>tryAcquireShared
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared
protected final int tryAcquireShared(int unused) { //获得当前线程 Thread current = Thread.currentThread(); //获得AQS状态 int c = getState(); /** * exclusiveCount判断是否已经持有了写锁 * getExclusiveOwnerThread 获得持有写锁的线程,如果不是当前线程持有直接返回-1 获取读锁失败 这里说明 同一个线程加了写锁是可以继续获取读锁的 */ if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //获取获取读锁的数量 int r = sharedCount(c); /** * readerShouldBlock 公平锁的实现 NonfairSync FairSync都重新了此方法 * FairSync(公平锁):直接如果队列不为空就返回false * NonfairSync(非公平锁)遍历CLH队列如果已经在队列中表示轮到他获取锁了返回true否则返回false 需要重新加入CLH队列 * r < MAX_COUNT是否超了 读锁的获取临界,因为state划分了高位和低位所以不能超过最大值 1<<16-1=65536 所以同时支持65536 获取这么多度锁。。理论上够用了 * compareAndSetState(c, c + SHARED_UNIT) */ if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { //没有持有任何锁的情况 if (r == 0) { //记录第一个获读锁的线程 firstReader = current; //第一个获取获取读锁线程+1 firstReaderHoldCount = 1; } else if (firstReader == current) { //如果当前线程是第一获取读锁的线程 计数器加1 firstReaderHoldCount++; } else { //获取最近一个获取成功获取读锁的计数器 这省却了ThreadLocal查找, ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter; //判断是否是当前线程的 如果不是则从ThreadLocal查找 并设置到最近一个获取 if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); //计数器+1 rh.count++; } return 1; } //<4>并发获取锁的情况 尝试获取读锁失败,自旋重试。 return fullTryAcquireShared(current); }
<4>fullTryAcquireShared
final int fullTryAcquireShared(Thread current) { ReentrantReadWriteLock.Sync.HoldCounter rh = null; //自旋 for (;;) { //获取AQS 锁状态 int c = getState(); /** * exclusiveCount判断是否已经持有了写锁 * getExclusiveOwnerThread 获得持有写锁的线程,如果不是当前线程持有直接返回-1 获取读锁失败 这里说明 同一个线程加了写锁是可以继续获取读锁的 */ if (exclusiveCount(c) != 0) { //如果写锁以及被获取 同时不是当前线程直接返回-1 不自旋 if (getExclusiveOwnerThread() != current) return -1; /** * * readerShouldBlock 公平锁的实现 NonfairSync FairSync都重新了此方法 * * FairSync(公平锁):直接如果队列不为空就返回false * * NonfairSync(非公平锁)遍历CLH队列如果已经在队列中表示轮到他获取锁了返回true否则返回false 需要重新加入CLH队列 */ } else if (readerShouldBlock()) { // 第一个获取读锁的是当前线程不做处理 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { //更新锁计数(可重入的体现) if (rh == null) { //获取最近一个获取读锁线程的Counter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { //如果不是最近的从ThreadLocal获取 rh = readHolds.get(); if (rh.count == 0) //如果当前线程的持有读锁数为0,那么就没必要使用计数器,直接移除 readHolds.remove(); } } if (rh.count == 0) //结束自旋 return -1; } } //超过最大可获取数量 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //自旋获取共享锁 获取成功Countter++ if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
读锁释放锁
1.每次释放通过HoldCounter 的count -1直到减为0则返回AQS父类释放成功,将AQS的state -1,并将计数器从ThreadLcal清除
2.AQS则会唤醒CLH阻塞队列重新尝试重新获取锁
main
public static void main(String[] args) { //<L>初始化 ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); //获得读锁 内部return readLock Lock rlock = readWriteLock.readLock(); rlock.lock(); //<1> rlock.unlock(); }
<1>unlock
public void unlock() { //<2>调用Sync继承AQC方法 sync.release(1); }
<2>releaseShared
可以参考《AQS源码》
// 释放共享锁 public final boolean releaseShared(int arg) { //<3>模板模式 抽象方法尝试释放共享锁 if (tryReleaseShared(arg)) { // 唤醒等待共享锁的线程 doReleaseShared(); return true; } return false; }
<3>tryReleaseShared
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //是不第一个获取锁 if (firstReader == current) { //可重入的特点 直接置空 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--;//计数器-1 } else { //判断是不是最近获取读锁的线程 ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get();//如果不是 从ThreadLocal获取 int count = rh.count; //可重入特点 判断数量 觉得 if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } //自旋释放释放锁 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
写锁加锁
main
//<L>初始化 ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); //获得读锁 内部return readLock Lock writeLock = readWriteLock.writeLock(); //<1>加锁 writeLock.lock();
1.如果AQS state不等于0,表示有可能获取的写锁或者读锁,根据位运算符&获取写锁数量,如果数量=0那么表示是读锁。直接获取失败(加入CLH队列阻塞)
2.如果获取写锁数量大于0 根据getExclusiveOwnerThread() 判断是否不是当前线程持有,如果是的话直接获取成功(可重入) state+1并通过CAS修改
<1>lock
public void lock() { //<2>调用继承自AQS加锁方法 sync.acquire(1); }
<2>acquire
可以参考《AQS源码阅读》
public final void acquire(int arg) { /** *<3>tryAcquire 模板模式 是抽象方法由子类实现获取锁步骤 *addWaiter 如果加锁失败 创建节点并放入队列尾 *<acquireQueued 通过新创建的节点 判断是重试获取锁。还是阻塞线程 */ if (!tryAcquire(arg) && acquireQueued(addWaiter(node.EXCLUSIVE), arg)) selfInterrupt(); }
<3>tryAcquire
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //获得AQS锁状态 int c = getState(); //获得写锁数量 int w = exclusiveCount(c); //>0表示写锁或者读锁被获取 if (c != 0) { //==0表示读锁被持有 直接返回false 加入CLH队列 如果是写锁判断是是否是当前线程持有(可重入) if (w == 0 || current != getExclusiveOwnerThread()) return false; //判断是否有超过可获取写锁数量 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); //则state++ 可重入 setState(c + acquires); return true; } /** * writerShouldBlock 公平锁的实现 NonfairSync FairSync都重新了此方法 * FairSync(公平锁): 不在队列返回true 在队列返回false * NonfairSync(非公平锁)直接返回false 尝试直接获取锁 * r < MAX_COUNT是否超了 读锁的获取临界,因为state划分了高位和低位所以不能超过最大值 1<<16-1=65536 所以同时支持65536 获取这么多度锁。。理论上够用了 * compareAndSetState(c, c + SHARED_UNIT) */ if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; //设置当前持有锁线程 setExclusiveOwnerThread(current); return true; }
写锁释放锁
main
public static void main(String[] args) { //<L>初始化 ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); //获得读锁 内部return readLock Lock writeLock = readWriteLock.writeLock(); //<1>释放锁 writeLock.unlock(); }
1.getExclusiveOwnerThread() == Thread.currentThread() 判断是否是当前线程持有锁 如不过不是 直接抛错
2.AQS sate-- 更新到AQS state 如果大于0返回释放失败 如果==0 清空exclusiveOwnerThread 返回释放成功 后续交给AQS处理 唤醒CLH等待线程重新获取锁
<1>unlock
public void unlock() { //<2>调用的继承AQS的共享锁释放方法 sync.release(1); }
<2>relase
可以参考 《AQS源码阅读》
// 在独占锁模式下,释放锁的操作 public final boolean release(int arg) { // <3>调用tryRelease子类方法,尝试去释放锁,由子类具体实现 if (tryRelease(arg)) { Node h = head; // <如果队列头节点的状态不是0,那么队列中就可能存在需要唤醒的等待节点。 // 还记得我们在acquireQueued(final Node node, int arg)获取锁的方法中,如果节点node没有获取到锁, // 那么我们会将节点node的前一个节点状态设置为Node.SIGNAL,然后调用parkAndCheckInterrupt方法 // 将节点node所在线程阻塞。 // 在这里就是通过unparkSuccessor方法,进而调用LockSupport.unpark(s.thread)方法,唤醒被阻塞的线程 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
<3>tryRelease
protected final boolean tryRelease(int releases) { //判断是不是当前线程持有getExclusiveOwnerThread() == Thread.currentThread(); 如果不是则抛异常 避免别的线程调用此方法释放了锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //获取状态-- int nextc = getState() - releases; //判断是否释放完了(可重入) 如果调用2次lock 则需要调用2次unlock才会真正释放锁 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null);//置空持有线程 //设置新的锁状态 这里不用CAS是 读锁 只会有有一个线程持有 setState(nextc); return free; }