1.读写锁原理
2.利用读写锁写一个安全的HashMap
读写锁原理
ReadWriteLock:维护一对关联锁,一个读锁一个写锁,读锁可以由多个线程同时获得,写锁只能被一个线程获得。同一时间,读锁和写锁不能被不同线程同时获得。
1.th1想获取写锁,检查readCount==0,满足,说明读锁未被占用,此时可抢写锁;检查writeCount==0说明写锁未被占用,采用CAS修改writeCount为1,若修改成功再将owner改为th1的引用
2.某线程想获取读锁,先检查writeCount是否为0,此时不为0,则不能去获取读锁,直接进入waiters,接下来的两个获取读锁的操作类似,都进了waiters
3.th2想获取写锁,检查readCount==0,满足,说明读锁未被占用,此时可抢写锁;检查writeCount==1说明写锁以被占用,进而查看owner是不是自己,结果发现不是自己,进waiters
4.th1又想获取写锁,检查readCount==0,满足,说明读锁未被占用,此时可抢写锁;检查writeCount==1说明写锁以被占用,进而查看owner是不是自己,结果发现是自己,修改writeCount为2
5.此时假如th1开始释放握有的2个写锁,释放时先判断owner是不是自己,是,将writeCount变为1,继续释放第二个锁writeCount变为0,owner变为null,这是waiters中的第一个线程会被唤醒,唤醒后开始抢读锁,首先判断writeCount==0,满足,然后将读锁给它,readCount加1,然后它会继续判断waiters里的头部是不是还是获取读锁的线程,若是则继续出队列获取读锁,直到队列头部不是获取读锁的线程
6.假如这3个获取到读锁的线程开始逐一释放读锁,readCount一旦等于0,则会唤醒队列头部想获取写锁的线程去尝试获取写锁
锁降级
ReadWriteLock中的读锁和写锁,如果某线程已经拿到写锁,在释放写锁之前它可以再次拿到读锁,等写锁释放后,该线程将继续占有读锁。
通俗一点就是说:既然你已经拿到写锁了,而且当前只有你自己在写,也没有其他线程在读,那你自己读也是可以的;但是如果你拿到读锁了,你想去写,对不起不可以,因为拿到读锁的不止你一个,其他人也在读,所以你不能写。
HashMap、HashTable、ConcurrentHashMap
HashMap线程不安全,HashTable利用synchronized保证线程安全,但效率太低,不能多并发,ConcurrentHashMap使用读写锁保证读的高并发和写的单并发,同一时间可以有多条线程读,提高了读效率
手写ConcurrentHashMap
package com.study.lock; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; // 将hashmap 改造一个并发安全的 // 这是ReentrantReadWriteLock注释中给出的一个示例 public class Demo7_Map { private final Map<String, Object> m = new HashMap<>(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock r = rwl.readLock(); private final Lock w = rwl.writeLock(); public Object get(String key){ r.lock(); try { return m.get(key); }finally { r.unlock(); } } public Object allKeys(){ r.lock(); try { return m.keySet().toArray(); }finally { r.unlock(); } } public Object put(String key, Object value){ w.lock(); try { return m.put(key, value); }finally { w.unlock(); } } public void clear(){ w.lock(); try { m.clear(); }finally { w.unlock(); } } }
利用 读写锁+数据库+REDIS 解决高并发场景下数据读写安全问题
import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; /* // 缓存示例 这是ReentrantReadWriteLock注释中给出的一个示例 用于构建一个缓存,该缓存在读取并使用值的时候,不允许修改缓存值 目前还没找到适用场景,有同学有适用场景的,可以推荐给老师 */ public class Demo8_CacheData { public static void main(String args[]) { System.out.println(TeacherInfoCache.get("Kody")); } } class TeacherInfoCache { static volatile boolean cacheValid; static final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Object get(String dataKey) { Object data = null; // 读数据,加读锁 rwl.readLock().lock(); try { if (cacheValid) { data = Redis.data.get(dataKey); } else { // 缓存取不到,从数据库获取,但如果多条线程瞬间多次查询数据库,数据库可能宕机,利用锁解决 // data= DataBase.queryUserInfo(); rwl.readLock().unlock(); // 加写锁之后,并不会马上获取到所,会等到所有的读锁释放 rwl.writeLock().lock(); try { if (!cacheValid) { data = DataBase.queryUserInfo(); Redis.data.put(dataKey, data); cacheValid = true; } // 此处加读锁是为了与最后的finally中释放读锁组成一对 // 在释放写锁之前获取读锁,等写锁释放后,该线程仍占有读锁,不用再去抢读锁 rwl.readLock().lock();//此处发生锁降级,在释放写锁之前再次拿到读锁 } finally { rwl.writeLock().unlock(); } } return data; } finally { rwl.readLock().unlock(); } } } class DataBase { static String queryUserInfo() { System.out.println("查询数据库。。。"); return "name:Kody,age:40,gender:true,"; } } class Redis { static Map<String, Object> data = new HashMap<>(); }
手写ReadWriteLock
package com.study.lock.locks1; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; public class JamesReadWriteLock { private AtomicInteger readCount = new AtomicInteger(0); private AtomicInteger writeCount = new AtomicInteger(0); // 独占锁 拥有者 private AtomicReference<Thread> owner = new AtomicReference<>(); // 等待队列 public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<WaitNode>(); class WaitNode { int type = 0; // 0 为想获取独占锁的线程, 1为想获取共享锁的线程 Thread thread = null; int arg = 0; public WaitNode(Thread thread, int type, int arg) { this.thread = thread; this.type = type; this.arg = arg; } } // 获取独占锁 public void lock() { int arg = 1; // 尝试获取独占锁,若成功,退出方法, 若失败... if (!tryLock(arg)) { // 标记为独占锁 WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg); waiters.offer(waitNode); // 进入等待队列 // 循环尝试拿锁 for (;;) { // 若队列头部是当前线程 WaitNode head = waiters.peek(); if (head != null && head.thread == Thread.currentThread()) { if (!tryLock(arg)) { // 再次尝试获取 独占锁 LockSupport.park(); // 若失败,挂起线程 } else { // 若成功获取 waiters.poll(); // 将当前线程从队列头部移除 return; // 并退出方法 } } else { // 若不是队列头部元素 LockSupport.park(); // 将当前线程挂起 } } } } // 释放独占锁 public boolean unlock() { int arg = 1; // 尝试释放独占锁 若失败返回true,若失败... if (tryUnlock(arg)) { WaitNode next = waiters.peek(); // 取出队列头部的元素 if (next != null) { Thread th = next.thread; LockSupport.unpark(th); // 唤醒队列头部的线程 } return true; // 返回true } return false; } // 尝试获取独占锁 public boolean tryLock(int acquires) { // 如果read count !=0 返回false if (readCount.get() != 0) return false; int wct = writeCount.get(); // 拿到 独占锁 当前状态 if (wct == 0) { if (writeCount.compareAndSet(wct, wct + acquires)) { // 通过修改state来抢锁 owner.set(Thread.currentThread()); // 抢到锁后,直接修改owner为当前线程 return true; } } else if (owner.get() == Thread.currentThread()) { writeCount.set(wct + acquires); // 修改count值 return true; } return false; } // 尝试释放独占锁 public boolean tryUnlock(int releases) { // 若当前线程没有 持有独占锁 if (owner.get() != Thread.currentThread()) { throw new IllegalMonitorStateException(); // 抛IllegalMonitorStateException } int wc = writeCount.get(); int nextc = wc - releases; // 计算 独占锁剩余占用 writeCount.set(nextc); // 不管是否完全释放,都更新count值 if (nextc == 0) { // 是否完全释放 owner.compareAndSet(Thread.currentThread(), null); return true; } else { return false; } } // 获取共享锁 public void lockShared() { int arg = 1; if (tryLockShared(arg) < 0) { // 如果tryAcquireShare失败 // 将当前进程放入队列 WaitNode node = new WaitNode(Thread.currentThread(), 1, arg); waiters.offer(node); // 加入队列 for (;;) { // 若队列头部的元素是当前线程 WaitNode head = waiters.peek(); if (head != null && head.thread == Thread.currentThread()) { if (tryLockShared(arg) >= 0) { // 尝试获取共享锁, 若成功 waiters.poll(); // 将当前线程从队列中移除 WaitNode next = waiters.peek(); if (next != null && next.type == 1) { // 如果下一个线程也是等待共享锁 LockSupport.unpark(next.thread); // 将其唤醒 } return; // 退出方法 } else { // 若尝试失败 LockSupport.park(); // 挂起线程 } } else { // 若不是头部元素 LockSupport.park(); } } } } // 解锁共享锁 public boolean unLockShared() { int arg = 1; if (tryUnLockShared(arg)) { // 当read count变为0,才叫release share成功 WaitNode next = waiters.peek(); if (next != null) { LockSupport.unpark(next.thread); } return true; } return false; } // 尝试获取共享锁 public int tryLockShared(int acquires) { for (;;) { if (writeCount.get() != 0 && owner.get() != Thread.currentThread()) return -1; int rct = readCount.get(); if (readCount.compareAndSet(rct, rct + acquires)) { return 1; } } } // 尝试解锁共享锁 public boolean tryUnLockShared(int releases) { for (;;) { int rc = readCount.get(); int nextc = rc - releases; if (readCount.compareAndSet(rc, nextc)) { return nextc == 0; } } } }
模板方法模式:提取 ReentrantLock 和 ReadWriteLock公共部分=>AQS
用上面手写的ReadWriteLock替换上一节手写的ReentrantLock中的相同方法后,ReentrantLock可以继续使用,
因此可将上面ReadWriteLock的代码作为一个公共类JamsAQS来使用,
为了实现公平锁(在等待队列头部才进行抢锁,而不是上面写的先抢锁->如果没抢到->放到等待队列,这段逻辑都是在try方法中写的),JamsAQS中的tryxx方法均不实现,放到锁的匿名内部类中实现:
匿名内部类重写方法:https://blog.csdn.net/shenhaiyushitiaoyu/article/details/84142618
package com.study.lock.locks5; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; public class JamesAQS { AtomicInteger readCount = new AtomicInteger(0); AtomicInteger writeCount = new AtomicInteger(0); // 独占锁 拥有者 AtomicReference<Thread> owner = new AtomicReference<>(); // 等待队列 public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<WaitNode>(); class WaitNode { int type = 0; // 0 为想获取独占锁的线程, 1为想获取共享锁的线程 Thread thread = null; int arg = 0; public WaitNode(Thread thread, int type, int arg) { this.thread = thread; this.type = type; this.arg = arg; } } // 获取独占锁 public void lock() { int arg = 1; // 尝试获取独占锁,若成功,退出方法, 若失败... if (!tryLock(arg)) { // 标记为独占锁 WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg); waiters.offer(waitNode); // 进入等待队列 // 循环尝试拿锁 for (;;) { // 若队列头部是当前线程 WaitNode head = waiters.peek(); if (head != null && head.thread == Thread.currentThread()) { if (!tryLock(arg)) { // 再次尝试获取 独占锁 LockSupport.park(); // 若失败,挂起线程 } else { // 若成功获取 waiters.poll(); // 将当前线程从队列头部移除 return; // 并退出方法 } } else { // 若不是队列头部元素 LockSupport.park(); // 将当前线程挂起 } } } } // 释放独占锁 public boolean unlock() { int arg = 1; // 尝试释放独占锁 若失败返回true,若失败... if (tryUnlock(arg)) { WaitNode next = waiters.peek(); // 取出队列头部的元素 if (next != null) { Thread th = next.thread; LockSupport.unpark(th); // 唤醒队列头部的线程 } return true; // 返回true } return false; } // 获取共享锁 public void lockShared() { int arg = 1; if (tryLockShared(arg) < 0) { // 如果tryAcquireShare失败 // 将当前进程放入队列 WaitNode node = new WaitNode(Thread.currentThread(), 1, arg); waiters.offer(node); // 加入队列 for (;;) { // 若队列头部的元素是当前线程 WaitNode head = waiters.peek(); if (head != null && head.thread == Thread.currentThread()) { if (tryLockShared(arg) >= 0) { // 尝试获取共享锁, 若成功 waiters.poll(); // 将当前线程从队列中移除 WaitNode next = waiters.peek(); if (next != null && next.type == 1) { // 如果下一个线程也是等待共享锁 LockSupport.unpark(next.thread); // 将其唤醒 } return; // 退出方法 } else { // 若尝试失败 LockSupport.park(); // 挂起线程 } } else { // 若不是头部元素 LockSupport.park(); } } } } // 解锁共享锁 public boolean unLockShared() { int arg = 1; if (tryUnLockShared(arg)) { // 当read count变为0,才叫release share成功 WaitNode next = waiters.peek(); if (next != null) { LockSupport.unpark(next.thread); } return true; } return false; } // 尝试获取独占锁 public boolean tryLock(int acquires) { throw new UnsupportedOperationException(); } // 尝试释放独占锁 public boolean tryUnlock(int releases) { throw new UnsupportedOperationException(); } // 尝试获取共享锁 public int tryLockShared(int acquires) { throw new UnsupportedOperationException(); } // 尝试解锁共享锁 public boolean tryUnLockShared(int releases) { throw new UnsupportedOperationException(); } }
package com.study.lock.locks5; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; public class JamesReadWriteLock implements ReadWriteLock { JamesAQS mask = new JamesAQS(){ //尝试获取独占锁 public boolean tryLock(int acquires) { //如果read count !=0 返回false if (readCount.get() !=0) return false; int wct = writeCount.get(); //拿到 独占锁 当前状态 if (wct==0){ if (writeCount.compareAndSet(wct, wct + acquires)){ //通过修改state来抢锁 owner.set(Thread.currentThread()); // 抢到锁后,直接修改owner为当前线程 return true; } }else if (owner.get() == Thread.currentThread()){ writeCount.set(wct + acquires); //修改count值 return true; } return false; } //尝试释放独占锁 public boolean tryUnlock(int releases) { //若当前线程没有 持有独占锁 if(owner.get()!= Thread.currentThread()){ throw new IllegalMonitorStateException(); //抛IllegalMonitorStateException } int wc= writeCount.get(); int nextc = wc - releases; //计算 独占锁剩余占用 writeCount.set(nextc); //不管是否完全释放,都更新count值 if (nextc==0){ //是否完全释放 owner.compareAndSet(Thread.currentThread(), null); return true; }else{ return false; } } //尝试获取共享锁 public int tryLockShared(int acquires) { for (;;){ if (writeCount.get()!=0 && owner.get() != Thread.currentThread()) return -1; int rct = readCount.get(); if (readCount.compareAndSet(rct, rct + acquires)){ return 1; } } } //尝试解锁共享锁 public boolean tryUnLockShared(int releases) { for(;;){ int rc = readCount.get(); int nextc = rc - releases; if (readCount.compareAndSet(rc, nextc)){ return nextc==0; } } } }; @Override public Lock readLock() { return new Lock() { @Override public void lock() { mask.lockShared(); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return mask.tryLockShared(1) == 1; } @Override public void unlock() { mask.unLockShared(); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }; } @Override public Lock writeLock() { return new Lock() { @Override public void lock() { mask.lock(); } @Override public boolean tryLock() { return mask.tryLock(1); } @Override public void unlock() { mask.unlock(); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }; } }
package com.study.lock.locks5; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class JamesReentrantLock implements Lock { private boolean isFair; public JamesReentrantLock(boolean isFair){ this.isFair = isFair; } JamesAQS mask = new JamesAQS(){ public boolean tryLock(int acquires){ if (isFair){ return tryFairLock(acquires); }else{ return tryNonFairLock(acquires); } } //尝试获取独占锁 public boolean tryNonFairLock(int acquires) { //如果read count !=0 返回false if (readCount.get() !=0) return false; int wct = writeCount.get(); //拿到 独占锁 当前状态 if (wct==0){ if (writeCount.compareAndSet(wct, wct + acquires)){ //通过修改state来抢锁 owner.set(Thread.currentThread()); // 抢到锁后,直接修改owner为当前线程 return true; } }else if (owner.get() == Thread.currentThread()){ writeCount.set(wct + acquires); //修改count值 return true; } return false; } public boolean tryFairLock(int acquires){ //如果read count !=0 返回false if (readCount.get() !=0) return false; int wct = writeCount.get(); //拿到 独占锁 当前状态 if (wct==0){ JamesAQS.WaitNode head = waiters.peek(); if (head!=null && head.thread == Thread.currentThread()&& writeCount.compareAndSet(wct, wct + acquires)){ //通过修改state来抢锁 owner.set(Thread.currentThread()); // 抢到锁后,直接修改owner为当前线程 return true; } }else if (owner.get() == Thread.currentThread()){ writeCount.set(wct + acquires); //修改count值 return true; } return false; } //尝试释放独占锁 public boolean tryUnlock(int releases) { //若当前线程没有 持有独占锁 if(owner.get()!= Thread.currentThread()){ throw new IllegalMonitorStateException(); //抛IllegalMonitorStateException } int wc= writeCount.get(); int nextc = wc - releases; //计算 独占锁剩余占用 writeCount.set(nextc); //不管是否完全释放,都更新count值 if (nextc==0){ //是否完全释放 owner.compareAndSet(Thread.currentThread(), null); return true; }else{ return false; } } }; public void lock(){ mask.lock(); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } @Override public boolean tryLock(){ return mask.tryLock(1); } @Override public void unlock(){ mask.unlock(); } @Override public void lockInterruptibly() throws InterruptedException { } }