zoukankan      html  css  js  c++  java
  • ReentrantReadWriteLock 源码分析

    ReentrantReadWriteLock

    1)获取顺序:
    非公平模式(默认):连续竞争的非公平锁可能无限期地推迟一个或多个 reader 或 writer 线程,但吞吐量通常要高于公平锁。
    公平模式:当某个线程释放当前保持的锁时,可以为等待时间最长的单个 writer 线程分配写入锁,如果有一组等待时间大于所有正在等待的 writer 线程的 reader 线程,则将为该组分配读取锁。
    2)重入:此锁允许 reader 和 writer 按照 ReentrantLock 的样式重新获取读取锁或写入锁。
    3)锁降级:重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。
    4)锁获取的中断:读取锁和写入锁都支持锁获取期间的中断。
    5)同步状态值的高 16 位保存读取锁被持有的次数,低 16 位保存写入锁被持有的次数。
    

    创建实例

        /** 内部类实现的读锁 */
        private final ReentrantReadWriteLock.ReadLock readerLock;
        /** 内部类实现的写锁 */
        private final ReentrantReadWriteLock.WriteLock writerLock;
        /** 实现锁操作的同步器 */
        final Sync sync;
    
        /**
         * 创建一个非公平的读写锁实例
         */
        public ReentrantReadWriteLock() {
            this(false);
        }
    
        /**
         * 1)fair=true,创建一个公平的读写锁实例。
         * 2)fair=false,创建一个非公平的读写锁实例。
         */
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            // 基于当期实例创建读锁和写锁
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
    

    读锁获取:ReadLock#lock

            /**
             * 1)如果写锁没有被其他线程持有,则成功获取读锁并返回
             * 2)写锁被其他线程持有,则进入阻塞状态。
             */
            @Override
            public void lock() {
                sync.acquireShared(1);
            }
    
    AbstractQueuedSynchronizer#acquireShared
        /**
         * 在共享模式下获取锁,忽略线程中断。
         */
        public final void acquireShared(int arg) {
            // 尝试获取共享锁
            if (tryAcquireShared(arg) < 0) {
                // 再次获取共享锁
                doAcquireShared(arg);
            }
        }
    
    ReentrantReadWriteLock#Sync
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 6317671515068378041L;
    
            /**
             * 高 16 位记录写锁持有次数,低 16 位记录读锁持有次数
             */
            static final int SHARED_SHIFT   = 16;
            static final int SHARED_UNIT    = 1 << Sync.SHARED_SHIFT;
            static final int MAX_COUNT      = (1 << Sync.SHARED_SHIFT) - 1;
            static final int EXCLUSIVE_MASK = (1 << Sync.SHARED_SHIFT) - 1;
    
            /** 读锁被持有的计数值 */
            static int sharedCount(int c)    { return c >>> Sync.SHARED_SHIFT; }
            /** 写锁被持有的计数值 */
            static int exclusiveCount(int c) { return c & Sync.EXCLUSIVE_MASK; }
    
            /**
             * 每个线程的读锁保持计数器
             */
            static final class HoldCounter {
                int count;          // initially 0
                // Use id, not reference, to avoid garbage retention
                final long tid = LockSupport.getThreadId(Thread.currentThread());
            }
    
            /**
             * 持有读锁计数器的线程局部对象
             */
            static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
                @Override
                public HoldCounter initialValue() {
                    return new HoldCounter();
                }
            }
    
            /**
             * 当前线程持有的读锁计数器对象,在创建时初始化,当读锁释放时移除
             */
            private transient ThreadLocalHoldCounter readHolds;
    
            /**
             * 最近一个成功获取读锁的线程的读锁持有计数器
             */
            private transient HoldCounter cachedHoldCounter;
    
            /**
             * 第一个获取读锁的线程
             */
            private transient Thread firstReader;
            /**
             * 第一个获取读锁的线程持有读锁的计数值
             */
            private transient int firstReaderHoldCount;
    
            Sync() {
                readHolds = new ThreadLocalHoldCounter();
                setState(getState()); // ensures visibility of readHolds
            }
    
    
        /**
         * 非公平同步器
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -8159625535654395037L;
            @Override
            boolean writerShouldBlock() {
                return false; // writers can always barge
            }
            @Override
            boolean readerShouldBlock() {
                // 避免获取写锁的线程饥饿
                return apparentlyFirstQueuedIsExclusive();
            }
        }
    
    AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive
        /**
         * 同步队列中第一个等待获取锁的线程,需要获取独占的写锁,则返回 true
         */
        final boolean apparentlyFirstQueuedIsExclusive() {
            Node h, s;
            // 头结点、第二个节点都不为 null,节点处于独占模式,并且节点上有驻留线程,表示有线程在等待获取写锁。
            return (h = head) != null &&
                    (s = h.next)  != null &&
                    !s.isShared()         &&
                    s.thread != null;
        }
    
    ReentrantReadWriteLock#Sync#tryAcquireShared
            @Override
            @ReservedStackAccess
            protected final int tryAcquireShared(int unused) {
                // 读取当前线程
                final Thread current = Thread.currentThread();
                // 读取同步状态
                final int c = getState();
                /**
                 * 1)如果写锁已经被线程持有,并且不是当前线程,则获取失败,返回 -1。
                 */
                if (Sync.exclusiveCount(c) != 0 &&
                        getExclusiveOwnerThread() != current) {
                    return -1;
                }
                /**
                 * 2)写锁未被任何线程持有,或写锁被当前线程持有。
                 */
                // 读取读锁计数值
                final int r = Sync.sharedCount(c);
                /**
                 * 获取读锁的线程是否应该被阻塞【同步队列第一个阻塞线程在等待获取写锁】
                 * && 读锁的占用计数值 < 65535
                 * && 比较设置读锁的新计数值
                 */
                if (!readerShouldBlock() &&
                        r < Sync.MAX_COUNT &&
                        compareAndSetState(c, c + Sync.SHARED_UNIT)) {
                    // 1)如果是第一个获取读锁的线程
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                        // 2)当前线程是第一个获取读锁的线程,并在重复获取读锁
                    } else if (firstReader == current) {
                        // 累积读锁持有计数值
                        firstReaderHoldCount++;
                        // 3)当前线程不是第一个获取读锁的线程
                    } else {
                        // 读取最近一个成功获取读锁的线程的读锁持有计数器
                        HoldCounter rh = cachedHoldCounter;
                        // 如果最近获取读锁的线程不是当前线程
                        if (rh == null ||
                                rh.tid != LockSupport.getThreadId(current)) {
                            // 获取当前线程的持有计数器
                            cachedHoldCounter = rh = readHolds.get();
                        } else if (rh.count == 0) {
                            readHolds.set(rh);
                        }
                        // 递增计数值
                        rh.count++;
                    }
                    return 1;
                }
                // 写锁已经被当前线程持有,正在获取读锁
                return fullTryAcquireShared(current);
            }
    
            /**
             * Full version of acquire for reads, that handles CAS misses
             * and reentrant reads not dealt with in tryAcquireShared.
             */
            final int fullTryAcquireShared(Thread current) {
                HoldCounter rh = null;
                for (;;) {
                    // 读取同步状态
                    final int c = getState();
                    // 1)有线程持有写锁
                    if (Sync.exclusiveCount(c) != 0) {
                        // 如果不是当前线程持有,则返回 -1
                        if (getExclusiveOwnerThread() != current)
                        {
                            return -1;
                            // else we hold the exclusive lock; blocking here
                            // would cause deadlock.
                        }
                    // 2)写锁没有被任何线程持有
                    } else if (readerShouldBlock()) {
                        // Make sure we're not acquiring read lock reentrantly
                        if (firstReader == current) {
                            // assert firstReaderHoldCount > 0;
                        } else {
                            if (rh == null) {
                                rh = cachedHoldCounter;
                                if (rh == null ||
                                        rh.tid != LockSupport.getThreadId(current)) {
                                    rh = readHolds.get();
                                    // 读锁持有计数值为 0,则移除计数器
                                    if (rh.count == 0) {
                                        readHolds.remove();
                                    }
                                }
                            }
                            if (rh.count == 0) {
                                return -1;
                            }
                        }
                    }
                    // 读锁的获取计数值已经为最大值
                    if (Sync.sharedCount(c) == Sync.MAX_COUNT) {
                        throw new Error("Maximum lock count exceeded");
                    }
                    // 比较更新读锁的计数值
                    if (compareAndSetState(c, c + Sync.SHARED_UNIT)) {
                        // 1)当前线程是第一个获取读锁的线程
                        if (Sync.sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        // 2)当前线程是第一个获取读锁的线程,重复获取   
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null) {
                                rh = cachedHoldCounter;
                            }
                            if (rh == null ||
                                    rh.tid != LockSupport.getThreadId(current)) {
                                rh = readHolds.get();
                            } else if (rh.count == 0) {
                                readHolds.set(rh);
                            }
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }
    
    AbstractQueuedSynchronizer#doAcquireShared
        /**
         * 在共享模式下获取锁,线程能响应中断
         */
        private void doAcquireShared(int arg) {
            // 创建一个共享节点
            final Node node = addWaiter(Node.SHARED);
            boolean interrupted = false;
            try {
                for (;;) {
                    // 读取前置节点
                    final Node p = node.predecessor();
                    // 前置节点是头节点
                    if (p == head) {
                        // 尝试获取锁
                        final int r = tryAcquireShared(arg);
                        /**
                         * 第一个等待获取写锁的线程已经成功获取写锁,并且已经使用完毕而释放,
                         * 当前线程成功获取读锁。
                         */
                        if (r >= 0) {
                            // 设置当前节点为新的头节点,并尝试将信号往后传播,唤醒等待获取读锁的线程
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            return;
                        }
                    }
                    // 当前节点的驻留线程需要被阻塞,则阻塞当前线程
                    if (AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(p, node)) {
                        interrupted |= parkAndCheckInterrupt();
                    }
                }
            } catch (final Throwable t) {
                cancelAcquire(node);
                throw t;
            } finally {
                if (interrupted) {
                    AbstractQueuedSynchronizer.selfInterrupt();
                }
            }
        }
    
    AbstractQueuedSynchronizer#setHeadAndPropagate
        /**
         * 设置头节点,如果同步队列中有等待获取读锁的线程,则尝试唤醒
         */
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                    (h = head) == null || h.waitStatus < 0) {
                // 读取当前节点的后置节点,如果其为 null 或处于共享模式【等待获取读锁】
                final Node s = node.next;
                if (s == null || s.isShared()) {
                    // 尝试释放后继节点
                    doReleaseShared();
                }
            }
        }
    
    AbstractQueuedSynchronizer#doReleaseShared
        /**
         * Release action for shared mode -- signals successor and ensures
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            for (;;) {
                // 读取头节点
                final Node h = head;
                // 头结点和尾节点不相等,表示有线程在等待获取锁
                if (h != null && h != tail) {
                    // 读取头节点的同步状态
                    final int ws = h.waitStatus;
                    // 后继节点需要被唤醒
                    if (ws == Node.SIGNAL) {
                        // 比较更新同步状态
                        if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        {
                            // 更新失败则再次确认
                            continue;            // loop to recheck cases
                        }
                        // 同步状态成功更新为 0,则唤醒后继节点的驻留线程
                        unparkSuccessor(h);
                    }
                    // 同步状态为 0,并且比较更新为 PROPAGATE 失败,则继续循环
                    else if (ws == 0 &&
                            !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    {
                        continue;                // loop on failed CAS
                    }
                }
                /**
                 * 1)后继节点被唤醒并且还未成功获取到锁,则直接退出循环,此时只唤醒了一个线程。
                 * 2)被唤醒的后继节点成功获取到读锁,驻留线程已经被释放,此时头节点已经改变,则进行重试。
                 */
                if (h == head) {
                    break;
                }
            }
        }
    

    读锁释放:ReadLock#unlock

            /**
             * 释放读锁,如果读锁的持有计数值为 0,则写锁可以被获取。
             */
            @Override
            public void unlock() {
                sync.releaseShared(1);
            }
    
            @Override
            @ReservedStackAccess
            protected final boolean tryReleaseShared(int unused) {
                // 读取当前线程
                final Thread current = Thread.currentThread();
                // 1)当前线程是获取读锁的第一个线程
                if (firstReader == current) {
                    // 读锁持有计数为 1,则释放后为 0,
                    if (firstReaderHoldCount == 1) {
                        firstReader = null;
                    } else {
                        // 读锁被当前线程多次持有,则递减读锁持有计数值
                        firstReaderHoldCount--;
                    }
                // 2)当前线程不是持有读锁的线程
                } else {
                    // 最近持有读锁的线程计数值
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null ||
                            rh.tid != LockSupport.getThreadId(current)) {
                        // 读取当前线程的读锁持计数值对象
                        rh = readHolds.get();
                    }
                    final int count = rh.count;
                    if (count <= 1) {
                        // 如果为 1,则移除线程局部对象
                        readHolds.remove();
                        if (count <= 0) {
                            throw Sync.unmatchedUnlockException();
                        }
                    }
                    // 递减计数值
                    --rh.count;
                }
                // 原子更新同步状态值
                for (;;) {
                    final int c = getState();
                    final int nextc = c - Sync.SHARED_UNIT;
                    if (compareAndSetState(c, nextc)) {
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        return nextc == 0;
                    }
                }
            }
    

    写锁获取:WriteLock#lock

    ReentrantReadWriteLock#Sync
            @Override
            @ReservedStackAccess
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                // 读取同步状态值
                final int c = getState();
                // 写锁被持有的计数值
                final int w = Sync.exclusiveCount(c);
                // 1)读锁或写锁至少有一个被线程持有
                if (c != 0) {
                    /**
                     * 2)读锁被线程持有,或当前线程不是写锁持有线程。
                     * 目标线程不允许先获取读锁,后获取写锁,即 ReentrantReadWriteLock 不支持锁升级。
                     */
                    if (w == 0 || current != getExclusiveOwnerThread()) {
                        // 尝试获取失败
                        return false;
                    }
                    // 写锁重入次数超出最大值
                    if (w + Sync.exclusiveCount(acquires) > Sync.MAX_COUNT) {
                        throw new Error("Maximum lock count exceeded");
                    }
                    // 更新同步状态,写锁获取成功
                    setState(c + acquires);
                    return true;
                }
                // 读锁和写锁都未被线程持有,则原子更新同步状态
                if (writerShouldBlock() ||
                        !compareAndSetState(c, c + acquires)) {
                    return false;
                }
                // 设置写锁的独占线程为当前线程
                setExclusiveOwnerThread(current);
                return true;
            }
    

    写锁释放:WriteLock#unlock

            /**
             * 释放锁,写锁已经被释放,则返回 true
             */
            @Override
            @ReservedStackAccess
            protected final boolean tryRelease(int releases) {
                // 写锁是否被当前线程持有
                if (!isHeldExclusively()) {
                    throw new IllegalMonitorStateException();
                }
                // 计算同步状态值,写锁计数值保存在低 16 位
                final int nextc = getState() - releases;
                // 新的写锁计数值为 0,则表示读写锁已经自由了
                final boolean free = Sync.exclusiveCount(nextc) == 0;
                if (free) {
                    // 清空独占锁持有线程
                    setExclusiveOwnerThread(null);
                }
                setState(nextc);
                return free;
            }
    
  • 相关阅读:
    去深圳办理港澳通行证签注延期
    預約申領往來港澳通行證及簽注x
    表格选中效果展示
    Purchase购物车实例分析
    IOS开发基础知识--碎片17
    IOS开发基础知识--碎片16
    IOS开发基础知识--碎片15
    IOS开发基础知识--碎片14
    IOS关于LKDBHelper实体对象映射插件运用
    IOS开发基础知识--碎片13
  • 原文地址:https://www.cnblogs.com/zhuxudong/p/10050443.html
Copyright © 2011-2022 走看看