zoukankan      html  css  js  c++  java
  • 1.3.2 AQS 读写锁

    1.读写锁原理

    2.利用读写锁写一个安全的HashMap

    读写锁原理

    ReadWriteLock:维护一对关联锁,一个读锁一个写锁,读锁可以由多个线程同时获得,写锁只能被一个线程获得。同一时间,读锁和写锁不能被不同线程同时获得。

    1.th1想获取写锁,检查readCount==0,满足,说明读锁未被占用,此时可抢写锁;检查writeCount==0说明写锁未被占用,采用CAS修改writeCount为1,若修改成功再将owner改为th1的引用

    2.某线程想获取读锁,先检查writeCount是否为0,此时不为0,则不能去获取读锁,直接进入waiters,接下来的两个获取读锁的操作类似,都进了waiters

    3.th2想获取写锁,检查readCount==0,满足,说明读锁未被占用,此时可抢写锁;检查writeCount==1说明写锁以被占用,进而查看owner是不是自己,结果发现不是自己,进waiters

    4.th1又想获取写锁,检查readCount==0,满足,说明读锁未被占用,此时可抢写锁;检查writeCount==1说明写锁以被占用,进而查看owner是不是自己,结果发现是自己,修改writeCount为2

    5.此时假如th1开始释放握有的2个写锁,释放时先判断owner是不是自己,是,将writeCount变为1,继续释放第二个锁writeCount变为0,owner变为null,这是waiters中的第一个线程会被唤醒,唤醒后开始抢读锁,首先判断writeCount==0,满足,然后将读锁给它,readCount加1,然后它会继续判断waiters里的头部是不是还是获取读锁的线程,若是则继续出队列获取读锁,直到队列头部不是获取读锁的线程

    6.假如这3个获取到读锁的线程开始逐一释放读锁,readCount一旦等于0,则会唤醒队列头部想获取写锁的线程去尝试获取写锁

    锁降级

    ReadWriteLock中的读锁和写锁,如果某线程已经拿到写锁,在释放写锁之前它可以再次拿到读锁,等写锁释放后,该线程将继续占有读锁。

    通俗一点就是说:既然你已经拿到写锁了,而且当前只有你自己在写,也没有其他线程在读,那你自己读也是可以的;但是如果你拿到读锁了,你想去写,对不起不可以,因为拿到读锁的不止你一个,其他人也在读,所以你不能写。

    HashMap、HashTable、ConcurrentHashMap

    HashMap线程不安全,HashTable利用synchronized保证线程安全,但效率太低,不能多并发,ConcurrentHashMap使用读写锁保证读的高并发和写的单并发,同一时间可以有多条线程读,提高了读效率

    手写ConcurrentHashMap

    package com.study.lock;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    // 将hashmap 改造一个并发安全的
    // 这是ReentrantReadWriteLock注释中给出的一个示例
    public class Demo7_Map {
        private final Map<String, Object> m = new HashMap<>();
        private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        private final Lock r = rwl.readLock();
        private final Lock w = rwl.writeLock();
    
        public Object get(String key){
            r.lock();
            try {
                return m.get(key);
            }finally {
                r.unlock();
            }
        }
    
        public Object allKeys(){
            r.lock();
            try {
                return m.keySet().toArray();
            }finally {
                r.unlock();
            }
        }
    
        public Object put(String key, Object value){
            w.lock();
            try {
                return m.put(key, value);
            }finally {
                w.unlock();
            }
        }
    
        public void clear(){
            w.lock();
            try {
                m.clear();
            }finally {
                w.unlock();
            }
        }
    
    }
    

    利用 读写锁+数据库+REDIS 解决高并发场景下数据读写安全问题

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    /*
    // 缓存示例
       这是ReentrantReadWriteLock注释中给出的一个示例
       用于构建一个缓存,该缓存在读取并使用值的时候,不允许修改缓存值
       目前还没找到适用场景,有同学有适用场景的,可以推荐给老师
     */
    
    public class Demo8_CacheData {
    	public static void main(String args[]) {
    		System.out.println(TeacherInfoCache.get("Kody"));
    	}
    }
    
    class TeacherInfoCache {
    	static volatile boolean cacheValid;
    	static final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    
    	static Object get(String dataKey) {
    		Object data = null;
    
    		// 读数据,加读锁
    		rwl.readLock().lock();
    		try {
    			if (cacheValid) {
    				data = Redis.data.get(dataKey);
    			} else {
    				// 缓存取不到,从数据库获取,但如果多条线程瞬间多次查询数据库,数据库可能宕机,利用锁解决
    				// data= DataBase.queryUserInfo();
    				rwl.readLock().unlock();
    
    				// 加写锁之后,并不会马上获取到所,会等到所有的读锁释放
    				rwl.writeLock().lock();
    				try {
    					if (!cacheValid) {
    						data = DataBase.queryUserInfo();
    						Redis.data.put(dataKey, data);
    						cacheValid = true;
    					}
    					// 此处加读锁是为了与最后的finally中释放读锁组成一对
    					// 在释放写锁之前获取读锁,等写锁释放后,该线程仍占有读锁,不用再去抢读锁
    					rwl.readLock().lock();//此处发生锁降级,在释放写锁之前再次拿到读锁
    
    				} finally {
    					rwl.writeLock().unlock();
    				}
    			}
    			return data;
    		} finally {
    			rwl.readLock().unlock();
    		}
    	}
    }
    
    class DataBase {
    	static String queryUserInfo() {
    		System.out.println("查询数据库。。。");
    		return "name:Kody,age:40,gender:true,";
    	}
    }
    
    class Redis {
    	static Map<String, Object> data = new HashMap<>();
    }
    

     手写ReadWriteLock

    package com.study.lock.locks1;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;
    
    public class JamesReadWriteLock {
    	private AtomicInteger readCount = new AtomicInteger(0);
    	private AtomicInteger writeCount = new AtomicInteger(0);
    
    	// 独占锁 拥有者
    	private AtomicReference<Thread> owner = new AtomicReference<>();
    
    	// 等待队列
    	public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<WaitNode>();
    
    	class WaitNode {
    		int type = 0; // 0 为想获取独占锁的线程, 1为想获取共享锁的线程
    		Thread thread = null;
    		int arg = 0;
    
    		public WaitNode(Thread thread, int type, int arg) {
    			this.thread = thread;
    			this.type = type;
    			this.arg = arg;
    		}
    	}
    
    	// 获取独占锁
    	public void lock() {
    		int arg = 1;
    		// 尝试获取独占锁,若成功,退出方法, 若失败...
    		if (!tryLock(arg)) {
    			// 标记为独占锁
    			WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg);
    			waiters.offer(waitNode); // 进入等待队列
    
    			// 循环尝试拿锁
    			for (;;) {
    				// 若队列头部是当前线程
    				WaitNode head = waiters.peek();
    				if (head != null && head.thread == Thread.currentThread()) {
    					if (!tryLock(arg)) { // 再次尝试获取 独占锁
    						LockSupport.park(); // 若失败,挂起线程
    					} else { // 若成功获取
    						waiters.poll(); // 将当前线程从队列头部移除
    						return; // 并退出方法
    					}
    				} else { // 若不是队列头部元素
    					LockSupport.park(); // 将当前线程挂起
    				}
    			}
    		}
    	}
    
    	// 释放独占锁
    	public boolean unlock() {
    		int arg = 1;
    
    		// 尝试释放独占锁 若失败返回true,若失败...
    		if (tryUnlock(arg)) {
    			WaitNode next = waiters.peek(); // 取出队列头部的元素
    			if (next != null) {
    				Thread th = next.thread;
    				LockSupport.unpark(th); // 唤醒队列头部的线程
    			}
    			return true; // 返回true
    		}
    		return false;
    	}
    
    	// 尝试获取独占锁
    	public boolean tryLock(int acquires) {
    		// 如果read count !=0 返回false
    		if (readCount.get() != 0)
    			return false;
    
    		int wct = writeCount.get(); // 拿到 独占锁 当前状态
    
    		if (wct == 0) {
    			if (writeCount.compareAndSet(wct, wct + acquires)) { // 通过修改state来抢锁
    				owner.set(Thread.currentThread()); // 抢到锁后,直接修改owner为当前线程
    				return true;
    			}
    		} else if (owner.get() == Thread.currentThread()) {
    			writeCount.set(wct + acquires); // 修改count值
    			return true;
    		}
    
    		return false;
    	}
    
    	// 尝试释放独占锁
    	public boolean tryUnlock(int releases) {
    		// 若当前线程没有 持有独占锁
    		if (owner.get() != Thread.currentThread()) {
    			throw new IllegalMonitorStateException(); // 抛IllegalMonitorStateException
    		}
    
    		int wc = writeCount.get();
    		int nextc = wc - releases; // 计算 独占锁剩余占用
    		writeCount.set(nextc); // 不管是否完全释放,都更新count值
    
    		if (nextc == 0) { // 是否完全释放
    			owner.compareAndSet(Thread.currentThread(), null);
    			return true;
    		} else {
    			return false;
    		}
    	}
    
    	// 获取共享锁
    	public void lockShared() {
    		int arg = 1;
    
    		if (tryLockShared(arg) < 0) { // 如果tryAcquireShare失败
    			// 将当前进程放入队列
    			WaitNode node = new WaitNode(Thread.currentThread(), 1, arg);
    			waiters.offer(node); // 加入队列
    
    			for (;;) {
    				// 若队列头部的元素是当前线程
    				WaitNode head = waiters.peek();
    				if (head != null && head.thread == Thread.currentThread()) {
    					if (tryLockShared(arg) >= 0) { // 尝试获取共享锁, 若成功
    						waiters.poll(); // 将当前线程从队列中移除
    
    						WaitNode next = waiters.peek();
    						if (next != null && next.type == 1) { // 如果下一个线程也是等待共享锁
    							LockSupport.unpark(next.thread); // 将其唤醒
    						}
    						return; // 退出方法
    					} else { // 若尝试失败
    						LockSupport.park(); // 挂起线程
    					}
    				} else { // 若不是头部元素
    					LockSupport.park();
    				}
    			}
    		}
    	}
    
    	// 解锁共享锁
    	public boolean unLockShared() {
    		int arg = 1;
    
    		if (tryUnLockShared(arg)) { // 当read count变为0,才叫release share成功
    			WaitNode next = waiters.peek();
    			if (next != null) {
    				LockSupport.unpark(next.thread);
    			}
    			return true;
    		}
    		return false;
    	}
    
    	// 尝试获取共享锁
    	public int tryLockShared(int acquires) {
    		for (;;) {
    			if (writeCount.get() != 0 && owner.get() != Thread.currentThread())
    				return -1;
    
    			int rct = readCount.get();
    			if (readCount.compareAndSet(rct, rct + acquires)) {
    				return 1;
    			}
    		}
    	}
    
    	// 尝试解锁共享锁
    	public boolean tryUnLockShared(int releases) {
    		for (;;) {
    			int rc = readCount.get();
    			int nextc = rc - releases;
    			if (readCount.compareAndSet(rc, nextc)) {
    				return nextc == 0;
    			}
    		}
    	}
    }
    

     模板方法模式:提取 ReentrantLock 和 ReadWriteLock公共部分=>AQS

    用上面手写的ReadWriteLock替换上一节手写的ReentrantLock中的相同方法后,ReentrantLock可以继续使用,

    因此可将上面ReadWriteLock的代码作为一个公共类JamsAQS来使用,

    为了实现公平锁(在等待队列头部才进行抢锁,而不是上面写的先抢锁->如果没抢到->放到等待队列,这段逻辑都是在try方法中写的),JamsAQS中的tryxx方法均不实现,放到锁的匿名内部类中实现:

    匿名内部类重写方法:https://blog.csdn.net/shenhaiyushitiaoyu/article/details/84142618

    package com.study.lock.locks5;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;
    
    public class JamesAQS {
    	AtomicInteger readCount = new AtomicInteger(0);
    	AtomicInteger writeCount = new AtomicInteger(0);
    
    	// 独占锁 拥有者
    	AtomicReference<Thread> owner = new AtomicReference<>();
    
    	// 等待队列
    	public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<WaitNode>();
    
    	class WaitNode {
    		int type = 0; // 0 为想获取独占锁的线程, 1为想获取共享锁的线程
    		Thread thread = null;
    		int arg = 0;
    
    		public WaitNode(Thread thread, int type, int arg) {
    			this.thread = thread;
    			this.type = type;
    			this.arg = arg;
    		}
    	}
    
    	// 获取独占锁
    	public void lock() {
    		int arg = 1;
    		// 尝试获取独占锁,若成功,退出方法, 若失败...
    		if (!tryLock(arg)) {
    			// 标记为独占锁
    			WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg);
    			waiters.offer(waitNode); // 进入等待队列
    
    			// 循环尝试拿锁
    			for (;;) {
    				// 若队列头部是当前线程
    				WaitNode head = waiters.peek();
    				if (head != null && head.thread == Thread.currentThread()) {
    					if (!tryLock(arg)) { // 再次尝试获取 独占锁
    						LockSupport.park(); // 若失败,挂起线程
    					} else { // 若成功获取
    						waiters.poll(); // 将当前线程从队列头部移除
    						return; // 并退出方法
    					}
    				} else { // 若不是队列头部元素
    					LockSupport.park(); // 将当前线程挂起
    				}
    			}
    		}
    	}
    
    	// 释放独占锁
    	public boolean unlock() {
    		int arg = 1;
    
    		// 尝试释放独占锁 若失败返回true,若失败...
    		if (tryUnlock(arg)) {
    			WaitNode next = waiters.peek(); // 取出队列头部的元素
    			if (next != null) {
    				Thread th = next.thread;
    				LockSupport.unpark(th); // 唤醒队列头部的线程
    			}
    			return true; // 返回true
    		}
    		return false;
    	}
    
    	// 获取共享锁
    	public void lockShared() {
    		int arg = 1;
    
    		if (tryLockShared(arg) < 0) { // 如果tryAcquireShare失败
    			// 将当前进程放入队列
    			WaitNode node = new WaitNode(Thread.currentThread(), 1, arg);
    			waiters.offer(node); // 加入队列
    
    			for (;;) {
    				// 若队列头部的元素是当前线程
    				WaitNode head = waiters.peek();
    				if (head != null && head.thread == Thread.currentThread()) {
    					if (tryLockShared(arg) >= 0) { // 尝试获取共享锁, 若成功
    						waiters.poll(); // 将当前线程从队列中移除
    
    						WaitNode next = waiters.peek();
    						if (next != null && next.type == 1) { // 如果下一个线程也是等待共享锁
    							LockSupport.unpark(next.thread); // 将其唤醒
    						}
    						return; // 退出方法
    					} else { // 若尝试失败
    						LockSupport.park(); // 挂起线程
    					}
    				} else { // 若不是头部元素
    					LockSupport.park();
    				}
    
    			}
    		}
    	}
    
    	// 解锁共享锁
    	public boolean unLockShared() {
    		int arg = 1;
    
    		if (tryUnLockShared(arg)) { // 当read count变为0,才叫release share成功
    			WaitNode next = waiters.peek();
    			if (next != null) {
    				LockSupport.unpark(next.thread);
    			}
    			return true;
    		}
    		return false;
    	}
    
    	// 尝试获取独占锁
    	public boolean tryLock(int acquires) {
    		throw new UnsupportedOperationException();
    	}
    
    	// 尝试释放独占锁
    	public boolean tryUnlock(int releases) {
    		throw new UnsupportedOperationException();
    	}
    
    	// 尝试获取共享锁
    	public int tryLockShared(int acquires) {
    		throw new UnsupportedOperationException();
    	}
    
    	// 尝试解锁共享锁
    	public boolean tryUnLockShared(int releases) {
    		throw new UnsupportedOperationException();
    	}
    }
    
    package com.study.lock.locks5;
    
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    
    public class JamesReadWriteLock implements ReadWriteLock {
        JamesAQS mask = new JamesAQS(){
            //尝试获取独占锁
            public boolean tryLock(int acquires) {
                //如果read count !=0 返回false
                if (readCount.get() !=0)
                    return false;
    
                int wct = writeCount.get();     //拿到 独占锁 当前状态
    
                if (wct==0){
                    if (writeCount.compareAndSet(wct, wct + acquires)){     //通过修改state来抢锁
                        owner.set(Thread.currentThread());  //  抢到锁后,直接修改owner为当前线程
                        return true;
                    }
                }else if (owner.get() == Thread.currentThread()){
                    writeCount.set(wct + acquires);     //修改count值
                    return true;
                }
                return false;
            }
    
            //尝试释放独占锁
            public boolean tryUnlock(int releases) {
                //若当前线程没有 持有独占锁
                if(owner.get()!= Thread.currentThread()){
                    throw new IllegalMonitorStateException();       //抛IllegalMonitorStateException
                }
    
                int wc= writeCount.get();
                int nextc = wc - releases;      //计算 独占锁剩余占用
                writeCount.set(nextc);      //不管是否完全释放,都更新count值
    
                if (nextc==0){  //是否完全释放
                    owner.compareAndSet(Thread.currentThread(), null);
                    return true;
                }else{
                    return false;
                }
            }
    
            //尝试获取共享锁
            public int tryLockShared(int acquires) {
                for (;;){
                    if (writeCount.get()!=0 &&
                            owner.get() != Thread.currentThread())
                        return -1;
    
                    int rct = readCount.get();
                    if (readCount.compareAndSet(rct, rct + acquires)){
                        return 1;
                    }
                }
            }
    
            //尝试解锁共享锁
            public boolean tryUnLockShared(int releases) {
                for(;;){
                    int rc = readCount.get();
                    int nextc = rc - releases;
                    if (readCount.compareAndSet(rc, nextc)){
                        return nextc==0;
                    }
                }
            }
        };
    
        @Override
        public Lock readLock() {
            return new Lock() {
                @Override
                public void lock() {
                    mask.lockShared();
                }
    
                @Override
                public void lockInterruptibly() throws InterruptedException {
    
                }
    
                @Override
                public boolean tryLock() {
                    return mask.tryLockShared(1) == 1;
                }
    
                @Override
                public void unlock() {
                    mask.unLockShared();
                }
    
                @Override
                public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
                    return false;
                }
    
                @Override
                public Condition newCondition() {
                    return null;
                }
            };
        }
    
        @Override
        public Lock writeLock() {
            return new Lock() {
                @Override
                public void lock() {
                    mask.lock();
                }
    
                @Override
                public boolean tryLock() {
                    return mask.tryLock(1);
                }
    
    
                @Override
                public void unlock() {
                    mask.unlock();
                }
    
                @Override
                public void lockInterruptibly() throws InterruptedException {
    
                }
    
                @Override
                public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
                    return false;
                }
    
                @Override
                public Condition newCondition() {
                    return null;
                }
            };
        }
    }
    
    package com.study.lock.locks5;
    
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    public class JamesReentrantLock implements Lock {
    
        private boolean isFair;
    
        public JamesReentrantLock(boolean isFair){
            this.isFair = isFair;
        }
    
        JamesAQS mask = new JamesAQS(){
            public boolean tryLock(int acquires){
                if (isFair){
                    return tryFairLock(acquires);
                }else{
                    return tryNonFairLock(acquires);
                }
            }
    
            //尝试获取独占锁
            public boolean tryNonFairLock(int acquires) {
                //如果read count !=0 返回false
                if (readCount.get() !=0)
                    return false;
    
                int wct = writeCount.get();     //拿到 独占锁 当前状态
    
                if (wct==0){
                    if (writeCount.compareAndSet(wct, wct + acquires)){     //通过修改state来抢锁
                        owner.set(Thread.currentThread());  //  抢到锁后,直接修改owner为当前线程
                        return true;
                    }
                }else if (owner.get() == Thread.currentThread()){
                    writeCount.set(wct + acquires);     //修改count值
                    return true;
                }
    
                return false;
            }
    
            public boolean tryFairLock(int acquires){
                //如果read count !=0 返回false
                if (readCount.get() !=0)
                    return false;
    
                int wct = writeCount.get();     //拿到 独占锁 当前状态
    
                if (wct==0){
                    JamesAQS.WaitNode head = waiters.peek();
                    if (head!=null && head.thread == Thread.currentThread()&&
                            writeCount.compareAndSet(wct, wct + acquires)){     //通过修改state来抢锁
                        owner.set(Thread.currentThread());  //  抢到锁后,直接修改owner为当前线程
                        return true;
                    }
                }else if (owner.get() == Thread.currentThread()){
                    writeCount.set(wct + acquires);     //修改count值
                    return true;
                }
    
                return false;
            }
    
    
            //尝试释放独占锁
            public boolean tryUnlock(int releases) {
                //若当前线程没有 持有独占锁
                if(owner.get()!= Thread.currentThread()){
                    throw new IllegalMonitorStateException();       //抛IllegalMonitorStateException
                }
    
                int wc= writeCount.get();
                int nextc = wc - releases;      //计算 独占锁剩余占用
                writeCount.set(nextc);      //不管是否完全释放,都更新count值
    
                if (nextc==0){  //是否完全释放
                    owner.compareAndSet(Thread.currentThread(), null);
                    return true;
                }else{
                    return false;
                }
    
            }
    
        };
    
        public void lock(){
            mask.lock();
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
    
        @Override
        public Condition newCondition() {
            return null;
        }
    
        @Override
        public boolean tryLock(){
            return mask.tryLock(1);
        }
    
        @Override
        public void unlock(){
            mask.unlock();
        }
    
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
    }
    
  • 相关阅读:
    python 远程 部署和运行
    学习笔记——UML类图
    Core Data 多线程操作实战篇
    Core Data系列六——Custom Migration
    Core Data系列五——数据迁移方案
    NSOperation以及NSOperationQueue的使用
    Magical Record设计小谈
    Core Data系列四——多线程设计
    Core Data系列三——基本使用
    Core Data系列二——基础概念
  • 原文地址:https://www.cnblogs.com/yfzhou528/p/11273201.html
Copyright © 2011-2022 走看看