zoukankan      html  css  js  c++  java
  • 源码分析:升级版的读写锁 StampedLock

    简介

    StampedLock 是JDK1.8 开始提供的一种锁, 是对之前介绍的读写锁 ReentrantReadWriteLock 的功能增强。StampedLock 有三种模式:Writing(读)、Reading(写)、Optimistic Reading(乐观度),StampedLock 的功能不是基于AQS来实现的,而是完全自己内部实现的功能,不支持重入。在加锁的时候会返回一个戳,解锁的时候需要传入,匹配完成解锁操作。

    官方使用示例

    class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();
    
        void move(double deltaX, double deltaY) { // an exclusively locked method
            // 写锁-独占资源
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }
    
        double distanceFromOrigin() { // A read-only method
            // 只读的方法,比较乐观,认为读的过程中不会有写,所以这里是乐观度
            long stamp = sl.tryOptimisticRead();
            double currentX = x, currentY = y;
            if (!sl.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
                // 获取一个普通的读锁
                stamp = sl.readLock();
                try {
                    currentX = x;
                    currentY = y;
                } finally {
                    // 释放读锁
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }
    
        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) {
                    // 普通读锁转换成写锁,返回0为转换失败
                    long ws = sl.tryConvertToWriteLock(stamp);
                    if (ws != 0L) {
                        stamp = ws;
                        x = newX;
                        y = newY;
                        break;
                    }  else {
                        sl.unlockRead(stamp);
                        stamp = sl.writeLock();
                    }
                }
            } finally {
                sl.unlock(stamp);
            }
        }
    }
    

    官方demo中用到的 api 主要有获取写锁(writeLock())、释放写锁(unlockWrite(stamp))、获取普通读锁(readLock())、释放普通读锁(unlockRead(stamp))、获取乐观读锁(tryOptimisticRead())、检测乐观读版本(validate(stamp))、普通读锁转换成写锁(tryConvertToWriteLock(stamp))。下面分析源码的时候也主要根据这几个方法来分析。

    源码分析

    主要内部类

    1. 等待节点:WNode
      用于维护 CLH 队列的节点,源码如下:

      static final class WNode {
          volatile WNode prev;  // 前驱节点
          volatile WNode next;  // 后继节点
          volatile WNode cowait;    // 链接的读者列表
          volatile Thread thread;   // 线程
          volatile int status;      // 状态 0, WAITING, or CANCELLED
          final int mode;           // 两种模式:RMODE or WMODE
          WNode(int m, WNode p) { mode = m; prev = p; }
      }
      
    2. ReadWriteLockView:实现了ReadWriteLock接口,提供了读写锁获取接口

      final class ReadWriteLockView implements ReadWriteLock {
          public Lock readLock() { return asReadLock(); }
          public Lock writeLock() { return asWriteLock(); }
      }
      
    3. ReadLockView 和 WriteLockView:都实现了Lock接口,并实现了所有的方法

    主要属性

    1. CLH 队列

      /** Head of CLH queue */
      private transient volatile WNode whead;
      /** Tail (last) of CLH queue */
      private transient volatile WNode wtail;
      
    2. 其他常量属性

      /** CPU的核心数量,用来控制自旋的次数 */
      private static final int NCPU = Runtime.getRuntime().availableProcessors();
      /** 获取锁入队前最大重试次数:CPU核心数大于1:64,否则0 */
      private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
      /** 等待队列的头结点,获取锁最大重试次数:CPU核心数大于1:1024,否则0 */
      private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
      /** 再次阻塞前最大重试次数:CPU核心数大于1:65536,否则0  */
      private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
      /** The period for yielding when waiting for overflow spinlock */
      private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
      
      /** 用于读取器计数的位数 */
      private static final int LG_READERS = 7;
      
      // 用来计算state值的常量
      private static final long RUNIT = 1L;  // 读单位
      // 写锁的标识位 十进制:128  二进制位标示:1000 0000 
      private static final long WBIT  = 1L << LG_READERS;  
      // 读状态标识 admol 十进制:127  二进制: 0111 1111
      private static final long RBITS = WBIT - 1L;   
      // 读锁的最大标识  十进制:126 二进制 :0111 1110 
      private static final long RFULL = RBITS - 1L;  
      // 用来读取读写状态  十进制:255 二进制:1111 1111
      private static final long ABITS = RBITS | WBIT; 
      // ~255 ==  11111111111111111111111111111111111111111111111111111111 1000 0000
      // -128
      private static final long SBITS = ~RBITS; 
      
      // 同步状态state的初始值 256  二进制:0001 0000 0000
      private static final long ORIGIN = WBIT << 1;
      
      // 中断
      private static final long INTERRUPTED = 1L;
      
      // 节点的状态
      private static final int WAITING   = -1;
      private static final int CANCELLED =  1;
      
      // 节点的模式 
      private static final int RMODE = 0;
      private static final int WMODE = 1;
      
      /** 同步状态 初始值 256 0001 0000 0000*/
      private transient volatile long state;
      /** 读计数饱和时的额外读取器计数 */
      private transient int readerOverflow;
      

      StampedLock 虽然没有继承AQS,但是属性上很相似,都有一个CLH队列,和一个同步状态值state, StampedLock用8位来表示读写锁状态,前7位是用来标识读锁状态的,第8位标识写锁占用,如果读锁数量超过了126(0111 1110 ),超出的用readerOverflow来计数。

    构造方法

    public StampedLock() {
       // 初始值 256,  二进制:0001 0000 0000
        state = ORIGIN;
    }
    

    获取写锁:writeLock()

    源码展示:

    public long writeLock() {
        long s, next;  // bypass acquireWrite in fully unlocked case only
        return ((((s = state) & ABITS) == 0L &&  U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : acquireWrite(false, 0L));
    }
    

    代码分析:

    1. 首先执行的是 ((s = state) & ABITS)== 0L,用来表示读锁和写锁是否可以被获取
      解析:第一次时,state 初始值是256,ABITS是255,计算过程:0001 0000 0000 && 0000 1111 1111 ,结算结果为0。
    2. 执行CAS操作:U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
      解析:STATE 是state字段的内存地址相对于此对象的内存地址的偏移量,s 是期望值, next = s + WBIT 是更新值;s+WBIT 也就是 256 + 128,next计算结果为384,用二进制位表示就是0001 1000 0000,也就是将第8位设置为1,就是获得写锁。如果CAS 更新成功,返回next值,成功获得写锁。
    3. 如果CAS执行失败,则执行acquireWrite(false, 0L) ,进入等待队列获取锁

    acquireWrite 代码分析:

    // interruptible 是否要检查中断
    // deadline:0 一直等待获取锁
    private long acquireWrite(boolean interruptible, long deadline) {
        // node:即将入队排队的节点
        // p:当前排队节点入队之前的尾节点
        WNode node = null, p;
        // 第一次自旋:排队节点入队列自旋
        for (int spins = -1;;) { // spin while enqueuing
            long m, s, ns;
            // 这个if 和外面一样的,从代码运行到这期间所有没有被释放
            if ((m = (s = state) & ABITS) == 0L) {
                // CAS 再次尝试获取下写锁
                if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                    // 成功获取写锁
                    return ns;
            }  else if (spins < 0)  // 走到这,说明上面还是没获取到写锁,写锁被占用了,m的值为128
                // 1. 确定自旋的次数 spins: 64 or 0 
                spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            else if (spins > 0) {
                // 2.自旋的次数大于0,随机减一次自旋次数,直到减到spins为0(by.精灵王 这里实际是空转,没什么特点的逻辑处理)
                if (LockSupport.nextSecondarySeed() >= 0)
                    --spins;
            } else if ((p = wtail) == null) { // initialize queue
                // 3.自旋spins减到0后会立马执行到这里
                // p被赋值为尾节点  
                // 初始化队列, WMODE:写,null:前驱节点
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    // 初始化队列时,尾节点等于头节点
                    wtail = hd;
            } else if (node == null)
                // 4.初始化队列后,下一次自旋,构建当前排队节点,并指定了其尾节点
                node = new WNode(WMODE, p);
            else if (node.prev != p) // 如果当前节点的前驱不是尾节点
                // 5.前驱节点设置为之前队列的尾节点
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                // 6. CAS 更新尾节点为当前排队的节点 
                p.next = node;
                // 退出自旋
                break;
            }
        }
        // 第二次自旋
        for (int spins = -1;;) {
            WNode h, np, pp; int ps; 
            if ((h = whead) == p) { // 如果头节点和之前的尾节点p是同一个, 说明马上应该轮到node节点获得锁
                if (spins < 0)
                    // ① 设置自旋次数  1024 or 0
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    // 第一次自旋1024次没有获取到锁,这次自旋翻倍
                    // 自旋次数*2  2048   继续进入到下面的自旋
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    // ② 第三次自旋,不断尝试获得锁(自旋1024或者2048次),直到成功获得锁 或者 break
                    long s, ns;
                    // ((s = state) & ABITS) == 0L  表示锁没有被占用
                    if (((s = state) & ABITS) == 0L) {
                        // CAS 修改state值
                        if (U.compareAndSwapLong(this, STATE, s,  ns = s + WBIT)) {
                            // CAS 修改成功获得锁,设置新的头结点 
                            whead = node;
                            node.prev = null;
                            return ns;
                        }
                    } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                        // 随机立减自旋次数  自旋次数为0时跳出自旋循环
                        break;
                }
            } else if (h != null) { // help release stale waiters
                // 头节点不为空
                // 进入情景:写锁被获取,队列中很多等待获取读锁的线程,写锁释放,读锁被唤醒后可能进入到这里
                WNode c; Thread w;
                while ((c = h.cowait) != null) { // 自旋
                    // 头节点的 cowait不为空
                    // h.cowait 修改成节点的下一个cowait
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                        // 唤醒 cowait 里面的线程
                        U.unpark(w);
                }
            }
            if (whead == h) { // 如果头结点没有变化
                // P 是之前的尾节点
                if ((np = node.prev) != p) { 
                    // != 之前的尾节点,也就是说当前节点的前驱节点不是尾节点时
                    if (np != null)
                        // 保存尾节点和当前节点的连接关系
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    // ③ 上面第三次自旋break后会进入到这里,修改尾节点状态
                    // 更新尾节点的状态为WAITING:-1, 然后继续回到第二次自旋的地方,重新开始自旋
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    // p节点状态是取消,则删除p节点
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                } else {
                    // 超时时间
                    long time; // 0 argument to park means no timeout
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        // 设置了超时时间 已经超时,取消当前node节点
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    // 为给定地址设置值,忽略修饰限定符的访问限制,与此类似操作还有: putInt,putDouble,putLong,putChar等
                    U.putObject(wt, PARKBLOCKER, this);
                    // node节点指向当前线程
                    node.thread = wt;
                    // p.status < 0 的只有-1 也就是WAITING 
                    if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
                        // 阻塞当前线程
                        U.park(false, time);  // emulate LockSupport.park
                    // 线程被唤醒后,清除节点的线程
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted()) // 要检查中断 && 线程有被中断
                        // 取消当前node节点
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }
    

    获取写锁过程总结:

    1. 首先检查锁有没有被占用
      1. 没有被占用,尝试 CAS 修改state值,CAS 修改成功则获得锁,返回新的 state 值。
      2. CAS 修改失败的话进入下面自旋的逻辑
    2. 第一层自旋(节点入队):
      1. 先检查下锁有没有被占用((m = (s = state) & ABITS) == 0L), CAS 尝试一下获取锁,获取失败再继续自旋
      2. 入队之前会自旋64次(CPU核心数大于1),期间不做任何处理
      3. 初始化排队队列的队头队尾节点,当前节点加入到队尾,CAS 更新尾节点,更新成功则退出第一次自旋
    3. 开始第二层自旋(尝试获取锁,阻塞线程),第二层自旋和第三层自旋嵌套执行的:
      1. 如果头节点和之前的尾节点p还是是同一个(没有其他获取锁的节点排队已经入队), 说明马上应该轮到node节点获得锁(排队的只有node节点)。
        1. 初始化第三层自旋次数(第一次1024,第二次2048),开启第三层自旋
          1. 位运算检查锁是否有被释放((s = state) & ABITS) == 0L),CAS 修改 state 值,修改成功,退出,返回新的state值
        • 这里其实就是自旋和park线程之间性能的一个权衡,马上就要获得锁了,是自旋还是阻塞线程继续等,这里选择了先自旋1024次,如果没有获得锁,继续自旋2048次,如果还是没获得锁,则退出第三层自旋,回到第二层自旋,准备阻塞当前线程。
      2. 如果排队的头结点不为空,检查头结点的cowait 链表,如果不为空,自旋 CAS 修改头节点的cowait, 尝试唤醒整个链的节点线程
      3. 第三层自旋完成后还是没有获取到锁,阻塞当前线程,等待被唤醒,被唤醒后继续第二层自旋获取锁,重复这个过程,直到获取锁成功推出。

    释放写锁:unlockWrite(stamp)

    public void unlockWrite(long stamp) {
        WNode h;
        // state != stamp  检查解锁与加锁的版本是否匹配
        // (stamp & WBIT) == 0L 为true的话说明锁没有被占用
        if (state != stamp || (stamp & WBIT) == 0L)
            // 抛出异常
            throw new IllegalMonitorStateException();
        // 释放写锁,会增加state的版本
        // stamp += WBIT 等于二进制(第一次加写锁和解锁) 0001 1000 0000 + 0000 1000 0000 == 0010 0000 0000
        // 解锁会把stamp 的二进制第8位设置为0
        // 相当于重新赋值state值
        state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
        if ((h = whead) != null && h.status != 0) // 头结点不为 && 状态不为初始状态0(一般是WAITING -1),说明队列中有排队获取锁的线程
             // 唤醒头节点的后继节点
             release(h);
    }
    private void release(WNode h) {
        if (h != null) {
            // q节点:头节点的有效后继节点
            // w: 需要唤醒的线程
            WNode q; Thread w;
            // 将头节点的状态设置成0
            U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
            if ((q = h.next) == null || q.status == CANCELLED) { // 如果头节点的后继为空 或者 是取消状态
                // 就从排队的队尾找一个有效的节点
                for (WNode t = wtail; t != null && t != h; t = t.prev)
                    if (t.status <= 0)
                        q = t;
            }
            // 找到了有效的节点,唤醒其线程
            if (q != null && (w = q.thread) != null)
                U.unpark(w);
        }
    }
    

    释放写锁过程总结:

    1. 检查锁印章戳是否匹配,锁是否有被占用,检查不通过抛出异常
    2. 通过位运算stamp += WBIT计算新的state值,state 二进制位的第8位会被设置成0就是写锁解锁
    3. 检测队列中是否有排队获取锁的线程
      1. 唤醒下一个等待获取锁的线程(unpark(thread)

    获取普通读锁:readLock()

    public long readLock() {
        long s = state, next;  // bypass acquireRead on common uncontended case
        // 在没有线程获得锁的情况下,s的初始值是256 
        // whead == wtail 为true:表示队列为空
        // (s & ABITS) < RFULL: 已获取读锁的数小于最大值126
        return ((whead == wtail && (s & ABITS) < RFULL &&  U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) 
    					? next : acquireRead(false, 0L));
    }
    

    代码解析:

    1. 队列为空 && 已获取读锁次数小于126 && CAS 修改 state 值
      1. 条件完全成立,成功获得锁,返回新的state值
    2. 没有成功,进入到acquireRead(false, 0L)方法排队获取锁

    acquireRead源码展示(代码超100行,需耐心观看):

    private long acquireRead(boolean interruptible, long deadline) {
            // p节点为尾节点  node为入队节点
            WNode node = null, p;
            // 第一层大循环 第一次自旋,是不是和获取写锁的很像?
            for (int spins = -1;;) {
                WNode h;
                if ((h = whead) == (p = wtail)) { // 首尾节点相等,说明队列为空,有线程在排队不会进入if
                    // 前面没获取到锁,队列又为空,是不是应该马上就是当前线程获取锁了?
                    // 第二次自旋 自旋64次 目的是为了看马上能不能获取锁(排队队列为空,没线程排队时,会在这里自旋获取锁)
                    for (long m, s, ns;;) {
                        // 这里是个三目运算,代码太长,拆开来看
                        // (m = (s = state) & ABITS) < RFULL;和进入readLock()方法时的条件一样,判断读锁的数是否达到最大值,只有写锁被获取,这里就是false
                        // 我们假设前面写锁被获取了,现在获取读锁,m 就是128,大于RFULL 126
                        // 没有达到最大值, CAS 修改状态值 U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT)
                        // 超过最大值了,记得前面那个readerOverflow属性不?在tryIncReaderOverflow这累加   
                        if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                            (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                        else if (m >= WBIT) {// if条件成立,说明说明被占用
                            // 
                            if (spins > 0) {
                                if (LockSupport.nextSecondarySeed() >= 0)
                                    --spins; // 随机减自旋次数
                            } else {
                                if (spins == 0) { // 自旋次数减到0了,还没获取到读锁
                                    WNode nh = whead, np = wtail;
                                    if ((nh == h && np == p) || (h = nh) != (p = np))
                                        break; // 退出自旋
                                } 
                                spins = SPINS; // 初始自旋次数 64次
                            }
                        }
                    }
                   // 上面到这里都是在处理队列为空,马上要获取到锁的情况
                } 
    						if (p == null) { // 尾节点为空,初始化排队队列,有线程在排队时不会进入到这里
                    WNode hd = new WNode(WMODE, null);
                    if (U.compareAndSwapObject(this, WHEAD, null, hd)) // CAS 设置头节点
                        // 运行到这里后,会回到第一次的自旋,再次进入到第二次自旋,这次 spins 为0,只会自旋一次
                        wtail = hd;
                } else if (node == null)
                    // 初始化当前入队排队节点,有线程在排队时,直接进入到这里,然后继续第一次自旋
                    node = new WNode(RMODE, p); 
                else if (h == p || p.mode != RMODE) { // 队列为空 或者 尾节点不是读模式
                    // 排队节点入队
                    if (node.prev != p)
                        node.prev = p; // 设置排队节点的前驱节点
                        // 继续第一次自旋     
                    else if (U.compareAndSwapObject(this, WTAIL, p, node)) {  // CAS 修改尾节点
                        p.next = node; // 老的尾节点的后继节点为当前节点
                        // 进入到这里会退出第一层自旋, 直接进入到下面第二层的大的自旋
                        break;
                    }                
                } else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node))
                    // 上面那个if分支进不去,只有条件:队列不为空 and  尾节点是读模式 为真
                    // 进入到了这里,说明CAS失败
                    // 这里的CAS 就是把当前节点加入到尾节点的cowait栈里面
                    // 从这里可以看出加入的顺序是个栈结构,先把旧的尾节点的cowait赋值给node节点的cowait,然后再把node节点赋值给尾节点
                    node.cowait = null;
                else { 
                    // 进入到这,说明上面的if分支都没有进去,尾节点不为空,当前节点不为空,队列不为空,尾节点是读模式,上面CAS修改成功
                    // 总结一下进入到这里的条件就是,有个线程获得了写锁还没释放,队列中有读线程在排队
                    // 第三次自旋,有线程在排队获取锁时,会进入到这里自旋
                    for (;;) {
                        WNode pp, c; Thread w;
                        if ((h = whead) != null && (c = h.cowait) != null &&
                            U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                            (w = c.thread) != null) // help release
                            // 头节点不为空,且其cowait节点不为空,唤醒整个cowait栈的线程
                            U.unpark(w);
                        if (h == (pp = p.prev) || h == p || pp == null) {
                            // 头节点等于尾节点的前驱节点 或者头节点等于尾节点 或者 尾节点的前驱节点为空
                            // 说明还是马上轮到自己获得锁       
                            long m, s, ns;
                            do {
                                // 判断是否可以使用CAS获取读锁             
                                if ((m = (s = state) & ABITS) < RFULL ?  
                                    U.compareAndSwapLong(this, STATE, s,  ns = s + RUNIT) :
                                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                                    return ns;
                            } while (m < WBIT); // m < WBIT时表示写锁没有被占用,一直尝试获取锁
                        }
                        if (whead == h && p.prev == pp) { // 对头没有发生变化 ,队尾也没发生变化
                            long time;
                            if (pp == null || h == p || p.status > 0) { // 队尾的前驱节点为空或者 头节点等于尾节点 或者 老的尾节点被取消(>0的状态只有1,取消)
                                node = null; // 抛弃当前节点,退出当前循环,回到第一层的自旋,重新构建节点
                                break;
                            }
                            if (deadline == 0L)
                                time = 0L;
                            else if ((time = deadline - System.nanoTime()) <= 0L) // 超时
                                return cancelWaiter(node, p, false); // 取消节点
                            Thread wt = Thread.currentThread();
                            U.putObject(wt, PARKBLOCKER, this);
                            node.thread = wt;
                            if ((h != pp || (state & ABITS) == WBIT) &&
                                whead == h && p.prev == pp)
                                U.park(false, time); // 阻塞当前线程
                            // 线程被唤醒,开始继续自旋获取锁  
                            node.thread = null;
                            U.putObject(wt, PARKBLOCKER, null);
                            if (interruptible && Thread.interrupted())
                                return cancelWaiter(node, p, true);
                        }
                    }
                }
            }
            // 第二层大的循环 
            for (int spins = -1;;) {
                WNode h, np, pp; int ps;
                if ((h = whead) == p) {  // 如果队列为空,说明马上轮到当前线程获得锁了
                    // 这个大的if 里面做的就是获取锁
                    if (spins < 0)
                        // 初始化本次自旋获取锁的次数:1024次
                        spins = HEAD_SPINS;
                    else if (spins < MAX_HEAD_SPINS)
                        // 上面1024次自旋没有获取到锁,就自旋翻倍:2048次,继续下面的自旋
                        spins <<= 1;
                    // 开始自旋获取锁
                    for (int k = spins;;) { // spin at head
                        long m, s, ns;
                        // 这个if条件 检查了是否可以获取锁,如果可以就CAS获取锁
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                            (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                            // 进入到这个if里面,说明就获得了锁
                            WNode c; Thread w;
                            // 这里的node节点是还没有绑定thread的
                            whead = node;
                            node.prev = null;
                            // 要唤醒当前node节点中的所有cowait节点线程
                            // 当前节点是在上面的第一层大的自旋入队的,其他获取读锁的节点都是挂在这个节点的cowait下的
                            while ((c = node.cowait) != null) {
                                if (U.compareAndSwapObject(node, WCOWAIT,
                                                           c, c.cowait) &&
                                    (w = c.thread) != null)
                                    U.unpark(w); // 唤醒线程
                            }
                            return ns;
                        }  else if (m >= WBIT &&  LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                            // 上面没有获取到锁,自旋减次数,直到为0,退出自旋
                            break;
                    }
                } else if (h != null) { // 队列不为空,头节点不为空
                    WNode c; Thread w;
                    while ((c = h.cowait) != null) {
                        if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                }
                // 运行到这,说明还没获取到锁
                if (whead == h) { // 头节点没变过
                    if ((np = node.prev) != p) { // 检查节点的链接关系
                        if (np != null)
                            (p = np).next = node;   // stale
                    }
                    else if ((ps = p.status) == 0)
                        // 检查尾节点的状态,为0则更新成-1,回到第二层大的循环开始处
                        U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                    else if (ps == CANCELLED) { // p节点被取消,删除这个节点
                        if ((pp = p.prev) != null) {
                            node.prev = pp;
                            pp.next = node;
                        }
                    } else { // 上面2048次自旋后还是没获取到锁,进入到最终阻塞线程的环节
                        long time;
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L)
                            return cancelWaiter(node, node, false); // 超时了就取消当前节点
                        Thread wt = Thread.currentThread(); // 当前线程
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p)
                            U.park(false, time); // 阻塞线程
                        // 线程被唤醒了,继续执行 第二层大的自旋获取锁
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        if (interruptible && Thread.interrupted()) // 被唤醒了,发现要求中断线程 并且线程被中断了
                            return cancelWaiter(node, node, true); // 取消当前节点
                    }
                }
            }
        }
    

    获取普通读锁总结:

    • 位运算检查写锁是否被占用,读锁是否超限制
      1. 满足条件,直接CAS 修改state值,并返回新的state值
    • 开始第一层大的自旋,第一层大的自旋里面包含了两种情况不同的自旋:
      1. 第一种自旋情况:排队的队列为空,没有其他线程在排队等待锁时
        • 这种情况说明,锁虽然已经被占用,但是马上就应该是我得到锁了,所以我先在这儿自旋(64次)等等你释放锁,免得阻塞我自己,之后唤醒还需要成本
        • 自旋没有获取到锁,会退出第一层大的自旋,进入到第二层大的自旋
      2. 第二种自旋情况:写锁被占用,排队的队列不为空,队尾是读模式时
        • 这种情况,会不断尝试获取锁,阻塞线程,等待被唤醒,一直在这个自旋里面,直到获得锁,或者超时中断被取消,不会进入到第二层大的自旋
    • 第二层大的自旋
      • 这一层的自旋是对第一层里面第一种自旋情况(马上轮到我获得锁,但是前面持有锁的线程就是不释放)的补充,因为没有线程在排队,只要前面的线程释放了锁,马上就可以获得锁了,所以这一层还是在自旋获得锁,只不过自旋次数有增加
        • 首先会尝试自旋1024次获得锁,如果前面还没释放锁,再自旋2048次
      • 如果2048次之后还是没有等到前面的锁释放,就阻塞当前线程,等待被唤醒,直到获得锁,或者超时中断被取消

    cowait栈分析

    下面代码是main线程先获取写锁不释放,之后T0,T1,T2,T3线程先后去获取读锁,最后断点观察整个排队队列的情况

    StampedLock sl = new StampedLock();
    long stamp = sl.writeLock();
    // 先让T0线程去排队到尾节点
    TimeUnit.SECONDS.sleep(1);
    new Thread(new Runnable(){
        @SneakyThrows
        @Override
        public void run(){
            long stamp = sl.readLock();
            System.out.println(stamp + "   x");
        }
    },"T0").start();
    
    // 之后T1线程来获取读
    TimeUnit.SECONDS.sleep(3);
    new Thread(new Runnable(){
        @SneakyThrows
        @Override
        public void run(){
            long stamp = sl.readLock();
            System.out.println(stamp + "   x");
        }
    },"T1").start();
    TimeUnit.SECONDS.sleep(3);
    new Thread(new Runnable(){
        @SneakyThrows
        @Override
        public void run(){
            long stamp = sl.readLock();
            System.out.println(stamp + "   x");
        }
    },"T2").start();
    TimeUnit.SECONDS.sleep(3); // 在这里先断点,进入到这里后,再到源码位置去断点,就可以看到如下图的情况了
    new Thread(new Runnable(){
        @SneakyThrows
        @Override
        public void run(){
            long stamp = sl.readLock();
            System.out.println(stamp + "   x");
        }
    },"T3").start();
    

    运行代码后,断点截图:
    Untitled

    他们的关系可以用如下表示,横向是链表,纵向是cowait栈。
    微信图片_20201119181133

    释放读锁:unlockWrite(stamp)

    public void unlockRead(long stamp) {
        long s, m; WNode h;
        for (;;) { // 自旋
            if (((s = state) & SBITS) != (stamp & SBITS) || (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
                // 检查版本
                throw new IllegalMonitorStateException();
            if (m < RFULL) {  // 锁标识小于读锁的最大标识
                if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { // CAS 更新state值
                    if (m == RUNIT && (h = whead) != null && h.status != 0) // 头结点不为空
                        release(h); // 唤醒下一个节点
                    break;
                }
            } else if (tryDecReaderOverflow(s) != 0L) 
                // 读锁个数饱和溢出,尝试减少readerOverflow
                break;
        }
    }
    private void release(WNode h) {
        if (h != null) {
            WNode q; Thread w;
            U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
            if ((q = h.next) == null || q.status == CANCELLED) {
                for (WNode t = wtail; t != null && t != h; t = t.prev)
                    if (t.status <= 0)
                        q = t;
            }
            if (q != null && (w = q.thread) != null)
                U.unpark(w);
        }
    }
    

    释放读锁的逻辑也比较简单,和释放写锁的逻辑很相识,唤醒下一个节点的release方法也完全一致

    获取乐观读锁:tryOptimisticRead()

    public long tryOptimisticRead() {
        long s;
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }
    

    乐观读锁的逻辑也比较简单,就一个三目运算,((s = state) & WBIT) == 0L 就是看写锁是否有被占用,写锁被占用返回0,否则返回写锁没被占用的包含高位版本有效戳(也就是写锁的版本)。

    检测乐观读版本:validate(stamp)

    public boolean validate(long stamp) {
        // 插入内存屏障,禁止load操作重排序。
        // 由于StampedLock提供的乐观读锁不阻塞写线程获取读锁,当线程共享变量从主内存load到线程工作内存时,会存在数据不一致问题
        // 解决锁状态校验运算发生重排序导致锁状态校验不准确的问题
    	  U.loadFence(); 
    	  return (stamp & SBITS) == (state & SBITS);
    }
    

    返回true:表示期间没有写锁发生,读锁为所谓

    返回false:表示期间有写锁发生

    那这里是怎么计算的呢?

    SBITS为-128,用二进制表示是:1111 1111 1111 1000 0000

    两种情况:

    1. 假如先获取乐观锁,再获取读锁;
      乐观锁返回的stamp为256,二进制位是 0001 0000 0000;
      获取读锁之后state值是257,二进制位是 0001 0000 0001;
      它们分别于与-128 进行与运算后都是0001 0000 0000,也就是十进制256,返回true;
    2. 假如先获取乐观锁,再获取写锁;
      乐观锁返回的stamp为256,二进制位是 0001 0000 0000;
      获取写锁之后state值是384,二进制位是 0001 1000 0000;
      它们分别与-128 进行与运算后,相当与 256 == 384,结果肯定返回false;

    普通读锁转换成写锁:tryConvertToWriteLock(stamp)

    public long tryConvertToWriteLock(long stamp) {
        // m标识最新的锁标识
        // a标识被转换的锁的锁标识
        long a = stamp & ABITS, m, s, next;
        while (((s = state) & SBITS) == (stamp & SBITS)) { // 检查锁持有状态
            if ((m = s & ABITS) == 0L) {
                if (a != 0L)
                    break;
                if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                    return next;
            } else if (m == WBIT) { // 写锁已经被占用
                if (a != m)
                    break;
                return stamp;   // 说明被转换前就是写锁
            } else if (m == RUNIT && a != 0L) { // 被转换前的是普通读锁,写锁没被占用
                if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT))
                    // s:是之前的锁状态
                    // s - RUNIT:就是释放读锁
                    //  + WBIT :就是加写锁(进入之前写锁没被占用)
                    return next; // 返回最新的锁状态
            } else
                break;  // 其他情况,全部返回0,转换失败
        }
        return 0L; // 返回0,标识转换写锁失败
    }
    

    普通读锁转换成写锁过程总结:

    1. 如果转换前是写锁,直接返回写锁
    2. 如果转换前是读锁,转换期间,写锁被占用,返回0,转换失败
    3. 如果转换前是读锁,写锁没有被占用,释放读锁,加写锁,返回写锁,转换成功
    4. 其他情况,全部返回0,转换失败

    StampedLock 总结

    1. StampedLock 是一种支持乐观读锁的高级版读写锁
    2. StampedLock 没有使用AQS 同步框架,而是完全自己实现的同步状态state 和 CLH队列维护算法
    3. 同步状态state的低7位标识读锁的数量,第8位标识写锁是否被占用,高24位记录写锁的版本,每次释放写锁会版本位置会加1
    4. 写锁每次获取state会加128,释放也会加128,读锁是加减1
    5. StampedLock 的连续多个读锁线程,只有第一个是在队列上,后面的读线程都存在第一个线程的cowait栈结构上
    6. StampedLock 唤醒一个读锁线程后,读锁线程会唤醒所有在它cowait栈上的等待读锁线程
    7. StampedLock 用到了大量的自旋操作,适合持有锁时间比较短的任务,持有锁时间长的话等待的线程自旋后还是会阻塞自己。
    8. StampedLock 同一个线程先获取读锁,再获取写锁也会死锁
    9. StampedLock 写锁不支持重入,读锁支持重入
    10. StampedLock 不支持条件锁
    11. StampedLock 不支持公平锁,上来有条件就 CAS 尝试获得锁

    StampedLock 与 ReentrantReadWriteLock的区别总结

    使用功能上的区别:

    1. StampedLock 支持乐观读锁,RRWL 没有
    2. StampedLock 支持锁转换,tryConvertToXXXXX(stamp)
    3. StampedLock 写锁不支持重入,RRWL 支持重入
    4. StampedLock 不支持条件锁,RRWL 支持条件锁
    5. StampedLock 不支持公平锁,RRWL 支持公平锁

    底层实现的区别:

    1. StampedLock 没有使用同步框架AQS,RRWL 是基于AQS 来实现排队、阻塞、唤醒等功能的
    2. StampedLock 获取锁时,会直接使用CAS尝试获得锁(不公平,不看排队),会根据CPU核心数来决定自旋次数等待获取锁
    3. StampedLock 的 CLH 队列中连续的读线程只有首个节点存储在队列中,后面的节点都存储的首个节点的cowait栈中,即 1→5→4→3→2→1 这种顺序。
    4. StampedLock 中同步状态 state 被分成了三部分,第8位记录的是写锁的状态,低7位记录读锁的次数,其他位记录的是写锁的版本
    5. RRWL 中同步状态 state 被分成两部分,高16位记录读锁次数,低16位记录写锁次数
    6. StampedLock 唤醒一个读锁线程后,读线程会唤醒所有在它cowait栈上的等待读锁线程
  • 相关阅读:
    【数学】三分法
    【数学】【背包】【NOIP2018】P5020 货币系统
    【数学】【CF27E】 Number With The Given Amount Of Divisors
    【单调队列】【P3957】 跳房子
    【极值问题】【CF33C】 Wonderful Randomized Sum
    【DP】【CF31E】 TV Game
    【神仙题】【CF28D】 Don't fear, DravDe is kind
    【线段树】【CF19D】 Points
    【字符串】KMP字符串匹配
    【二维树状数组】【CF10D】 LCIS
  • 原文地址:https://www.cnblogs.com/admol/p/14007975.html
Copyright © 2011-2022 走看看