zoukankan      html  css  js  c++  java
  • ReentrantReadWriteLock源码探究

    ReentrantReadWriteLock实现了可重入的读锁和写锁,其中读锁是共享锁,写锁是互斥锁。与ReentrantLock类似,ReentrantReadWriteLock也提供了公平锁和非公平锁两种实现,以满足不同的场景。因此,实际在使用时,会涉及到读锁、写锁、公平锁、非公平锁四个不同的概念,这也使得ReentrantReadWriteLock更加复杂一些。

    1.核心字段与构造器

        private final ReentrantReadWriteLock.ReadLock readerLock;
        private final ReentrantReadWriteLock.WriteLock writerLock;
        final Sync sync;
    
        public ReentrantReadWriteLock() {
            this(false);
        }
    
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
    

    ReentrantReadWriteLock通过ReadLockWriteLock两个内部类分别实现读锁和写锁,并且默认的构造器使用非公平锁。Sync这个内部类同样继承了AbstractQueuedSynchronizer(AQS),排队等候的逻辑都交由AQS实现。接下来在讲解加锁和解锁的逻辑之前,需要明确的一点是,AQS会使用state字段记录锁状态,ReentrantReadWriteLock对该字段的使用更加巧妙,使用state的高16位记录读锁持有的总次数,使用低16位记录写锁持有的总次数,这样使用一个字段即可记录两个状态。因此在下文的源码讲解中,会发现读锁重入时,重入次数需要加2^16-1,写锁重入时只需要加1,同时这也意味着读锁和写锁的重入次数的最大值是2^16-1。接下来分别看一下读锁和写锁的加锁逻辑。

    2.读锁的加锁逻辑

        public void lock() {
            sync.acquireShared(1);
        }
        
        //位于AQS
        public final void acquireShared(int arg) {
            //成功获取读锁返回1,否则返回-1
            if (tryAcquireShared(arg) < 0)
                //如果获取读锁失败,就执行下面的逻辑加入等待队列
                doAcquireShared(arg);
        }
        
        //位于Sync
        protected final int tryAcquireShared(int unused) {
    
            Thread current = Thread.currentThread();
            int c = getState();
            //如果写锁已经被占有,并且占有者不是当前线程,则返回-1,即写锁被其他线程占有时不能获取读锁
            //从这里可以看出,线程在持有写锁的情况下,可以继续获取读锁
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            //获取读锁的重入次数
            int r = sharedCount(c);
            //如果读锁不需要阻塞,并且读锁的获取数量没有达到最大值(2^16-1),则尝试将读锁的持有数量加1(注意是在高16位加1,SHARED_UNIT=2^16-1),
            //如果加1操作能够成功,则表示当前线程成功获取读锁
            //注意,公平读锁和非公平读锁的readerShouldBlock()方法逻辑是不一样的
            //非公平读锁在等待队列第一个线程请求写锁时会返回true(即在这种情况下非公平锁需要阻塞),其他情况都返回false
            //公平读锁会查看当前等待队列中是否有其他线程在等待
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                //r == 0表示当前线程是第一个获取读锁的线程,将firstReader指向自己,并初始化firstReaderHoldCount字段
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    //如果firstReader已经指向了自己,就将firstReaderHoldCount加1,表示当前线程作为第一个获取读锁的线程,共获取读锁的次数
                    firstReaderHoldCount++;
                } else {
                    //cachedHoldCounter记录的是最后一个获取读锁的线程
                    //使用cachedHoldCounter可以节省在ThreadLocal中的查找操作
                    HoldCounter rh = cachedHoldCounter;
                    //如果cachedHoldCounter还没初始化,或者最后一个获取读锁的线程不是当前线程,就从ThreadLocal中查看当前线程对应的HoldCounter
                    //注意,如果无法在ThreadLocal中查到当前线程的记录,那么就会新建一个HoldCounter加入ThreadLocalMap中,对应的count字段初始化为0
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    //执行到这里,说明rh != null,并且当前线程是最后一个获取读锁的线程,此时更新ThreadLocalMap中的value值
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    //将读锁的持有次数加1
                    rh.count++;
                }
                //获取读锁成功,则返回1
                return 1;
            }
            //执行到这里有三种情况:1)readerShouldBlock()返回true,即读锁需要阻塞;2)读锁重入次数已经达到最大值;3)更新state字段失败
            //无论哪种情况,都说明当前线程没有成功获取读锁
            return fullTryAcquireShared(current);
        }
    
        //读写锁使用state字段的低16位表示写锁,高16位表示读锁
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
        //获取读锁被持有的次数,数量由state的高16位表示
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        
        //如果写锁被其他线程占有,或者当前获取读锁的线程需要阻塞,就返回-1,如果获取读锁成功则返回1,其他情况会继续自旋
        //上面的注释中讲过,执行到fullTryAcquireShared有三种情况,该方法在自旋中分别处理这三种情况
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            //自旋
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    //如果写锁已经被其他线程占有,则返回-1
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                } else if (readerShouldBlock()) {
                    //处理读锁需要阻塞的情况
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            //获取最后一个获取锁的线程
                            rh = cachedHoldCounter;
                            //如果当前线程不是最后一个获取锁的线程,则从ThreadLocalMap中取出HoldCount对象
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                //当前线程没有获取读锁,就将ThreadLocalMap中存的HoldCount清理掉
                                //因为根据上面的代码逻辑,走到这里的时候,说明当前获取读锁的线程应该阻塞,
                                //即无法获取读锁,这种情况下需要清理ThreadLocalMap中的记录
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                //读锁获取数量达到最大,抛出异常,这段代码说明读锁的最大重入次数是2^16-1
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //如果成功更新state的值,则表示读锁获取成功,否则会继续自旋
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    //如果当前线程是第一个持有读锁的线程,就设置firstReader字段
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            //从ThreadLocal中查找当前线程对应的HoldCounter对象
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        //更新当前线程持有锁的数量
                        rh.count++;
                        //将cachedHoldCounter指向当前线程的HoldCounter对象
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }
    

    tryAcquireShared()方法用到了ThreadLocal来记录当前线程获取的读锁数量,有兴趣的话可以参考ThreadLocal源码探究 (JDK 1.8)了解ThreadLocal的实现细节。公平锁和非公平锁以不同的方式实现了readerShouldBlock()方法,接下来分别讲解公平锁和非公平锁的实现。

    • 非公平锁的readerShouldBlock()实现
        //由名字可以看出,这个方法主要是判断读锁是否应该阻塞
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
        
        //如果等待队列的第一个线程请求互斥锁(即写锁),则返回true,表示应该阻塞当前的读锁
        //由readerShouldBlock()方法的注释了解到,非公平的读锁在发现等待队列头节点请求互斥锁时,
        //需要进行阻塞而不是抢锁,是为了避免读请求太多的情况下造成写锁线程饥饿
        final boolean apparentlyFirstQueuedIsExclusive() {
            Node h, s;
            //head != null表示等待队列已初始化,
            //h.next != null && !s.isShared()表示队列的头结点请求的是互斥锁
            //s.thread != null表示头结点对应的线程还没有开始执行
            return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
        }
    

    对于非公平锁的读锁来说,在发现等待队列中的第一个线程请求写锁时,会主动取消抢锁,是为了避免请求写锁的线程饥饿,这是与ReentrantLock中的非公平锁一个很大的不同。与非公平锁不同的是,公平锁会先查看队列中是否有其他线程在等待。

    • 公平锁的readerShouldBlock()实现
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    
        public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            //只要队列中有其他节点在等候,公平锁就要求当前线程排队等待
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    至此tryAcquireShared()方法的逻辑已经介绍完了,在该方法返回-1时,表示当前无法获取读锁,就会接着执行doAcquireShared()方法,来看看该方法的源码:

        private void doAcquireShared(int arg) {
            //将当前线程构造成节点,放到等待队列的末尾
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                //中断标志
                boolean interrupted = false;
                for (;;) {
                    //获取当前线程的前一个节点
                    final Node p = node.predecessor();
                    //只有在当前节点是队列中的第一个有效节点时,才会执行下面的语句
                    if (p == head) {
                        //尝试获取读锁,获取成功则r>0,失败则r<0
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            //如果线程设置了中断标记,则将当前线程中断
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    //当前线程不是队列第一个有效节点,或者获取读锁失败,就阻塞等待
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        //线程中断之后,设置interrupted=true,之后代码逻辑会自旋一次,会在for循环的第一个条件语句中用到该字段
                        interrupted = true;
                }
            } finally {
                //如果线程被中断,或者出现异常时,failed=true,需要通过cancelAcquire()方法放弃获取锁
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    doAcquireShared()方法的逻辑是,查看当前线程对应的节点的前置节点是不是头结点(即当前节点是头结点的下一个节点),如果是就尝试获取读锁,获取成功的话就将自己设置成头结点,然后根据中断标志进行处理;如果前置节点不是头结点,则进入shouldParkAfterFailedAcquire(),如果其前置节点是SIGNAL状态,就会在parkAndCheckInterrupt()方法中将当前线程挂起。doAcquireShared()与同在AQS中的另一个方法doAcquireSharedInterruptibly()非常相似,在CountDownLatch源码探究 (JDK 1.8)doAcquireSharedInterruptibly()有详细的解释,包括shouldParkAfterFailedAcquire()parkAndCheckInterrupt()这两个方法,有兴趣的话可以参考,本文不再讨论这些方法的细节。

    3.写锁的加锁逻辑

    介绍完读锁的加锁逻辑之后,接下来看看写锁加锁的实现原理:

        public void lock() {
            sync.acquire(1);
        }
        
        //加锁的代码与ReentrantLock一样,复用了AQS中的处理框架
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        
        //ReentrantReadWriteLock的内部类Sync重写了tryAcquire()方法
        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                //①如果w = 0,但是c!=0,说明读锁已结被占有,直接返回false表示获取写锁失败,说明即使线程自己持有读锁,也不能再次获取写锁
                //②如果w != 0,说明写锁已经被占有,需要判断是不是当前线程占有写锁
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //判断写锁的持有次数有没有超限,从这里可以知道写锁的最大重入次数是2^16-1
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //更新持有的写锁数量,这里可以看出写锁是可重入的
                setState(c + acquires);
                return true;
            }
            //走到这里,说明c=0,即当前读锁和写锁都没有被占有,公平锁会先检查队列中有没有其他线程在等待锁,非公平锁不会阻塞
            //如果writerShouldBlock()反复返回false,才会考虑设置state字段,设置成功表示成功获取写锁,否则返回false表示获取写锁失败
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            //记录当前线程持有写锁
            setExclusiveOwnerThread(current);
            return true;
        }
    

    公平写锁和非公平写锁同样都重写了writerShouldBlock()方法,非公平写锁的实现非常简单,直接返回false,表示非公平的写锁不需要阻塞;公平写锁会检查等待队列中是否有其他线程在等待获取锁,两种实现方式的源码如下:

        //公平写锁的实现
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        
        //非公平写锁的实现
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
    

    从上面的分析可以看出,线程在持有写锁的情况下,允许同时获取读锁,但是在线程持有读锁的情况下,不允许同时获取写锁,这个细节需要注意。

    4.释放锁

    • 读锁释放锁
        public void unlock() {
            sync.releaseShared(1);
        }
    
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
        
        //当state=0时表示读锁已完全释放,才会返回true,其他情况返回false
        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            //判断当前线程是否是第一个获取读锁的线程
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                //firstReaderHoldCount=1,说明读锁只被当前线程占有1次,释放之后更新firstReader的值
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                //如果当前线程多次持有读锁,则将计数减1
                else
                    firstReaderHoldCount--;
            } else {
                //执行到这里,说明当前线程不是第一个获取读锁的线程
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    //如果当前线程获取的读锁次数<=1,在释放锁的时候,需要清除ThreadLocal中的记录
                    readHolds.remove();
                    //没有持有读锁的线程释放读锁会报错
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                //将当前线程读锁的重入次数减1
                --rh.count;
            }
            for (;;) {
                int c = getState();
                //更新读锁的值
                int nextc = c - SHARED_UNIT;
                //如果这一步CAS操作失败,就会自旋尝试,直到成功为止
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    //nextc=0,说明读锁已释放,返回true,否则返回false
                    //由于读锁是共享锁,可以有多个线程同时获取读锁,只有最后一个持有读锁的线程完全释放读锁,才会返回true
                    return nextc == 0;
            }
        }
    
        private void doReleaseShared() {
    
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        //头结点是SIGNAL状态时,将其状态更新成0,该操作会一直自旋重试,直到修改成功,成功之后会唤醒后面的等待线程
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    //如果头结点不是SIGNAL状态,就自旋将其更新为PROPAGATE状态
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                //h==head说明,从for循环开始到现在头结点没用发生变化
                //注意:当线程释放锁的时候,会修改头结点
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
        private void unparkSuccessor(Node node) {
    
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            Node s = node.next;
            //waitStatus>0只有CANCELLED状态,代表节点放弃获取锁
            if (s == null || s.waitStatus > 0) {
                s = null;
                //从队列尾部开始向前查找,目的是寻找node节点后第一个非CANCELLED状态的节点,并将s指向该节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //唤醒s节点对应的线程
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    由于读锁是共享锁,并且是可重入锁,因此在最后一个持有读锁的线程最后一次释放读锁时,读锁才能真正被释放,此时才会通过doReleaseShared()方法唤醒队列中的等待线程。

    • 写锁释放锁
        public void unlock() {
            sync.release(1);
        }
    
        public final boolean release(int arg) {
            //如果线程是释放写锁成功,则唤醒后面的等待线程
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        protected final boolean tryRelease(int releases) {
            //线程没有持有写锁不允许释放写锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            //释放写锁成功,则设置exclusiveOwnerThread=null,表示写锁目前没有被任何线程占有
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
        
        //判断当前线程是否持有写锁
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    

    5.其他介绍

    • HoldCounter
      HoldCounter是用来记录线程持有读锁的数量,源码中使用cachedHoldCounter来记录最后一个获取读锁的是哪个线程,由于代码很简单,因此前文并未对其进行介绍,这里做一下简单讲解:
        /**
         * A counter for per-thread read hold counts.
         * Maintained as a ThreadLocal; cached in cachedHoldCounter
         */
        static final class HoldCounter {
            //记录线程持有读锁的次数
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            //记录线程id
            final long tid = getThreadId(Thread.currentThread());
        }
    

    HoldCounter配合使用的是ThreadLocalHoldCounter类,使用readHolds字段维持对该类的引用,下面是ThreadLocalHoldCounter的源码:

        /**
         * ThreadLocal subclass. Easiest to explicitly define for sake
         * of deserialization mechanics.
         */
         //继承了ThreadLocal
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
    
  • 相关阅读:
    python3.6.3中html页面转化成pdf
    python3 基于zabbix 自动抓取监控图片
    AWS EC2 云服务器 Red Hat Enterprise Linux Server release 7.4 (Maipo) vnc远程连接教程
    python处理两个json根据序号进行一对一组合
    CentOS6.9 内核升级(2.6.32-696.16.1升级到4.4.101-1.el6.el)
    给List分页
    Quartz的学习记录
    Uri
    Android studio 随笔
    android 随笔之 GridLayout
  • 原文地址:https://www.cnblogs.com/NaLanZiYi-LinEr/p/12546750.html
Copyright © 2011-2022 走看看