zoukankan      html  css  js  c++  java
  • 读写锁 ReentrantReadWriteLock

    一、读写锁 ReadWriteLock概念特点
    读写锁维护了一对相关的锁,一个用于只读操作,一个用于写入操作。只要没有writer,读取锁可以由多个reader线程同时保持。写入锁是独占的。

    互斥锁【ReetrantLock】一次只允许一个线程访问共享数据,哪怕进行的是只读操作;读写锁【ReadWriteLock】允许对共享数据进行更高级别的并发访问:对于写操作,一次只有一个线程(write线程)可以修改共享数据,对于读操作,允许任意数量的线程同时进行读取。writer可以获取读取锁,但reader不能获取写入锁。写入锁降级为读取锁,实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。
    读写锁的读取锁和写入锁都支持锁获取期间的中断。并且写入锁提供了一个 Condition 实现,对于写入锁来说,该实现的行为与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所做的行为相同。当然,此 Condition 只能用于写入锁。读取锁不支持 Condition,readLock().newCondition() 会抛出 UnsupportedOperationException。

    二、实现原理及核心代码

    读写锁也是基于AQS实现。

    AQS以单个 int 类型的原子变量来表示其状态,定义了4个抽象方法( tryAcquire(int)、tryRelease(int)、tryAcquireShared(int)、tryReleaseShared(int),前两个方法用于独占/排他模式,后两个用于共享模式 )留给子类实现,用于自定义同步器的行为以实现特定的功能。

    对于 ReentrantLock,它是可重入的独占锁,内部的 Sync 类实现了 tryAcquire(int)、tryRelease(int) 方法,并用状态的值来表示重入次数,加锁或重入锁时状态加 1,释放锁时状态减 1,状态值等于 0 表示锁空闲。

    对于 CountDownLatch,它是一个关卡,在条件满足前阻塞所有等待线程,条件满足后允许所有线程通过。内部类 Sync 把状态初始化为大于 0 的某个值,当状态大于 0 时所有wait线程阻塞,每调用一次 countDown 方法就把状态值减 1,减为 0 时允许所有线程通过。利用了AQS的共享模式。


    【AQS一个状态如何表示多个读锁与单个写锁呢,】一个状态是没法既表示读锁,又表示写锁的,不够用啊,那就辦成两份用了,客家话说一个饭粒咬成两半吃,状态的高位部分表示读锁,低位表示写锁,由于写锁只有一个,所以写锁的重入计数也解决了,这也会导致写锁可重入的次数减小。

    【如何表示每个读锁、写锁的重入次数呢】由于读锁可以同时有多个,肯定不能再用辦成两份用的方法来处理了,但我们有 ThreadLocal,可以把线程重入读锁的次数作为值存在 ThreadLocal 里。AQS 的状态是32位(int 类型)的,辦成两份,读锁用高16位,表示持有读锁的线程数(sharedCount),写锁低16位,表示写锁的重入次数(exclusiveCount)。状态值为 0 表示锁空闲,sharedCount不为 0 表示分配了读锁,exclusiveCount 不为 0 表示分配了写锁,sharedCount和exclusiveCount 肯定不会同时不为 0。
    【读、写锁的公平性如何实现】对于公平性的实现,可以通过AQS的等待队列和它的抽象方法来控制,在状态值的另一半里存储当前持有读锁的线程数。如果读线程申请读锁,当前写锁重入次数不为 0 时,则等待,否则可以马上分配;如果是写线程申请写锁,当前状态为 0 则可以马上分配,否则等待。

    abstract static class Sync extends AbstractQueuedSynchronizer {
    
        // 
         // 
           static final int SHARED_SHIFT   = 16;
    
           // 由于读锁用高位部分,所以读锁个数加1,其实是状态值加 2^16
           static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    
           // 写锁的可重入的最大次数、读锁允许的最大数量
           static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    
           // 写锁的掩码,用于状态的低16位有效值
           static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
           // 读锁计数,当前持有读锁的线程数
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    
        // 写锁的计数,也就是它的重入次数
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    /**
         * 每个线程特定的 read 持有计数。存放在ThreadLocal,不需要是线程安全的。
         */
        static final class HoldCounter {
            int count = 0;
    
            // 使用id而不是引用是为了避免保留垃圾。注意这是个常量。
            final long tid = Thread.currentThread().getId();
        }
    
        /**
         * 采用继承是为了重写 initialValue 方法,这样就不用进行这样的处理:
         * 如果ThreadLocal没有当前线程的计数,则new一个,再放进ThreadLocal里。
         * 可以直接调用 get。
         * */
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
    
        /**
         * 保存当前线程重入读锁的次数的容器。在读锁重入次数为 0 时移除。
         */
        private transient ThreadLocalHoldCounter readHolds;
    
        /**
         * 最近一个成功获取读锁的线程的计数。这省却了ThreadLocal查找,
         * 通常情况下,下一个释放线程是最后一个获取线程。这不是 volatile 的,
         * 因为它仅用于试探的,线程进行缓存也是可以的
         * (因为判断是否是当前线程是通过线程id来比较的)。
         */
        private transient HoldCounter cachedHoldCounter;
    
        /**
         * firstReader是这样一个特殊线程:它是最后一个把 共享计数 从 0 改为 1 的
         * (在锁空闲的时候),而且从那之后还没有释放读锁的。如果不存在则为null。
         * firstReaderHoldCount 是 firstReader 的重入计数。
         *
         * firstReader 不能导致保留垃圾,因此在 tryReleaseShared 里设置为null,
         * 除非线程异常终止,没有释放读锁。
         *
         * 作用是在跟踪无竞争的读锁计数时非常便宜。
         *
         * firstReader及其计数firstReaderHoldCount是不会放入 readHolds 的。
         */
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;
    
        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // 确保 readHolds 的内存可见性,利用 volatile 写的内存语义。
        }
    }
    
    //写锁的获取与释放
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) { // 状态不为0,表示锁被分配出去了。
    
            // (Note: if c != 0 and w == 0 then shared count != 0)
          // c != 0 and w == 0 表示分配了读锁
          // w != 0 && current != getExclusiveOwnerThread() 表示其他线程获取了写锁。
            if (w == 0 || current != getExclusiveOwnerThread())
                return false ;
    
            // 写锁重入
            // 检测是否超过最大重入次数。
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
    
            // 更新写锁重入次数,写锁在低位,直接加上 acquire 即可。
            // Reentrant acquire
            setState(c + acquires);
            return true ;
        }
    
        // writerShouldBlock 留给子类实现,用于实现公平性策略。
        // 如果允许获取写锁,则用 CAS 更新状态。
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false ; // 不允许获取锁 或 CAS 失败。
    
        // 获取写锁超过,设置独占线程。
        setExclusiveOwnerThread(current);
        return true;
    }
    
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively()) // 是否是当前线程持有写锁
            throw new IllegalMonitorStateException();
    
        // 这里不考虑高16位是因为高16位肯定是 0。
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread( null); // 写锁完全释放,设置独占线程为null。
        setState(nextc);
        return free;
    }
    
    //读锁的获取与释放
    // 参数变为 unused 是因为读锁的重入计数是内部维护的。
    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
    
        // 这个if语句是说:持有写锁的线程可以获取读锁。
        if (exclusiveCount(c) != 0 && // 已分配了写锁
            getExclusiveOwnerThread() != current) // 且当前线程不是持有写锁的线程
            return -1;
    
        int r = sharedCount(c); // 取读锁计数
        if (!readerShouldBlock() && // 由子类根据其公平策略决定是否允许获取读锁
            r < MAX_COUNT &&           // 读锁数量还没达到最大值
    
            // 尝试获取读锁。注意读线程计数的单位是  2^16
            compareAndSetState(c, c + SHARED_UNIT)) {
             // 成功获取读锁
    
         // 注意下面对firstReader的处理:firstReader是不会放到readHolds里的
         // 这样,在读锁只有一个的情况下,就避免了查找readHolds。
            if (r == 0) { // 是 firstReader,计数不会放入  readHolds。
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) { // firstReader 重入
                firstReaderHoldCount++;
            } else {
                 // 非 firstReader 读锁重入计数更新
                HoldCounter rh = cachedHoldCounter; // 首先访问缓存
                if (rh == null || rh.tid != current.getId())
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        // 获取读锁失败,放到循环里重试。
        return fullTryAcquireShared(current);
    }
    
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
               // 写锁被分配,非写锁线程获取读锁,失败
                    return -1;
                // 否则,当前线程持有写锁,在这里阻塞将导致死锁。
    
            } else if (readerShouldBlock()) {
                // 写锁空闲  且  公平策略决定 线程应当被阻塞
                // 下面的处理是说,如果是已获取读锁的线程重入读锁时,
                // 即使公平策略指示应当阻塞也不会阻塞。
                // 否则,这也会导致死锁的。
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId()) {
                            rh = readHolds.get();
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    // 需要阻塞且是非重入(还未获取读锁的),获取失败。
                    if (rh.count == 0)
                        return -1;
                }
            }
    
            // 写锁空闲  且  公平策略决定线程可以获取读锁
            if (sharedCount(c) == MAX_COUNT) // 读锁数量达到最多
                throw new Error( "Maximum lock count exceeded");
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                // 申请读锁成功,下面的处理跟tryAcquireShared是类似的。
    
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
               // 设定最后一次获取读锁的缓存
                    if (rh == null)
                        rh = cachedHoldCounter;
    
                    if (rh == null || rh.tid != current.getId())
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
    
                    cachedHoldCounter = rh; // 缓存起来用于释放
                }
                return 1;
            }
        }
    }
    
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        // 清理firstReader缓存 或 readHolds里的重入计数
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != current.getId())
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                // 完全释放读锁
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count; // 主要用于重入退出
        }
        // 循环在CAS更新状态值,主要是把读锁数量减 1
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // 释放读锁对其他读线程没有任何影响,
                // 但可以允许等待的写线程继续,如果读锁、写锁都空闲。
                return nextc == 0;
        }
    }

    https://coderbee.net/index.php/concurrent/20140214/792/comment-page-1#comment-5314

    Java 读写锁 ReentrantReadWriteLock 源码分析

  • 相关阅读:
    06 is和==的区别 encode()编码 decode()解码
    05 dic的增删改查 字典的嵌套 考试题dic.get()的相关使用
    03 编码 int ,bool,str的常用操作 主要讲str
    01 基本数据类型 变量 if语句
    04 列表的增删改查 常用方法 元祖 range
    02 while循环 格式化输出 运算符
    多校2 Harmonious Army hdu6598 网络流
    P3159 [CQOI2012]交换棋子 网络流
    P2172 [国家集训队]部落战争 最大流
    P2402 奶牛隐藏 网络流
  • 原文地址:https://www.cnblogs.com/doit8791/p/9164151.html
Copyright © 2011-2022 走看看