zoukankan      html  css  js  c++  java
  • Java并发:ReadWriteLock 读写锁

    读写锁在同一时刻可以允许多个线程访问,但是在写线程访问,所有的读线程和其他写线程均被阻塞。

    读写锁不像 ReentrantLock 那些排它锁只允许在同一时刻只允许一个线程进行访问,读写锁可以允许多个线程同时访问,并发性能相比一般的排它锁有很大的提升。

    当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知后,所有等待的读操作才能继续执行,这样的目的是能正确读到的数据,而不会出现脏读。

    ReadWriteLock 接口:

     1 public interface ReadWriteLock {
     2     /**
     3      * Returns the lock used for reading.
     4      *
     5      * @return the lock used for reading
     6      */
     7     Lock readLock();
     8 
     9     /**
    10      * Returns the lock used for writing.
    11      *
    12      * @return the lock used for writing
    13      */
    14     Lock writeLock();
    15 }

    读写锁维护了一个读锁和一个写锁。

    ReentrantReadWriteLock 是 ReadWriteLock 接口的一个实现。Java 类图如下:

    静态抽象内部类 Sync 继承了 AQS,对 ReentrantReadWriteLock 提供了支持。

    ReentrantReadWriteLock 除开接口的方法外,还有展示内部工作状态的方法:

    方法名称 描述
    int getReadLockCount()
    返回当前读锁被获取的次数
    int getReadHoldCount()
    返回当前线程获取读锁的次数
    boolean isWriteLocked()
    判断写锁是否被获取
    int getReadHoldCount()
    返回当前写锁被获取的次数

    读写锁的实现:

    ① 读写状态的设计

    读写锁同样是依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。和 ReentrantLock 有点不同,读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态。

    一个整型变量维护多个状态,就需要按其二进制位“切割”使用,读写锁把这个32位的整型变量分成了两个部分:高16位表示读状态,低16位表示写状态。

    1         static final int SHARED_SHIFT   = 16;
    2         static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    3         static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    4         static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    5 
    6         /** 返回当前状态的共享资源数,消除低16位  */
    7         static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    8         /** 返回当前线程的独占资源数,按位与& 0x0000FFFF 除去高16位  */
    9         static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    ② 写锁的获取与释放

    写锁是一个支持重新进入的排它锁。

     1     protected final boolean tryAcquire(int acquires) {
     2             /*
     3              * 可能的过程:
     4              * 1. 如果读取计数非零或写入计数非零并且所有者是另一个线程,则失败。
     5              * 2. 如果计数饱和,则失败。(计数不为0)
     6              * 3. 否则,如果是可重入获取或队列策略允许,
     7              *     则该线程有资格进行锁定。如果是这样,请更新状态*并设置所有者.
     8              */
     9             Thread current = Thread.currentThread();
    10             int c = getState();
    11             int w = exclusiveCount(c);
    12             if (c != 0) {
    13                 // 存在读锁或者当前线程不是已获取锁的线程(Note: if c != 0 and w == 0 then shared count != 0)
    14                 if (w == 0 || current != getExclusiveOwnerThread())
    15                     return false;
    16                 if (w + exclusiveCount(acquires) > MAX_COUNT)
    17                     throw new Error("Maximum lock count exceeded");
    18                 // 重入获取
    19                 setState(c + acquires);
    20                 return true;
    21             }
    22             if (writerShouldBlock() ||
    23                 !compareAndSetState(c, c + acquires))
    24                 return false;
    25             setExclusiveOwnerThread(current);
    26             return true;
    27         }

    如果读锁存在,则写锁不能被获取,读写锁要确保写锁的操作对读锁可见,只有等待其他线程释放了读锁,写锁才能被获取。当写锁获取到,其他读写线程的后续访问均被阻塞。

    写锁的释放与 ReentrantLock 基本类似,每次释放均减少同步状态值,写状态为0是表示锁已被释放,其他读写线程才能继续访问读写锁,同时前一次写线程的修改对后续的读写进程可见。

    ③ 读锁的获取与释放

        protected final int tryAcquireShared(int unused) {
                /*
                 * 可能的情况:
                 * 1. 如果另一个线程持有写锁,则失败。
                 * 2. 否则,此线程有资格获得锁定状态,因此询问是否由于队列策略而应阻塞。
                 *    如果不是,尝试按CAS方式更新计数。
                 *    请注意,该步骤不检查重入获取,这会推迟到完整版本的获取方法,
                 *    以避免必须在更典型的非重入情况下检查保留计数。
                 * 3. 如果第2步失败,或者由于线程显然不符合条件或者CAS失败或计数饱和,请使用完全死循环版本。
                 */
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)  // 有写锁,但不是当前自己的
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {  // 在合适的条件下尝试用CAS设置
                    if (r == 0) {  // 读锁空闲,获取锁成功
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {  // 读锁不是空闲的,且第一个读线程是当前线程,获取锁成功
                        firstReaderHoldCount++;
                    } else {  // 不是第一个线程,获取锁成功
                        HoldCounter rh = cachedHoldCounter;  // 代表最后一个读锁线程的计数器
                        if (rh == null || rh.tid != getThreadId(current))  // 若最后一个线程计数器是空的或者不是当前线程的,那就新建一个
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)  // 如果不是空的,且count是0,将上一个线程的HoldCounter覆盖本地的
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);  // 死循环获取读锁
            }

     完整版本的获取读锁(死循环),包含降级策略

     1     final int fullTryAcquireShared(Thread current) {
     2             /*
     3              *  该代码与tryAcquireShared中的代码部分冗余,但由于不使 
     4              *  tryAcquireShared与重试和延迟读取保持计数之间的交互复杂化,
     5              *  因此总体上更简单。
     6              */
     7             HoldCounter rh = null;
     8             for (;;) {
     9                 int c = getState();  
    10                 if (exclusiveCount(c) != 0) {  // 低16位不为0,有线程有写锁
    11                     if (getExclusiveOwnerThread() != current) // 写锁被其他线程持有,获取锁失败
    12                         return -1;
    13                     // 否则我们将持有排他锁;在这里阻塞
    14                     // 将导致死锁
    15                 } else if (readerShouldBlock()) {
    16                     // 确保我们不会再获取读锁,若第一个读取线程为当前进程
    17                     if (firstReader == current) {
    18                         // 断言 firstReaderHoldCount > 0;
    19                     } else { // 若不是当前线程
    20                         if (rh == null) {
    21                             rh = cachedHoldCounter;
    22                             if (rh == null || rh.tid != getThreadId(current)) {
    23                                 rh = readHolds.get(); // 从ThreadLocal中取出计数器
    24                                 if (rh.count == 0)
    25                                     readHolds.remove();
    26                             }
    27                         }
    28                         if (rh.count == 0)
    29                             return -1;
    30                     }
    31                 }
    32                 if (sharedCount(c) == MAX_COUNT)
    33                     throw new Error("Maximum lock count exceeded");
    34                 if (compareAndSetState(c, c + SHARED_UNIT)) {  // 尝试设置读锁,高16位加1
    35                     if (sharedCount(c) == 0) {  // 读锁空闲
    36                         firstReader = current;
    37                         firstReaderHoldCount = 1;// 计数为1
    38                     } else if (firstReader == current) { // 不为空闲的话看看第一个线程是否为当前进程,是则更新当前计数器
    39                         firstReaderHoldCount++;
    40                     } else {  // 不是当前线程
    41                         if (rh == null)
    42                             rh = cachedHoldCounter;
    43                         if (rh == null || rh.tid != getThreadId(current))  // 如果最后一个读计数器所属线程不是当前线程
    44                             rh = readHolds.get();                 // 自己新建一个
    45                         else if (rh.count == 0)
    46                             readHolds.set(rh);
    47                         rh.count++;
    48                         cachedHoldCounter = rh; // 更新缓存计数器
    49                     }
    50                     return 1;
    51                 }
    52             }
    53         }
    54                 

    ④ 降级锁

    降级锁从获取到写锁开始。降级锁是指把持有的写锁,再获取到读锁,锁后释放写锁的过程。锁降级中读锁的获取是必要的,主要是为了保证数据获取的可见性,如果当前线程不获取读锁而是直接释放写锁,此刻若是有一个线程获取到写锁并修改了数据,当前的线程就无法感知之后那个线程的数据更新。若当前线程获取了读锁再释放写锁,之后那个想要获取写锁的线程就会被阻塞,直到当前线程释放了读锁之后,下个想获取写锁的线程才能进行数据更新。

  • 相关阅读:
    网站服务器架构设计
    使用同步或异步的方式完成 I/O 访问和操作(Windows核心编程)
    堆栈上的舞蹈之释放重引用(UAF) 漏洞原理实验分析
    内核模式下的线程同步的分析(Windows核心编程)
    用户模式下的线程同步的分析(Windows核心编程)
    Linux下部署Django项目
    HDU 2075 A|B?
    HDU 2052 Picture
    HDU 2024 C语言合法标识符
    HDU 2026 首字母变大写
  • 原文地址:https://www.cnblogs.com/magic-sea/p/11588084.html
Copyright © 2011-2022 走看看