zoukankan      html  css  js  c++  java
  • ReentrantReadWriteLock的原理

    1、前言
    在《从源码分析ReentrantLock原理》这一篇文章中分析了以非阻塞同步算法为基础实现的可重入独占锁ReentrantLock。所谓** “独占” 即同一时间只能有一个线程持有锁。而 “重入” 是指该线程如果持有锁,可以在同步代码块内再次请求占有锁而不被阻塞,线程重入后将AQS内部状态state同步加1继续同步区的操作。但是要注意该线程要想移交锁的控制权必须完全释放重入锁,即将AQS的state同步更新到0为止。ReentrantReadWriteLock出现的目的就是针对ReentrantLock独占带来的性能问题,使用ReentrantLock无论是“写/写”线程、“读/读”线程、“读/写”线程之间的工作都是互斥,同时只有一个线程能进入同步区域。然而大多实际场景是“读/读”线程间并不存在互斥关系,只有"读/写"线程或"写/写"线程间的操作需要互斥的。因此引入ReentrantReadWriteLock,它的特性是:一个资源可以被多个读操作访问,或者一个写操作访问,但两者不能同时进行,从而提高读操作的吞吐量。

    2、读写锁简介
    现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了。针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁,描述如下:
    线程进入读锁的前提条件:

    • 没有其他线程的写锁,
    • 没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。

    线程进入写锁的前提条件:

    • 没有其他线程的读锁
    • 没有其他线程的写锁

    而读写锁有以下三个重要的特性:

    • (1)公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
    • (2)重进入:读锁和写锁都支持线程重进入。
    • (3)锁降级:遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。

    3、使用场景及代码

    Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象。两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象。
    读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,我们只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!
    在多线程的环境下,对同一份数据进行读写,会涉及到线程安全的问题。比如在一个线程读取数据的时候,另外一个线程在写数据,而导致前后数据的不一致性;一个线程在写数据的时候,另一个线程也在写,同样也会导致线程前后看到的数据的不一致性。这时候可以在读写方法中加入互斥锁,任何时候只能允许一个线程的一个读或写操作,而不允许其他线程的读或写操作,这样是可以解决这样以上的问题,但是效率却大打折扣了。因为在真实的业务场景中,一份数据,读取数据的操作次数通常高于写入数据的操作,而线程与线程间的读读操作是不涉及到线程安全的问题,没有必要加入互斥锁,只要在读-写,写-写期间上锁就行了。对于以上这种情况,读写锁是最好的解决方案!其中它的实现类:ReentrantReadWriteLock--顾名思义是可重入的读写锁,允许多个读线程获得ReadLock,但只允许一个写线程获得WriteLock。
    读写锁的机制:

    • "读-读"不互斥
    • "读-写"互斥
    • "写-写"互斥

    需要提前了解的概念:

    • 锁降级:从写锁变成读锁;
    • 锁升级:从读锁变成写锁。

    读锁是可以被多线程共享的,写锁是单线程独占的。也就是说写锁的并发限制比读锁高,这可能就是升级/降级名称的来源。真正趋于实际生产环境中的缓存案例:

    package com.springboot.study.tests;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    /**
     * @Author: guodong
     * @Date: 2021/8/31 15:46
     * @Version: 1.0
     * @Description:
     */
    public class CacheDemo {
    
        /**
         * 缓存器,这里假设需要存储1000左右个缓存对象,按照默认的负载因子0.75,则容量=750,大概估计每一个节点链表长度为5个
         * 那么数组长度大概为:150,又有雨设置map大小一般为2的指数,则最近的数字为:128
         */
        private Map<String, Object> map = new HashMap<>(128);
        private ReadWriteLock rwl = new ReentrantReadWriteLock();
    
        public static void main(String[] args) {
        }
    
        public Object get(String id){
            Object value = null;
            rwl.readLock().lock();//首先开启读锁,从缓存中去取
            try{
                value = map.get(id);
                //如果缓存中没有释放读锁,上写锁
                if(value == null){
                    rwl.readLock().unlock();
                    rwl.writeLock().lock();
                    try{
                        //防止多写线程重复查询赋值
                        if(value == null){
                            //此时可以去数据库中查找,这里简单的模拟一下
                            value = "redis-value";
                        }
                        rwl.readLock().lock(); //加读锁降级写锁,不明白的可以查看上面锁降级的原理与保持读取数据原子性的讲解
                    }finally{
                        rwl.writeLock().unlock(); //释放写锁
                    }
                }
            }finally{
                rwl.readLock().unlock(); //最后释放读锁
            }
            return value;
        }
    
    }

    4、源码解析

    ReadLock和WriteLock是ReentrantReadWriteLock的两个内部类,Lock的上锁和释放锁都是通过AQS来实现的。

    AQS定义了独占模式的acquire()和release()方法,共享模式的acquireShared()和releaseShared()方法.还定义了抽象方法tryAcquire()、tryAcquiredShared()、tryRelease()和tryReleaseShared()由子类实现,tryAcquire()和tryAcquiredShared()分别对应独占模式和共享模式下的锁的尝试获取,就是通过这两个方法来实现公平性和非公平性,在尝试获取中,如果新来的线程必须先入队才能获取锁就是公平的,否则就是非公平的。这里可以看出AQS定义整体的同步器框架,具体实现放手交由子类实现。

    public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
        private static final long serialVersionUID = -6992448646407690164L;
        /** 提供读锁的内部类 */
        private final ReentrantReadWriteLock.ReadLock readerLock;
        /** 提供写锁的内部类 */
        private final ReentrantReadWriteLock.WriteLock writerLock;
        /** 执行所有同步机制 */
        final Sync sync;
    
    }

    state
    之前在阅读 ReentrantLock 源码的时候 state 代表了锁的状态,0 表示没有线程持有锁,大于 1 表示已经有线程持有锁及其重入的次数。而在 ReentrantReadWriteLock 是读写锁,那就需要保存读锁和写锁两种状态的,那是怎么样表示的呢?在 ReentrantReadWriteLock 中同样存在一个 Sync 继承了 AbstractQueuedSynchronizer,也是 FairSync、NonfairSync 的父类。内部定义了 state 的一些操作。

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;
        // 移位数
        static final int SHARED_SHIFT   = 16;
        // 单位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 最大数量 1 << 16 -> 65536
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 计算独占数使用 1 << 16 -> 65536
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
        // 返回共享保留数
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        // 返回独占保留数 
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
    }

    在 AQS 中定义 state 为 int 类型,而在 ReentrantReadWriteLock 中,将 state 的 高 16 位和低 16 位拆开表示读写锁。其中高 16 位表示读锁,低 16 位表示写锁。分别使用 sharedCount 和 exclusiveCount 方法获取读锁和写锁的当前状态。

    下面分别从读锁和写锁的角度来看如何进行加锁和释放锁的?
    ReadLock.lock

    public static class ReadLock 
        implements Lock, java.io.Serializable {
        /**
         * 获取读取锁。
         * 如果写锁没有被另一个线程持有,则获取读锁并立即返回。
         * 如果写锁由另一个线程持有,则出于线程调度目的,
         * 当前线程将被禁用,并处于休眠状态,直到获取读锁为止。
         */
        public void lock() {
            // 调用 AQS 获取共享资源
            sync.acquireShared(1);
        }
    }

    获取共享资源,这块使用的 AQS 的逻辑,其中 tryAcquireShared(arg) 是在 ReentrantReadWriteLock.Sync 中实现的。并且 AQS 中有规定,tryAcquireShared 分为三种返回值:

    • 小于 0: 表示失败;
    • 等于 0: 表示共享模式获取资源成功,但后续的节点不能以共享模式获取成功;
    • 大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。
    abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final int tryAcquireShared(int unused) {
         
            Thread current = Thread.currentThread();
            // 获取 state 值
            int c = getState();
            // 独占计数不为 0 且 不是当前线程, 说明已经有写锁
            if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
                return -1;
            // 获取共享计数(读锁计数)
            int r = sharedCount(c);
            // 不需要阻塞读锁 && 共享计数小于最大值 && state 更新成功
            if (!readerShouldBlock() && r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    // 当前读锁计数为 0
                    // firstReader是获得读锁的第一个线程
                    // firstReaderHoldCount是firstReader的保持计数
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    // 读锁重入
                    firstReaderHoldCount++;
                } else {
                    // 当前缓存计数
                    HoldCounter rh = cachedHoldCounter;
                    // 当前线程没有计数 或者 没有创建计数器
                    if (rh == null || rh.tid != getThreadId(current))
                        // 创建计数,基于 ThreadLocal
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0) 
                        readHolds.set(rh);
                    // 计数累加
                    rh.count++;
                }
                return 1;
            }
            // 完整地获取共享锁方法,作为tryAcquireShared方法因CAS获取锁失败后的处理。
            // 因为前面可能失败 CAS 失败, 队列策略失败等原因。
            return fullTryAcquireShared(current);
        }
    }
    • 先获取 state ,通过 exclusiveCount 方法获取到写锁的计数值,不为 0 且 不是当前线程, 说明已经有写锁。返回 -1 失败。
    • 通过 sharedCount 获取读锁计数,判断是否需要阻塞以及是否超过上限后,使用 CAS 更新 读锁计数。
    • 设置或更新 firstReader、firstReaderHoldCount、 cachedHoldCounter。
    • 最后会进行完整的获取共享锁方法,作为之前获取失败的后续处理方法。

    firstReader:firstReader是获得读锁的第一个线程;
    firstReaderHoldCount:firstReaderHoldCount是firstReader的保持计数。即获得读锁的第一个线程的重入次数。
    cachedHoldCounter:最后一个获得读锁的线程获得读锁的重入次数。

    final int fullTryAcquireShared(Thread current) {
     
        HoldCounter rh = null;
        // 无限循环
        for (;;) {
            int c = getState();
            // 是否有写锁
            if (exclusiveCount(c) != 0) {
                // 有写锁,但是不是当前线程,直接返回失败
                if (getExclusiveOwnerThread() != current)
                    return -1;
            } else if (readerShouldBlock()) {
                // 需要阻塞
                // 没有写锁,确保没有重新获取读锁
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    // 当前线程的读锁计数 ThreadLocal 中
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            // 计数结束,remove 掉
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    // 为 0 直接失败
                    if (rh.count == 0)
                        return -1;
                }
            }
            // 到达上限 抛出异常
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // CAS 设置读锁
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }
    • 首先会一直循环
    • 有写锁,但是不是当前线程,直接返回失败。但是,有写锁,如果是当前线程,是会继续执行的。
    • 设置或更新 firstReader、firstReaderHoldCount、 cachedHoldCounter。

    当存在写锁(独占锁)时,方法会返回 -1 失败,后续会调用 AQS 的 doAcquireShared 方法,循环获取资源。doAcquireShared 方法会不断循环,尝试获取读锁,一旦获取到读锁,当前节点会立即唤醒后续节点,后续节点开始尝试获取读锁,依次传播。

    ReadLock.unlock

    public static class ReadLock 
        implements Lock, java.io.Serializable {
        public void unlock() {
            sync.releaseShared(1);
        }
    }

    调用 AQS 的 releaseShared 释放共享资源方法。

    其中 tryReleaseShared 有 ReadLock 实现。

    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            // 第一个线程是当前线程
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            // 第一个线程不是当前线程,更新自己的 ThreadLocal 里面的计数
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        // 循环
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            // 使用 CAS 更新 state
            if (compareAndSetState(c, nextc))
                // 但是如果现在读和写锁都已释放,
                // 它可能允许等待的写程序继续进行。
                return nextc == 0;
        }
    }
    • 如果是第一个线程,直接更新技术,不是则更新自己 ThreadLocal 里面保存的计数。
    • 循环,使用 CAS 更新 state 的值。
    • 如果 state 更新后的值为 0,说明没有线程持有读锁或者写锁了。
    • 当 state 为 0,此时会调用 AQS 的 doReleaseShared 方法。此时队列如果有写锁,那就会被写锁获取的锁。

    WriteLock.lock

    public static class WriteLock 
        implements Lock, java.io.Serializable {
        /**
         * 获取写入锁。
         * 如果没有其他线程持有读锁或写锁,会直接返回,并将写锁计数设置为1。
         * 如果当前线程持有写锁,则将写锁计数 +1,然后返回。
         * 如果锁正在被其他线程持有,则当前线程用于线程调度目的,
         * 当前线程将被禁用,并处于休眠状态,直到获取读锁并将写锁计数设置为1。
         */
        public void lock() {
            sync.acquire(1);
        }
    }

    tryAcquire 方法由 Write 自己实现,方式和 ReentrantLock 类似。

    protected final boolean tryAcquire(int acquires) {
        
        // 如果读锁计数为非零或写锁计数为非零,并且所有者是另一个线程,则失败。
        // 如果计数饱和,则失败。只有在count不为零时,才可能发生这种情况。
        // 否则,如果该线程是可重入获取或队列策略允许的话,则有资格进行锁定。
        // 如果是这样,请更新状态并设置所有者。
        Thread current = Thread.currentThread();
        int c = getState();
        // 写锁计数
        int w = exclusiveCount(c);
        // c != 0 说明有有线程获取锁了
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            // 判断是不是自己,不是自己 返回 false
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            // 判断有没有超过上限
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // 重入
            setState(c + acquires);
            return true;
        }
        // 不需要阻塞,或者 CAS 更新 state 失败
        if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }
    • 获取 state , 如果 state 不为 0 则判断是否为当前线程重入获取。
    • state 为 0 ,则当前线程 CAS 更新 state,获取锁。
    • 更新成功之后绑定当前线程。
    • 如果失败会继续调用 AQS 的 acquireQueued,将当前阻塞放在 AQS 队列中。AQS 会不断循环,等待上一个锁释放后,尝试获得锁。

    WriteLock.unlock

    public static class WriteLock 
        implements Lock, java.io.Serializable {
        // 如果当前线程是此锁的持有者,则保持计数递减。 
        // 如果保持现在的计数为零,则解除锁定。 
        // 如果当前线程不是此锁的持有者则IllegalMonitorStateException异常。
        public void unlock() {
            sync.release(1);
        }
    }

    同样这块代码是使用 AQS 的逻辑,tryRelease 部分由 WriteLock 自己实现。

    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        return free;
    }
    • 如果是当前线程重入,扣减重入次数。
    • 扣减后如果为 0,则设置锁持有线程为 null,更新 state 值。AQS 会唤醒后续节点获取锁。

    总结
    Q: 在 ReentrantReadWriteLock 中 state 代表什么?
    A: state 代表锁的状态。state 为 0 ,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算可以获取读写锁的实际值。
    Q: 线程获取锁的流程是怎么样的?
    A: 可以参考上面的源码笔记,以及后面的流程图。
    Q: 读锁和写锁的可重入性是如何实现的?
    A: 在加锁的时候,判断是否为当前线程,如果是当前线程,则直接累加计数。值得注意的是:读锁重入计数使用的 ThreadLocal 在线程中缓存计数,而写锁则直接用的 state 进行累加(其实和 state 低 16 位进行累加一样)。
    Q: 当前线程获取锁失败,被阻塞的后续操作是什么?
    A: 获取失败,会放到 AQS 等待队列中,在队列中不断循环,监视前一个节点是否为 head ,是的话,会重新尝试获取锁。
    Q: 锁降级是怎么降级的?
    A:

    如图,在圈出部分 fullTryAcquireShared 代码中,可以看出来,在获取读锁的时候,如果当前线程持有写锁,是可以获取读锁的。这块就是指锁降级,比如线程 A 获取到了写锁,当线程 A 执行完毕时,它需要获取当前数据,假设不支持锁降级,就会导致 A 释放写锁,然后再次请求读锁。而在这中间是有可能被其他阻塞的线程获取到写锁的。从而导致线程 A 在一次执行过程中数据不一致。

    小结

    1. ReentrantReadWriteLock 读写锁,内部实现是 ReadLock 读锁 和 WriteLock 写锁。读锁,允许共享;写锁,是独占锁。
    2. 读写锁都支持重入,读锁的重入次数记录在线程维护的 ThreadLocal 中,写锁维护在 state 上(低 16 位)。
    3. 支持锁降级,从写锁降级为读锁,防止脏读。
    4. ReadLock 和 WriteLock 都是通过 AQS 来实现的。获取锁失败后会放到 AQS 等待队列中,后续不断尝试获取锁。区别在读锁只有存在写锁的时候才放到等待队列,而写锁是只要存在非当前线程锁(无论写锁还是读锁)都会放到等待队列。
    5. 通过源码分析,可以得出读写锁适合在读多写少的场景中使用。

    参考源码:
    http://www.bubuko.com/infodetail-1961925.html
    https://blog.csdn.net/fxkcsdn/article/details/82217760
    https://blog.csdn.net/qq_36535538/article/details/107632511
    https://www.jianshu.com/p/9f98299a17a5

    郭慕荣博客园
  • 相关阅读:
    替换OSD操作的优化与分析
    Centos7下Jewel版本radosgw服务启动
    如何统计Ceph的RBD真实使用容量
    Ceph中的Copyset概念和使用方法
    Proftp最简匿名访问配置
    Windows could not set the offline local information.Error code:0X80000001解决方法
    《一百岁感言》 杨绛
    取扑克牌的问题
    马云的懒人理论
    明代地图总目
  • 原文地址:https://www.cnblogs.com/jelly12345/p/15206791.html
Copyright © 2011-2022 走看看