zoukankan      html  css  js  c++  java
  • JUC锁框架源码阅读-ReentrantReadWriteLock

    说明

    我们都知道ArrayList不是线程安全的,在读的时候同时并发在写,在写的时候同时在读,会出现索引越界问题,解决这个问题醉简单的方式在写和读的地方都加上锁。都加上锁的话并发读也会产生互斥

    但是为了性能 读的频繁写的时候比较少。我们都会允许读读共享,读写互斥,写写互斥,ReentrantReadWriteLock 就是读写锁 读读共享,读写互斥,写写互

    为什么ArrayList线程不安全可以参考《ArrayList在多线程的线程不安全的几种体现》

    ReentrantReadWriteLock 是基于AQS实现的 AQS源码阅读参考《JUC锁框架源码阅读-AbstractQueuedSynchronizer》

    关于位运算可以参考:《java位运算符,&,|,>>>,>>,<<,<<<的区别》

    初始化

    main

     //<1>
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    1.通过位运算符& 通过AQS state&EXCLUSIVE_MASK 进行运算,如果大于0则表示写锁以及被持有,则获取读锁(AQS共享锁)失败 返回-1加入CLH队列

    2.如果小于0则表示没有写锁。读读共享(共享锁可以多线程持有)则尝试获取,通过state从高位右移可以获取到写锁数量。

    3.先因为int是32位 16位是临界,则65536是最大支持个线程可获取读锁数量(一般够用了),需要校验是否超过65536如果超过了直接获取失败 加入CLH队列

    4.没有超过如果限制没有任何线程获取读锁,则直接CAS尝试获取

    5.获取失败则自旋获取,每次获取都需要判断是否写锁被持有了,自旋直到获取成功或者因为写锁被持有,或者超出最大可获取数量返回对应结果(注获取失败都是返回-1 AQS将阻塞线程并加入CLH队列)

    6.获取成功 因为可重入将更新ThreadLocal的HoldCounter count+1 

    <1>

        public ReentrantReadWriteLock() {
            this(false);
        }
        public ReentrantReadWriteLock(boolean fair) {
            //可以sync是ReentrantReadWriteLock 的2个内部类。可以发现ReentrantReadWriteLock 也是支持公平和非公平的
            sync = fair ? new ReentrantReadWriteLock.FairSync() : new ReentrantReadWriteLock.NonfairSync();
            //读锁也是内部内 内部保存了ReentrantReadWriteLock的引用 意味着可以拿到sync
            readerLock = new ReentrantReadWriteLock.ReadLock(this);
            //写锁也是内部内 内部保存了ReentrantReadWriteLock的引用 意味着可以拿到sync
            writerLock = new ReentrantReadWriteLock.WriteLock(this);
        }

    Sync

    ReentrantReadWriteLock 的公平锁和非公平锁都继承了Sync

    继续看类结构 他们只重写了部分方法 主要是为了实现公平和非公平锁,具体加锁都在父类Sync

     /**
         * Nonfair version of Sync
         */
        static final class NonfairSync extends ReentrantReadWriteLock.Sync {
            private static final long serialVersionUID = -8159625535654395037L;
            //重写了writerShouldBlock方法
            final boolean writerShouldBlock() {
                return false; // writers can always barge
            }
            final boolean readerShouldBlock() {
                return apparentlyFirstQueuedIsExclusive();
            }
        }
    
        /**
         * Fair version of Sync
         */
        static final class FairSync extends ReentrantReadWriteLock.Sync {
            private static final long serialVersionUID = -2274990926593161451L;
            //重写了writerShouldBlock
            final boolean writerShouldBlock() {
                return hasQueuedPredecessors();
            }
            //重写了readerShouldBlock
            final boolean readerShouldBlock() {
                return hasQueuedPredecessors();
            }
        }
    View Code

    ReentrantReadWriteLock 是通过将AQS state int 32转2进制通过高16位表示读锁 低16位表示写锁

     

        //因为int 是32位,通过16将他们一分为二 高位加读锁 低位加写锁
        static final int SHARED_SHIFT   = 16;
        //1转为二进制向左边移动16位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        //1转为二进制向左边移动16位 -1表示高的最小值
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        //1转为二进制向左边移动16位 -1表示低的最大值
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
        //将state向右移动16位 则可以得到持有锁的数量 我的理解 就是高位的移动到后面当成16进制处理
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    
        /**
         * 位运算 将&左右2边转为二进制 同时为1 则为1否则为0 得出写锁数量
         * 00000000 00000001 00000000 00000000  高位读锁与EXCLUSIVE_MASK计算后 =0
         * 00000000 00000001 00000000 00000001  低位写锁与EXCLUSIVE_MASK计算后  =1
         * 00000000 00000000 11111111 11111111 EXCLUSIVE_MASK 
         * @param c
         * @return
         */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    HoldCounter

        //Sync内部类 锁是可重入的,保存每个线程持有数量
        static final class HoldCounter {
            //计数器
            int count = 0;
            //线程id
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }
    
        /**
         *Sync内部类 重新了ThreadLocal的 initialValue方法,为ThreadLocal设置默认值
         */
        static final class ThreadLocalHoldCounter
                extends ThreadLocal<ReentrantReadWriteLock.Sync.HoldCounter> {
            public ReentrantReadWriteLock.Sync.HoldCounter initialValue() {
                return new ReentrantReadWriteLock.Sync.HoldCounter();
            }
        }
        //sync成员变量
        private transient ReentrantReadWriteLock.Sync.ThreadLocalHoldCounter readHolds;

    读锁加锁

    main

     public static void main(String[] args)  {
            //<L>初始化
            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
            //获得读锁 内部return readLock
            Lock rlock = readWriteLock.readLock();
            //<1>加锁
            rlock.lock();
        }

    <1>lock

    public void lock() {
            //<2>调用的继承父类的AQS方法
            sync.acquireShared(1);
        }

    <2>acquireShared

    可以参考:《AQS源码阅读》

    // 获取共享锁
        public final void acquireShared(int arg) {
            //<3>模板模式 抽象方法 由子类实现 尝试去获取共享锁,如果返回值小于0表示获取共享锁失败 这里调用的是Snyc
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }

    <3>tryAcquireShared

    java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared

     protected final int tryAcquireShared(int unused) {
            //获得当前线程
            Thread current = Thread.currentThread();
            //获得AQS状态
            int c = getState();
            /**
             * exclusiveCount判断是否已经持有了写锁
             * getExclusiveOwnerThread 获得持有写锁的线程,如果不是当前线程持有直接返回-1 获取读锁失败 这里说明 同一个线程加了写锁是可以继续获取读锁的
             */
            if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                return -1;
            //获取获取读锁的数量
            int r = sharedCount(c);
            /**
             * readerShouldBlock 公平锁的实现 NonfairSync FairSync都重新了此方法
             * FairSync(公平锁):直接如果队列不为空就返回false
             * NonfairSync(非公平锁)遍历CLH队列如果已经在队列中表示轮到他获取锁了返回true否则返回false 需要重新加入CLH队列
             * r < MAX_COUNT是否超了 读锁的获取临界,因为state划分了高位和低位所以不能超过最大值 1<<16-1=65536 所以同时支持65536 获取这么多度锁。。理论上够用了
             * compareAndSetState(c, c + SHARED_UNIT)
             */
            if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                //没有持有任何锁的情况
                if (r == 0) {
                    //记录第一个获读锁的线程
                    firstReader = current;
                    //第一个获取获取读锁线程+1
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    //如果当前线程是第一获取读锁的线程 计数器加1
                    firstReaderHoldCount++;
                } else {
                    //获取最近一个获取成功获取读锁的计数器 这省却了ThreadLocal查找,
                    ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
                    //判断是否是当前线程的 如果不是则从ThreadLocal查找 并设置到最近一个获取
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    //计数器+1
                    rh.count++;
                }
                return 1;
            }
            //<4>并发获取锁的情况 尝试获取读锁失败,自旋重试。
            return fullTryAcquireShared(current);
        }

    <4>fullTryAcquireShared

     final int fullTryAcquireShared(Thread current) {
            ReentrantReadWriteLock.Sync.HoldCounter rh = null;
            //自旋
            for (;;) {
                //获取AQS 锁状态
                int c = getState();
                /**
                 * exclusiveCount判断是否已经持有了写锁
                 * getExclusiveOwnerThread 获得持有写锁的线程,如果不是当前线程持有直接返回-1 获取读锁失败 这里说明 同一个线程加了写锁是可以继续获取读锁的
                 */
                if (exclusiveCount(c) != 0) {
                    //如果写锁以及被获取 同时不是当前线程直接返回-1 不自旋
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    /**
                     *          * readerShouldBlock 公平锁的实现 NonfairSync FairSync都重新了此方法
                     *          * FairSync(公平锁):直接如果队列不为空就返回false
                     *          * NonfairSync(非公平锁)遍历CLH队列如果已经在队列中表示轮到他获取锁了返回true否则返回false 需要重新加入CLH队列
                     */
                } else if (readerShouldBlock()) {
                    // 第一个获取读锁的是当前线程不做处理
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        //更新锁计数(可重入的体现)
                        if (rh == null) {
                            //获取最近一个获取读锁线程的Counter
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                //如果不是最近的从ThreadLocal获取
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    //如果当前线程的持有读锁数为0,那么就没必要使用计数器,直接移除
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            //结束自旋
                            return -1;
                    }
                }
                //超过最大可获取数量
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //自旋获取共享锁 获取成功Countter++
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

    读锁释放锁

    1.每次释放通过HoldCounter 的count -1直到减为0则返回AQS父类释放成功,将AQS的state -1,并将计数器从ThreadLcal清除

    2.AQS则会唤醒CLH阻塞队列重新尝试重新获取锁

    main

       public static void main(String[] args)  {
            //<L>初始化
            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
            //获得读锁 内部return readLock
            Lock rlock = readWriteLock.readLock();
            rlock.lock();
            //<1>
            rlock.unlock();
    
        }

    <1>unlock

       public void unlock() {
            //<2>调用Sync继承AQC方法
            sync.release(1);
        }

    <2>releaseShared

    可以参考《AQS源码》

    // 释放共享锁
        public final boolean releaseShared(int arg) {
            //<3>模板模式 抽象方法尝试释放共享锁
            if (tryReleaseShared(arg)) {
                // 唤醒等待共享锁的线程
                doReleaseShared();
                return true;
            }
            return false;
        }

    <3>tryReleaseShared

     protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            //是不第一个获取锁
            if (firstReader == current) {
                //可重入的特点 直接置空
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;//计数器-1
            } else {
                //判断是不是最近获取读锁的线程
                ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();//如果不是 从ThreadLocal获取
                int count = rh.count;
                //可重入特点 判断数量 觉得
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            //自旋释放释放锁
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

    写锁加锁

    main

     //<L>初始化
            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
            //获得读锁 内部return readLock
            Lock writeLock = readWriteLock.writeLock();
            //<1>加锁
            writeLock.lock();

    1.如果AQS state不等于0,表示有可能获取的写锁或者读锁,根据位运算符&获取写锁数量,如果数量=0那么表示是读锁。直接获取失败(加入CLH队列阻塞)

    2.如果获取写锁数量大于0 根据getExclusiveOwnerThread() 判断是否不是当前线程持有,如果是的话直接获取成功(可重入) state+1并通过CAS修改

    <1>lock

    public void lock() {
            //<2>调用继承自AQS加锁方法
            sync.acquire(1);
        }

    <2>acquire

    可以参考《AQS源码阅读》

    public final void acquire(int arg) {
            /**
             *<3>tryAcquire  模板模式 是抽象方法由子类实现获取锁步骤
             *addWaiter   如果加锁失败 创建节点并放入队列尾
             *<acquireQueued 通过新创建的节点 判断是重试获取锁。还是阻塞线程
             */
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    <3>tryAcquire

     protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            //获得AQS锁状态
            int c = getState();
            //获得写锁数量
            int w = exclusiveCount(c);
            //>0表示写锁或者读锁被获取
            if (c != 0) {
                //==0表示读锁被持有 直接返回false 加入CLH队列 如果是写锁判断是是否是当前线程持有(可重入)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //判断是否有超过可获取写锁数量
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //则state++ 可重入
                setState(c + acquires);
                return true;
            }
            /**
             * writerShouldBlock 公平锁的实现 NonfairSync FairSync都重新了此方法
             * FairSync(公平锁): 不在队列返回true 在队列返回false
             * NonfairSync(非公平锁)直接返回false 尝试直接获取锁
             * r < MAX_COUNT是否超了 读锁的获取临界,因为state划分了高位和低位所以不能超过最大值 1<<16-1=65536 所以同时支持65536 获取这么多度锁。。理论上够用了
             * compareAndSetState(c, c + SHARED_UNIT)
             */
            if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                return false;
            //设置当前持有锁线程
            setExclusiveOwnerThread(current);
            return true;
        }

    写锁释放锁

    main 

      public static void main(String[] args)  {
            //<L>初始化
            ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
            //获得读锁 内部return readLock
            Lock writeLock = readWriteLock.writeLock();
            //<1>释放锁
            writeLock.unlock();
    
        }

    1.getExclusiveOwnerThread() == Thread.currentThread() 判断是否是当前线程持有锁 如不过不是 直接抛错

    2.AQS sate-- 更新到AQS state 如果大于0返回释放失败 如果==0  清空exclusiveOwnerThread 返回释放成功 后续交给AQS处理 唤醒CLH等待线程重新获取锁

    <1>unlock

        public void unlock() {
            //<2>调用的继承AQS的共享锁释放方法
            sync.release(1);
        }

    <2>relase

    可以参考 《AQS源码阅读》

    // 在独占锁模式下,释放锁的操作
        public final boolean release(int arg) {
            // <3>调用tryRelease子类方法,尝试去释放锁,由子类具体实现
            if (tryRelease(arg)) {
                Node h = head;
                // <如果队列头节点的状态不是0,那么队列中就可能存在需要唤醒的等待节点。
                // 还记得我们在acquireQueued(final Node node, int arg)获取锁的方法中,如果节点node没有获取到锁,
                // 那么我们会将节点node的前一个节点状态设置为Node.SIGNAL,然后调用parkAndCheckInterrupt方法
                // 将节点node所在线程阻塞。
                // 在这里就是通过unparkSuccessor方法,进而调用LockSupport.unpark(s.thread)方法,唤醒被阻塞的线程
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    <3>tryRelease

     protected final boolean tryRelease(int releases) {
            //判断是不是当前线程持有getExclusiveOwnerThread() == Thread.currentThread(); 如果不是则抛异常 避免别的线程调用此方法释放了锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //获取状态--
            int nextc = getState() - releases;
            //判断是否释放完了(可重入) 如果调用2次lock 则需要调用2次unlock才会真正释放锁
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);//置空持有线程
            //设置新的锁状态 这里不用CAS是 读锁 只会有有一个线程持有
            setState(nextc);
            return free;
        }
  • 相关阅读:
    wp8模拟器中使用电脑键盘和模拟器的版本解释
    程序员如何正确的评估自己的薪资
    本地资源之绑定页面的标题和增加软件的语言支持
    C#导出数据的EXCEL模板设计
    程序员高效编程的14点建议
    使用StaticResource给控件定义公共的样式和属性来写界面XAML
    程序员什么时候该考虑辞职
    我的第一个wp8小程序
    检测CPU是否支持虚拟化
    所有经历都是一种恩赐
  • 原文地址:https://www.cnblogs.com/LQBlog/p/15219124.html
Copyright © 2011-2022 走看看