zoukankan      html  css  js  c++  java
  • Java并发编程-ReentrantReadWriteLock

    基于AQS的前世今生,来学习并发工具类ReentrantReadWriteLock。本文将从ReentrantReadWriteLock的产生背景、源码原理解析和应用来学习这个并发工具类。

    1、 产生背景

      前面我们学习的重入锁ReentrantLock本质上还是互斥锁,每次最多只能有一个线程持有ReentrantLock。对于维护数据完整性来说,互斥通常是一种过于强硬的规则,因此也就不必要的限制了并发性。互斥是一种保守的加锁策略,虽然可以避免“写/写”冲突和“写/读”冲突,但也同样避免了“读/读”冲突。和互联网的“二八法则”一样,大部分数据都是读数据,可以存放在缓存中,数据结构的操作其实很多也是读操作,可以考虑适当的放宽加锁需求,允许多个读操作线程同时访问数据结构以提升程序的性能。在这样的需求背景下,就产生了读写锁ReadWriteLock,一个资源可以同时被多个读操作访问,或者被一个写操作访问,但是不能读写操作同时访问。ReadWriteLock定义了接口规范,实际实现读写锁控制的类是ReentrantReadWriteLock,该类为读写锁提供了可重入的加锁语义。

    2、 源码原理解析

    2.1 读写锁原理

      既然是读写锁,那就是有两把锁,可以用AQS的同步状态表示其中的一把锁,再引入一个新的属性表示另外一把锁,但是这么做就变成了二元并发安全问题,使问题变得更加复杂。ReentrantReadWriteLock选择了用一个属性,即AQS的同步状态来表示读写锁,怎样用一个属性来表示读写锁呢?那就是位运算,对位运算不熟悉的可以先看下此文

      ReentantReadWriteLock采用“按位切割”的方式,就是将这个32位的int型state变量分为高16位和低16位来使用,高16位代表读状态,低16位代表写状态读锁是可以共享的,而写锁是互斥的,对于写锁而言,用低16位表示线程的重入次数,但是读锁因为可以同时有多个线程,所以重入次数需要通过其他的方式来记录,那就是ThreadLocal变量。从这也可以总结出来和ReentrantLock相比,写锁的重入次数会减少,最多不能超过65535次。读锁的线程数也有限制,最对不能超过65535个。

      假设状态变量是c,则读状态就是c>>>16(无符号右移16位),其实就是通过无符号右移运算抹掉低的16位,剩下的就是c的高16位。写状态是c&((1 << 16) - 1),其实就是c&00000000000000001111111111111111,与运算之后,高的16位被抹掉,剩下的就是c的低16位。如果读线程申请读锁,当前写锁重入次数不为 0 时,则等待,否则可以马上分配;如果是写线程申请写锁,当前状态为 0 则可以马上分配,否则等待。

    2.2 读锁的获取和释放

      读锁的获取方法如下:

     1 protected final int tryAcquireShared(int unused) {
     2             Thread current = Thread.currentThread();//当前线程
     3             int c = getState();
     4                 //持有写锁的线程可以获取读锁
     5             if (exclusiveCount(c) != 0 &&  //已经分配了写锁
     6                 getExclusiveOwnerThread() != current) //当前线程不是持有写锁的线程
     7                 return -1;
     8             int r = sharedCount(c); //读锁获取次数
     9             if (!readerShouldBlock() && //由子类根据公平策略实现决定是否可获取读锁
    10                 r < MAX_COUNT && //读锁获取次数小于最大值
    11                 compareAndSetState(c, c + SHARED_UNIT)) {//更新读锁状态
    12                 if (r == 0) {//读锁的第一个线程 此时可以不用记录到ThreadLocal
    13                     firstReader = current; 
    14                     firstReaderHoldCount = 1; //避免查找ThreadLocal 提升效率
    15                 } else if (firstReader == current) {//读锁的第一个线程重入
    16                     firstReaderHoldCount++;
    17                 } else {//非读锁的第一个线程
    18                     HoldCounter rh = cachedHoldCounter; //下面为重入次数更新
    19                     if (rh == null || rh.tid != getThreadId(current))
    20                         cachedHoldCounter = rh = readHolds.get();
    21                     else if (rh.count == 0)
    22                         readHolds.set(rh);
    23                     rh.count++;
    24                 }
    25                 return 1;
    26             }
    27             return fullTryAcquireShared(current); //获取读锁失败 循环重试
    28         }
    29 final int fullTryAcquireShared(Thread current) {
    30             HoldCounter rh = null;
    31             for (;;) {
    32                 int c = getState();
    33                 if (exclusiveCount(c) != 0) {//获取到写锁
    34                     if (getExclusiveOwnerThread() != current) 
    35                         return -1; //非写锁线程获取失败
    36                     // else we hold the exclusive lock; blocking here
    37                     // would cause deadlock.
    38                 } else if (readerShouldBlock()) {
    39                     // Make sure we're not acquiring read lock reentrantly
    40                     if (firstReader == current) {
    41                         // assert firstReaderHoldCount > 0;
    42                     } else {
    43                         if (rh == null) {
    44                             rh = cachedHoldCounter;
    45                             if (rh == null || rh.tid != getThreadId(current)) {
    46                                 rh = readHolds.get();
    47                                 if (rh.count == 0)
    48                                     readHolds.remove();
    49                             }
    50                         }
    51                         if (rh.count == 0)
    52                             return -1;
    53                     }
    54                 }
    55                 if (sharedCount(c) == MAX_COUNT) //读锁数量达到最大
    56                     throw new Error("Maximum lock count exceeded");
    57                 if (compareAndSetState(c, c + SHARED_UNIT)) {//读锁获取成功 处理方式和之前类似
    58                     if (sharedCount(c) == 0) {
    59                         firstReader = current;
    60                         firstReaderHoldCount = 1;
    61                     } else if (firstReader == current) {
    62                         firstReaderHoldCount++;
    63                     } else {
    64                         if (rh == null)
    65                             rh = cachedHoldCounter;
    66                         if (rh == null || rh.tid != getThreadId(current))
    67                             rh = readHolds.get();
    68                         else if (rh.count == 0)
    69                             readHolds.set(rh);
    70                         rh.count++;
    71                         cachedHoldCounter = rh; // cache for release
    72                     }
    73                     return 1;
    74                 }
    75             }
    76         }

    读锁的释放方法如下:

     1 protected final boolean tryReleaseShared(int unused) {
     2             Thread current = Thread.currentThread();
     3             if (firstReader == current) {//当前线程是读锁的第一个线程
     4                 // assert firstReaderHoldCount > 0;
     5                 if (firstReaderHoldCount == 1) //第一次占有读锁 直接清除该线程
     6                     firstReader = null;
     7                 else
     8                     firstReaderHoldCount--;//读锁的第一个线程重入次数减少
     9             } else {
    10                 HoldCounter rh = cachedHoldCounter;
    11                 if (rh == null || rh.tid != getThreadId(current))
    12                     rh = readHolds.get();
    13                 int count = rh.count;
    14                 if (count <= 1) {
    15                     readHolds.remove();//读锁释放
    16                     if (count <= 0)
    17                         throw unmatchedUnlockException();
    18                 }
    19                 --rh.count; //重入次数减少
    20             }
    21             for (;;) {
    22                 int c = getState();
    23                 int nextc = c - SHARED_UNIT;
    24                     //减少读锁的线程数量
    25                 if (compareAndSetState(c, nextc))
    26                     // Releasing the read lock has no effect on readers,
    27                     // but it may allow waiting writers to proceed if
    28                     // both read and write locks are now free.
    29                     return nextc == 0;
    30             }
    31         }

    2.3 写锁的获取和释放

      写锁的获取方法如下:

     1 protected final boolean tryAcquire(int acquires) {
     2             Thread current = Thread.currentThread();
     3             int c = getState();
     4             int w = exclusiveCount(c);//写锁状态
     5             if (c != 0) {//表示锁已经被分配出去了 if c != 0 and w == 0表示获取读锁
     6                 // (Note: if c != 0 and w == 0 then shared count != 0)
     7                     //其他线程获取到了写锁
     8                 if (w == 0 || current != getExclusiveOwnerThread())
     9                     return false;
    10                     //写锁重入次数超过最大值
    11                 if (w + exclusiveCount(acquires) > MAX_COUNT)
    12                     throw new Error("Maximum lock count exceeded");
    13                 // Reentrant acquire  更新写锁重入次数
    14                 setState(c + acquires);
    15                 return true;
    16             }
    17             if (writerShouldBlock() ||//子类实现写锁是否公平获取
    18                 !compareAndSetState(c, c + acquires))
    19                 return false;//cas获取写锁失败
    20             setExclusiveOwnerThread(current);//获取写锁成功 独占
    21             return true;
    22         }

    写锁的释放方法如下:

     1 protected final boolean tryRelease(int releases) {
     2             if (!isHeldExclusively())//当前线程不持有写锁
     3                 throw new IllegalMonitorStateException();
     4             int nextc = getState() - releases; //重入次数减少
     5             boolean free = exclusiveCount(nextc) == 0; //减少到0写锁释放
     6             if (free)
     7                 setExclusiveOwnerThread(null); //写锁释放
     8             setState(nextc);
     9             return free;
    10         }

    2.4 锁降级

      锁降级指的是写锁降级为读锁,首先持有当前写锁,然后获取到读锁,在tryAcquireShared方法中已经体现了该过程,随后再释放该写锁的过程。锁降级主要是为了保持数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此时有另外的线程获取到了写锁并修改了数据,那么当前线程是无法知晓数据已经更新了。如果当前线程遵循锁降级的过程,则其他线程会被阻塞,直到当前线程操作完成其他线程才可以获取写锁进行数据更新。RentrantReadWriteLock不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。

    3、 应用

      概况性的总结RentrantReadWriteLock的应用,就是ReentrantLock能使用的地方,RentrantReadWriteLock都能使用,而且能提供更好的吞吐率。

    参考资料:

    https://github.com/lingjiango/ConcurrentProgramPractice

  • 相关阅读:
    LabelImg 图像图像标注工具
    周杰伦的2000w个故事
    ROS 订阅图像节点(1)
    ROS 订阅图像节点
    ROS 双目标定
    书籍
    Z30云台PC控制问题
    大疆M600组装和试飞
    M100 X3云台安装
    M100 组装教程
  • 原文地址:https://www.cnblogs.com/lgjava/p/11268451.html
Copyright © 2011-2022 走看看