zoukankan      html  css  js  c++  java
  • ReentrantReadWriteLock读写锁

    使用分析

      四种读写情况:读读 读写 写读  写写, 接下来分四种情况看最终结果

    情况一:读读使用

    public static void main(String[] args) throws InterruptedException {
    
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
    
        new Thread( ()->{
            readLock.lock();
            try {
                System.out.println("t1 try get readLock");
                Thread.sleep(2000L);
                System.out.println("t1 got readLock");
            }catch (Exception e){
    
            }finally {
                readLock.unlock();
                System.out.println("t1 relase readLock");
            }
    
        }, "t1").start();
    
        new Thread( ()->{
            readLock.lock();
            try {
                System.out.println("t2 try get readLock");
                Thread.sleep(1000L);
                System.out.println("t2 got readLock");
            }catch (Exception e){
    
            }finally {
                readLock.unlock();
                System.out.println("t2 relase readLock");
            }
    
        }, "t2").start();
    
    }
    
    运行结果:
    t1 try get readLock
    t2 try get readLock
    t2 got readLock
    t2 relase readLock
    t1 got readLock
    t1 relase readLock

    从运行结果可知: 读读非互斥

    情况二:读写使用

    public static void main(String[] args) throws InterruptedException {
    
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
    
        new Thread( ()->{
            readLock.lock();
            try {
                System.out.println("t1 try get readLock");
                Thread.sleep(2000L);
                System.out.println("t1 got readLock");
            }catch (Exception e){
    
            }finally {
                readLock.unlock();
                System.out.println("t1 relase readLock");
            }
    
        }, "t1").start();
    
        new Thread( ()->{
            writeLock.lock();
            try {
                System.out.println("t2 try get writeLock");
                Thread.sleep(1000L);
                System.out.println("t2 got writeLock");
            }catch (Exception e){
    
            }finally {
                writeLock.unlock();
                System.out.println("t2 relase writeLock");
            }
    
        }, "t2").start();
    
    }
    打印结果:
    t1 try get readLock
    t1 got readLock
    t1 relase readLock
    t2 try get writeLock
    t2 got writeLock
    t2 relase writeLock

    从运行结果可知: 读写互斥

    情况三:写读使用

    public static void main(String[] args) throws InterruptedException {
    
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
    
        new Thread( ()->{
            writeLock.lock();
            try {
                System.out.println("t1 try get writeLock");
                Thread.sleep(2000L);
                System.out.println("t1 got writeLock");
            }catch (Exception e){
    
            }finally {
                writeLock.unlock();
                System.out.println("t1 relase writeLock");
            }
    
        }, "t1").start();
    
        new Thread( ()->{
            readLock.lock();
            try {
                System.out.println("t2 try get readLock");
                Thread.sleep(1000L);
                System.out.println("t2 got readLock");
            }catch (Exception e){
    
            }finally {
                readLock.unlock();
                System.out.println("t2 relase readLock");
            }
    
        }, "t2").start();
    
    }
    打印结果:
    t1 try get writeLock
    t1 got writeLock
    t1 relase writeLock
    t2 try get readLock
    t2 got readLock
    t2 relase readLock

    从运行结果可知: 写读互斥

    情况四:写写使用

    public static void main(String[] args) throws InterruptedException {
    
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
    
        new Thread( ()->{
            writeLock.lock();
            try {
                System.out.println("t1 try get writeLock");
                Thread.sleep(2000L);
                System.out.println("t1 got writeLock");
            }catch (Exception e){
    
            }finally {
                writeLock.unlock();
                System.out.println("t1 relase writeLock");
            }
    
        }, "t1").start();
    
        new Thread( ()->{
            writeLock.lock();
            try {
                System.out.println("t2 try get writeLock");
                Thread.sleep(1000L);
                System.out.println("t2 got writeLock");
            }catch (Exception e){
    
            }finally {
                writeLock.unlock();
                System.out.println("t2 relase writeLock");
            }
    
        }, "t2").start();
    
    }
    打印结果:
    t1 try get writeLock
    t1 got writeLock
    t1 relase writeLock
    t2 try get writeLock
    t2 got writeLock
    t2 relase writeLock

    从运行结果可知: 写写互斥

    使用总结: 除读读共享锁,其他情况表现为互斥

    源码分析

    构造方法分析

      以下代码可以看出ReadLock WriteLock都是依赖ReentrantReadWriteLock类的同步器进行锁操作, 下面重点关注ReentrantReadWriteLock类中Sync的代码

    ReentrantReadWriteLock类
    public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
        private static final long serialVersionUID = -6992448646407690164L;
        //读锁
        private final ReentrantReadWriteLock.ReadLock readerLock;
        //写锁
        private final ReentrantReadWriteLock.WriteLock writerLock;
        //同步器 获取锁和释放锁具体实现类
        final Sync sync;
    
        //默认构造器
        public ReentrantReadWriteLock() {
            this(false);
        }
    
        //实际构造器
        //初始化上面三个参数 readerLock writerLock  sync
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
    
        public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
        public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    }
    
    ReadLock类
    public static class ReadLock implements Lock, java.io.Serializable {
        private final Sync sync;
    
        //使用ReentrantReadWriteLock.sync赋值给ReadLock.sync
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }
    
    WriteLock类
    public static class WriteLock implements Lock, java.io.Serializable {
        private final Sync sync;
    
        //使用ReentrantReadWriteLock.WriteLock.sync
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }

    Sync类

    abstract static class Sync extends AbstractQueuedSynchronizer {
    
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
        //读锁重入数
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        //写锁重入数
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
    
        //获取当前线程重入次数
        //这里比较难理解, ThreadLocalHoldCounter继承自ThreadLocal,每一个线程都有自己重入次数
        //这里的作用是将重入次数放在线程缓存中, 当对应执行线程需要获取重入次数时只需要使用get()方法
        private transient ThreadLocalHoldCounter readHolds;
    
       
        //最后一个获取读锁的线程
        //举例:读读共享锁, 可能存在若干读锁, 读(fist) 读 读 读(last), cachedHoldCounter记录的是last线程读锁重入次数
        private transient HoldCounter cachedHoldCounter;
    
        
        //第一个获取读锁的线程
        private transient Thread firstReader = null;
        //第一个获取读锁的线程获取读锁重入数
        private transient int firstReaderHoldCount;
    }

    读写锁的核心是把AQS中的state设置为一个32位的值, 高16位表示读锁重入次数, 低16位表示写锁重入次数

    读锁获取: 1 若线程需要进入CLH等待队列, 则state的值不变, 例:读写 写读

                    2若是锁持有线程为读锁, 且写状态为0,此时需要修改state的值, 例 读

    读锁释放: state减去 1<<16 

    写锁获取: 1若持有线程为当前线程state加1(因为写状态为低位直接加1就行) 

                     2若非持有线程,则需要判断锁没有被其他线程持有,state加1

    写锁释放: state减去1

    1 获取读锁

    public void lock() {
        sync.acquireShared(1);
    }
    //获取共享锁
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    //尝试获取共享锁返回值-1表示线程需要加入CLH队列等待 否则放行
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        //如果排他锁(写锁)被其他线程(非当前线程),返回失败
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        //获取共享锁(读锁)状态 持有锁的线程共享锁重入次数
        int r = sharedCount(c);
        
        if (
            //核心逻辑判断是第一个判断readerShouldBlock, 判断读锁是否需要阻塞 
            //公平模式判断需要入列  :存在CLH队列,并且当前锁占有线程非当前线程
            //非公平模式判断需要入列:增加首节点是否为写锁等待,优先写锁原则,其他类似公平模式
            !readerShouldBlock() 
            //重入个数未到最大值
            && r < MAX_COUNT 
            //CAS修改值
            && compareAndSetState(c, c + SHARED_UNIT)) {
            //---进入此处代码段则表示锁获取成功 以下代码作用是辅助属性的赋值
    
            //r==0读锁持有者重入次数为0
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } 
            //如果首个读节点为当前节点 表示重入 计数+1
            else if (firstReader == current) {
                firstReaderHoldCount++;
            } 
            //等价于 r!=0 且 读锁持有线程非当前线程
            //读读共享锁
            else {
                //设置重入次数
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        //补偿获取共享锁
        return fullTryAcquireShared(current);
    }

    2 释放读锁

    /** ReentrantReadWriteLock.ReadLock 类 */
    public void unlock() {
        sync.releaseShared(1);
    }
    
    
    /** AbstractQueuedSynchronizer类 */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    /** ReentrantReadWriteLock.ReadLock 类 */
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        //如果第一个读锁为当前锁
        if (firstReader == current) {
            //线程只获取一次读锁 则将firstReader置为空
            if (firstReaderHoldCount == 1)
                firstReader = null;
            //线程有重入获取读锁 则将重入数减一
            else
                firstReaderHoldCount--;
        } 
        //如果当前线程非第一个读锁获取者 则从线程缓存中(ThreadLocal原理)中获取重入数, 
        //然后执行清除缓存(未重入) 或者 重入次数减一(有重入)
        else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        //无限循环尝试compareAndSetState (CAS操作) 直到成功
        //因为读锁为共享锁 在执行CAS操作的同时可能会存在别的获取读锁线程正在获取或释放锁
        //所以有可能失败的 就需要再次尝试CAS操作
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                return nextc == 0;
        }
    }

    3 获取写锁

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    //获取锁
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    //尝试获取锁
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        //如果c!=0则存在读锁或写锁
        if (c != 0) {
            //如果写锁等于0,说明读锁不等于0, 若持有线程非当前线程 失败
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            //超过最大写锁设定值 抛出异常
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //代码能走到这里 说明写锁不等于0, 且持有当前线程, 锁重入
            setState(c + acquires);
            return true;
        }
        //以上代码必然直接返回结果, 代码走到这里说明c==0,不存在读写锁持有情况
        if (
            //判断当前线程获取写锁是否需要入CLH等待队列
            //writerShouldBlock()方法代码很简单,只是CLH队列是否有等待的线程
            //公平模式下如果有等待返回false(不等待),非公平模式直接返回false(不等待)
            writerShouldBlock() ||
            //不需要需要写锁进入CLH, 然后修改AQS state值失败, 则返回false
            !compareAndSetState(c, c + acquires))
            return false;
        //代码走到这前提 1不需要入CLH队列等待 2修改AQS state成功 直接设置锁占用线程为当前线程即可
        setExclusiveOwnerThread(current);
        return true;
    }

    4 释放写锁

    //释放锁
    public final boolean release(int arg) {
        //尝试释放锁 并判断重入次数是否全部释放完成
        //若全部重入释放完成 则unpark CLH等待队列的第一个节点
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    //尝试释放
    protected final boolean tryRelease(int releases) {
        //持锁线程非当前线程 抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //状态计算
        int nextc = getState() - releases;
        //判断重入情况 
        boolean free = exclusiveCount(nextc) == 0;
        //若重入为0 表示此线程可以让出占用
        if (free)
            setExclusiveOwnerThread(null);
        //状态设置
        setState(nextc);
        //返回是否释放线程
        return free;
    }

    附:

    1 高低位增加一次重入时AQS状态值的变更:

      假设有十进制数字 11115555 如果说希望高4位用来做读锁值 低四位用来做写锁值,

      那么增加一个读锁之后期望的到的结果是11125555,通过减法11125555-11115555=10000可知, 高位如果需要加1则实际需要加10000

      同理增加一个写锁之后期望的到的结果是11115556,通过减法11115556-11115555=1可知,低位直接按实际值增加即可 

      回到AQS增加读锁重入时 state = state+(1<<16) , AQS增加写锁重入时 state = state+1

    2 函数获取读锁数 int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

      这个应该比较好理解, 就是将原有的32位向右移16位, 那么低(右侧)消失, 举个例子 1111 1111 0000 000 向右移8位变成 1111 1111

    3 函数获取写锁数 int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

      需要掌握一些按位与的知识, EXCLUSIVE_MASK = 0000 0000 0000 0000 1111 1111 1111 1111 

      那么  c & EXCLUSIVE_MASK 表示同时为1 结果为1 否则为0,   

      假定 c = 0000 0000 0000 0001 0000 0000 0000 0010 表示读状态为1 写状态为2

      0000 0000 0000 0000 1111 1111 1111 1111

      &

      0000 0000 0000 0001 0000 0000 0000 0010

          --------------------------------------------------------------------------

       =  0000 0000 0000 0000 0000 0000 0000 0010

      c & EXCLUSIVE_MASK 得到的最终结果 0000 0000 0000 0000 0000 0000 0000 0010, 从而筛选出写状态值为2

  • 相关阅读:
    多任务拷贝小案例
    进程池
    进程间的通信
    互斥锁
    创建函数/类的线程
    udp/tcp流程
    发布模块
    私有属性/方法的访问
    深入了解jQuery之整体架构
    (转)前端面试题
  • 原文地址:https://www.cnblogs.com/xieyanke/p/12316686.html
Copyright © 2011-2022 走看看