zoukankan      html  css  js  c++  java
  • 从零开始了解多线程 之 深入浅出AQS -- 上

    java锁&AQS深入浅出学习--上

    上一篇文章中我们一起学习了jvm缓存一致性、多线程间的原子性、有序性、指令重排的相关内容,
    这一篇文章便开始和大家一起学习学习AQS(AbstractQueuedSynchronizer)的内容
    主要是包含以下三个方面

    synchronized
    ReentrantLock
    AbstractQueuedSynchronizer
    

    1.了解并发同步器

    多线程编程中,有可能会出现多个线程同时访问同一个共享、可变资源的情况;这种资源可能是:对象、变量、文件等。
    共享:资源可以由多个线程同时访问
    可变:资源可以在其生命周期内被修改
    由此可以得出 由于线程执行的过程是不可控的,所以需要采用同步机制来协同对对象可变状态的访问
    

    那么我们怎么解决线程并发安全问题?

    实际上,所有的并发模式在解决线程安全问题时,采用的方案都是序列化访问临界资源。
    即在同一时刻,只能有一个线程访问临界资源,也称作同步互斥访问。
    Java 中,提供了两种方式来实现同步互斥访问:synchronized和Lock
    

    同步器的本质就是加锁

    加锁目的:序列化访问临界资源,即同一时刻只能有一个线程访问临界资源(同步互斥访问) 
    不过有一点需要区别的是:
        当多个线程执行一个方法时,该方法内部的局部变量 并不是临界资源,
        因为这些局部变量是在每个线程的私有栈中,因此不具有共享性,不会导致线程安全问题
    

    其中锁包括 显式锁 和 隐式锁

    显式: ReentrantLock

     ReentrantLock,实现juc里Lock,实现是基于AQS实现,需要手动加锁跟解锁ReentrantLock lock(),unlock();
    

    隐式: Synchronized

     Synchronized加锁机制,Jvm内置锁,不需要手动加锁与解锁,Jvm会自动加锁跟解锁
    

    synchronized原理详解

     synchronized内置锁是一种对象锁(锁的是对象而非引用),作用粒度是对象,可以用来实现对临界资源的同步互斥访问,是可重入的
         
     以下是他的三种加锁方式:
     
         加锁的方式: 同步实例方法,锁是当前实例对象(加入spring容器管理的,锁是当前实例对象的时候,不能是多例的)
         同步类方法,锁是当前类对象
         同步代码块,锁是括号里面的对象
     
     
     JVM内置锁通过synchronized使用,通过内部对象Monitor(监视器锁)实现,基于进入与退出Monitor对象实现方法与代码块同步,
     监视器锁的实现依赖底层操作系统的Mutex lock(互斥锁)实现,它是一个重量级锁性能较低
    
        /**
         *越过jvm直接操作内存的工具
         * @author njw
         */
        public class UnsafeInstance {
        
            public static Unsafe reflectGetUnsafe() {
                try {
                    Field field = Unsafe.class.getDeclaredField("theUnsafe");
                    field.setAccessible(true);
                    return (Unsafe) field.get(null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        
            /**
             * 不使用lock,怎么实现跨方法进行加锁和释放?
             * 方法:可以通过Unsafe来实现
             *     synchronized底层实现字节码翻译之后 便是如此的
             */
        
            private Object object = new Object();
            public void  test(){
                reflectGetUnsafe().monitorEnter(object);
            }
            public void  test1(){
                reflectGetUnsafe().monitorExit(object);
            }
        }
    

    synchronized 底层原理

     synchronized是基于JVM内置锁实现,通过内部对象Monitor(监视器锁)实现,基于进入与退出Monitor对象实现方法与代码块同步,
     监视器锁的实现依赖底层操作系统的Mutex lock(互斥锁)实现,它是一个重量级锁性能较低。
     JVM内置锁在1.5之后版本做了重大的优化,如锁粗化(LockCoarsening)、锁消除(Lock Elimination)、
     轻量级锁(LightweightLocking)、偏向锁(Biased Locking)、适应性自旋(Adaptive Spinning)等技术来减少锁操作的开销,内置锁的并发性能已经基本与Lock持平。
     
     synchronized关键字被编译成字节码后会被翻译成monitorenter和monitorexit 两条指令分别在同步块逻辑代码的起始位置与结束位置
    

    每个同步对象都有一个自己的Monitor(监视器锁),加锁过程如下图所示:
    

    那么有个问题来了,我们知道synchronized加锁加在对象上,对象是如何记录锁状态的呢?

    答案是锁状态是被记录在每个对象的对象头(Mark Word)中,下面我们一起认识一下对象的内存布局
    

    内存对象布局图

    1. 对象的内存布局

    HotSpot虚拟机中,对象在内存中存储的布局可以分为三块区域:

    对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)。
      
    对象头:比如 hash码,对象所属的年代,对象锁,锁状态标志,偏向锁(线程)ID,偏向时间,数组长度(数组对象)等
    实例数据:即创建对象时,对象中成员变量,方法等
    对齐填充:对象的大小必须是8字节的整数倍
    

    对象头

    HotSpot虚拟机的对象头包括两部分信息,第一部分是“Mark Word”,用于存储对象自身的运行时数据, 
    如哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID、偏向时间戳等等,
    这部分数据的长度在32位和64位的虚拟机(暂 不考虑开启压缩指针的场景)中分别为32个和64个Bits,
    官方称它为“Mark Word”。对象需要存储的运行时数据很多,其实已经超出了32、64位Bitmap结构所能记录的限度,
    但是对象头信息是与对象自身定义的数据无关的额 外存储成本,考虑到虚拟机的空间效率,
    Mark Word被设计成一个非固定的数据结构以便在极小的空间内存储尽量多的信息,它会根据对象的状态复用自己的存储空间。
    
    例如在32位的HotSpot虚拟机中对象未被锁定的状态下,Mark Word的32个Bits空间中的25Bits用于存储对象哈希码(HashCode),
    4Bits用于存储对象分代年龄,2Bits用于存储锁标志位,1Bit固定为0,
    
    对象头信息是与对象自身定义的数据无关的额外存储成本,但是考虑到虚拟机的空间效率,Mark Word被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据,
    它会根据对象的状态复用自己的存储空间,也就是说,Mark Word会随着程序的运行发生变化,变化状态如下
    

    32位jvm对象存储图

    但是如果对象是数组类型,则需要三个机器码,因为JVM虚拟机可以通过Java对象的元数据信息确定Java对象的大小,
    但是无法从数组的元数据来确认数组的大小,所以用一块来记录数组长度.
    

    在此提出一个问题:程序中,实例对象内存 存储在哪?
    很多人了解到的都是实例对象存储在 堆内存 中,确实,基本上实例对象内存都是存在堆内存中的

    如果实例对象存储在堆区时:实例对象内存存在堆区,实例的引用存在栈上,实例的元数据class存在方法区或者元空间
    

    但实际上Object实例对象是不一定是存在堆区的,如果实例对象发生了 线程逃逸行为 则其内存将可能存在 线程栈中
    下面就这个问题来分析一下

    逃逸分析

    使用逃逸分析的情况,编译器可以对代码做如下优化

       一、同步省略。
            如果一个对象被发现只能从一个线程被访问到,那么对于这个对象的操作可以不考虑同步。
            
       二、将堆分配转化为栈分配。
            如果一个对象在子程序中被分配,要使指向该对象的指针永远不会逃逸,对象可能是栈分配的候选,而不是堆分配。
            
       三、分离对象或标量替换。
            有的对象可能不需要作为一个连续的内存结构存在也可以被访问到,那么对象的部分(或全部)可以不存储在内存,
            而是存储在CPU寄存器中。
    

    在Java代码运行时,通过JVM参数可指定是否开启逃逸分析, ­

    XX:+DoEscapeAnalysis : 表示开启逃逸分析 
    ­XX:­DoEscapeAnalysis :表示关闭逃逸分析 
    从jdk 1.7开始已经默认开始逃逸分析,如需关闭,需要指定­XX:­DoEscapeAnalysis
    

    逃逸分析代码

    /**
     * 线程逃逸 分析
     * @author njw
     */
    public class StackAllocTest {
    
        /**
         * 进行两种测试
         * 1. 关闭逃逸分析,同时调大堆空间,避免堆内GC的发生,如果有GC信息将会被打印出来
         *    VM运行参数:-Xmx4G -Xms4G -XX:-DoEscapeAnalysis -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError
         *
         *    以管理员方式运行 power shell
         *    jps 查看进程 :6080 StackAllocTest
         *    jmap -histo 6080
         *      结果
         *         1:           740       70928456  [I
         *         2:        500000       12000000  com.it.edu.sample.StackAllocTest$TestStudent
         *      50W个对象
         *
         *
         *
         * 2. 开启逃逸分析
         *    VM运行参数:-Xmx4G -Xms4G -XX:+DoEscapeAnalysis -XX:+PrintGCDetails -XX:+HeapDumpOnOutOfMemoryError
         *
         *    结果
         *        1:           740       79444704  [I
         *        2:        145142        3483408  com.it.edu.sample.StackAllocTest$TestStudent
         *    只有145142个
         *
         */
        public static void main(String[] args) {
            long start = System.currentTimeMillis();
            for (int i = 0; i < 500000; i++) {
                alloc();
            }
            long end = System.currentTimeMillis();
            //查看执行时间
            System.out.println("cost-time " + (end - start) + " ms");
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    
    
        /**
         *  在主线程中不停创建TestStudent 按照正常逻辑思考 循环50W次,创建后堆区 里面就会有50W的对象
         *  如果堆区里面远远小于50W个 可能对象就存在当前线程栈中
         *  考虑到是否发生GC,当前把GC回收日记打印出来,并同时调大堆空间,避免堆内GC的发生
         *
         *  存在栈中的原因:
         *      Jit对编译时会对代码进行 逃逸分析()
         *      并不是所有对象存放在堆区,有的一部分存在线程栈空间
         * @return
         */
        private static TestStudent alloc() {
            TestStudent student = new TestStudent();
            return student;
        }
        static class TestStudent {
            private String name;
            private int age;
        }
    }
    

    3.局面内置锁的升级

    JVM锁升级是过程图

    JDK1.6版本之后对synchronized的实现进行了各种优化,如自旋锁、偏向锁和轻量级锁,并默认开启偏向锁

    开启偏向锁:-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0
    关闭偏向锁:-XX:-UseBiasedLocking
    

    JVM锁升级不可逆

    锁的状态总共有四种,无锁状态、偏向锁、轻量级锁和重量级锁。
    随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁,但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级。
    下图为锁的升级全过程

    JVM锁升级-线程间交替执行轻量级锁过程

    jvm锁的升级详解

    JVM锁升级是过程图

    32位jvm对象存储图
     32位jvm对象存储图

    JVM锁的膨胀升级_无锁到重量级锁
    JVM锁的膨胀升级_无锁到重量级锁

    偏向锁

    偏向锁是Java 6之后加入的新锁,它是一种针对加锁操作的优化手段,经过研究发现,在大多数情况下,锁不仅不存在多线程竞争,
    而且总是由同一线程多次获得,因此为了减少同一线程获取锁(会涉及到一些CAS操作,耗时)的代价而引入偏向锁。
    
    偏向锁的核心思想是,
        如果一个线程获得了锁,那么锁就进入偏向模式,此时Mark Word 的结构也变为偏向锁结构,当这个线程再次请求锁时,无需
        再做任何同步操作,即获取锁的过程,这样就省去了大量有关锁申请的操作,从而也就提供程序的性能。
    所以,对于没有锁竞争的场合,偏向锁有很好的优化效果,毕竟极有可能连续多次是同一个线程申请相同的锁。
    但是对于锁竞争比较激烈的场合,偏向锁就失效了,因为这样场合极有可能每次申请锁的线程都是不相同的,因此这种场合下不应该使用偏向锁,
    否则会得不偿失,需要注意的是,偏向锁失败后,并不会立即膨胀为重量级锁,而是先升级为轻量级锁。
    

    轻量级锁

    倘若偏向锁失败,虚拟机并不会立即升级为重量级锁,
    适用于线程交替执行,同步代码逻辑少 所需执行执行时间比较少的
    它还会尝试使用一种称为轻量级锁的优化手段(1.6之后加入的),此时Mark Word 的结构也变为轻量级锁的结构。
    
    轻量级锁能够提升程序性能的依据是“对绝大部分的锁,在整个同步周期内都不存在竞争”,注意这是经验数据。
    需要了解的是,轻量级锁所适应的场景是 **线程交替执行同步块** 的场合,如果存在同一时间访问同一锁的场合,
    就会导致轻量级锁膨胀为重量级锁
    

    自旋锁

    轻量级锁失败后,虚拟机为了避免线程真实地在操作系统层面挂起,还会进行一项称为自旋锁的优化手段。
    这是基于在大多数情况下,线程持有锁的时间都不会太长,
    如果直接挂起操作系统层面的线程可能会得不偿失,毕竟操作系统实现线程之间的切换时需要从用户态转换到核心态,
    这个状态之间的转换需要相对比较长的时间,时间成本相对较高,因此自旋锁会假设在不久将来,当前的线程可以获得锁,
    因此虚拟机会让当前想要获取锁的线程做几个空循环(这也是称为自旋的原因),一般不会太久,可能是50个循环或100循环,
    
    在经过若干次循环后,如果得到锁,就顺利进入临界区。如果还不能获得锁,那就会将线程在操作系统层面挂起,
    这就是自旋锁的优化方式,这种方式确实也是可以提升效率的。最后没办法也就只能升级为重量级锁了
    

    锁消除 和 锁的粗化

    消除锁是虚拟机另外一种锁的优化,这种优化更彻底,
    
    Java虚拟机在JIT编译时(可以简单理解为当某段代码即将第一次被执行时进行编译,又称即时编译),通过对运行上下文的扫描,
    去除不可能存在共享资源竞争的锁,通过这种方式消除没有必要的锁,可以节省毫无意义的请求锁时间,
    
    例如说 
        StringBuffer的append是一个同步方法,但是在add方法中的StringBuffer属于一个局部变量,
        并且不会被其他线程所使用,因此StringBuffer不可能存在共享资源竞争的情景,JVM会自动将其锁消除
    

    代码分析,锁的粗化和消除

    /**
     *
     * JVM对锁的优化
     *  1.锁的粗化
     *  2.锁的消除
     *
     * @author njw
     */
    public class Test {
    
        StringBuffer stb = new StringBuffer();
    
        /**
         * 锁的粗化
         *
         * StringBuffer 调用 append的时候,锁加在当前对象上
         * 按照正常逻辑思考 下面调用了 四次 append,相当于加了四个同步块
         *      synchronized{
         *           stb.append("1");
         *      }
         *      synchronized{
         *          stb.append("2");
         *      }
         *      ...
         *
         *      如果是这样意味着这次操作要进行四次上下文切换,四次加锁,四次释放锁
         *
         *  但是jvm经过优化,会把 四个变成一个,加成了一个统一的全局锁 这就是锁的粗化
         *      synchronized{
         *          stb.append("1");
         *          stb.append("2");
         *      }
         *
         */
        public void test1(){
            //jvm的优化,锁的粗化
            stb.append("1");
            stb.append("2");
            stb.append("3");
            stb.append("4");
        }
    
        /**
         * 锁的消除
         * 
         *  synchronized (new Object()) {
         *      //伪代码:很多逻辑
         *  }
         * jvm是否会对上面代码进行加锁?    
         * 答案 这里jvm不会对这同步块进行加锁
         * 
         * 这里的代码中 jvm会进行逃逸分析
         *  因为:new Object()这个加锁对象中,这个new Object()并不会被其他线程访问到,加锁并没有意义,不会产生线程 逃逸
         *  所以这里不会加锁 这便是 JVM 锁的消除
         *  
         *  具体情况查看 逃逸分析 优化
         *
         */
        public void test2(){
            //jvm的优化,JVM不会对同步块进行加锁
            synchronized (new Object()) {
                //伪代码:很多逻辑
                //jvm是否会加锁?
                //jvm会进行逃逸分析
            }
        }
        public static void main(String[] args) {
            Test test = new Test();
        }
    }
    

    2.1 同步框架AbstractQueuedSynchronizer

    Java并发编程核心在于java.concurrent.util包
    而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如 等待队列、条件队列、独占获取(排他锁)、共享获取(共享) 等
    而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS
    AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器

    state 会记录加锁状态、次数等 ,使框架有了可重复入的特性
    独占获取 抽象除了排他锁
    共享获取 抽象除了共享锁
    等待队列,条件队列  使其具备了公平、非公平特性
    

    AQS具备特性

    阻塞等待队列
    共享/独占
    公平/非公平
    可重入
    允许中断
    

    例如Java.concurrent.util当中同步器的实现如Lock,Latch,Barrier等,都是基于AQS框架实现

    一般通过定义内部类Sync继承AQS
    将同步器所有调用都映射到Sync对应的方法
    
    1. AQS内部维护属性volatile int state (32位)

      state表示资源的可用状态

    2. State三种访问方式

      getState()、setState()、compareAndSetState()

    3. AQS定义两种资源共享方式

      Exclusive-独占,只有一个线程能执行,如ReentrantLock
      Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

    4. AQS定义两种队列

      同步等待队列
      条件等待队列

    不同的自定义同步器争用共享资源的方式也不同。
    自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),
    AQS已经在顶层实现好了。

    自定义同步器实现时主要实现以下几种方法:

    isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    
    tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
    
    tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    
    tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    
    tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
    

    2.2 同步等待队列

    CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,
    是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制

    CLH队列图
    等待队列图解

    2.3 条件等待队列

    Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),
    只有当该条件具备时 ,这些等待线程才会被唤醒,从而重新争夺锁

    条件队列图
    条件队列图解

    2.4 公平锁

    公平锁-打水图

    2.5 非公平锁

    非公平锁-打水图

    2.6 重入锁

    重入锁-打水图

    2.7 非重入锁

    非重入锁-打水图

    2.8 读写锁

    写锁(独享锁、排他锁),是指该锁一次只能被一个线程所持有。如果线程T对数据A加上排它锁后,则其他线程不能再对A加任何类型的锁。获得写锁的线程即能读数据又能修改数据。
    读锁(共享锁)是指该锁可被多个线程所持有。如果线程T对数据A加上共享锁后,则其他线程只能对A再加共享锁,不能加排它锁。获得读锁的线程只能读数据,不能修改数据。
    
    AQS中state字段(int类型,32位),此处state上分别描述读锁和写锁的数量于是将state变量“按位切割”切分成了两个部分
    高16位表示读锁状态(读锁个数)
    低16位表示写锁状态(写锁个数)
    

    32位读写锁图

    4. ReentrantLock分析

    1. ReentrantLock 内部类

    1.Node节点介绍

      static final class Node {
      
              /**
               * 标记节点未共享模式
               * */
              static final Node SHARED = new Node();
              /**
               *  标记节点为独占模式
               */
              static final Node EXCLUSIVE = null;
      
              /**
               * 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
               * 在队列节点构建的时候 假如一个节点加入等待队列 会在加入的时候检查其他队列中旳节点是否处于 这个状态,如果是的话就剔除,
               * 并且继续检查其他的?
               * */
              static final int CANCELLED =  1;
      
              /**
               *  后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,
               *  将会通知后继节点,使后继节点的线程得以运行。
               *
               *  此状态是可以被唤醒的 可以去获取锁
               */
              static final int SIGNAL    = -1;
      
              /**
               *  处于等待队列
               *  该状态说明 节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,
               *  该节点会从  等待队列  中转移到  同步队列  中,加入到同步状态的获取中
               */
              static final int CONDITION = -2;
      
              /**
               *  表示下一次共享式同步状态获取将会被 无条件地传播下去
               *  假如线程t1 执行完之后,广播发现t2,处于 PROPAGATE 状态,可以无条件去唤醒,并继续检查t3
               */
              static final int PROPAGATE = -3;
      
              /**
               * 标记当前节点的信号量状态 (1,0,-1,-2,-3)5种状态
               * 使用CAS更改状态,volatile保证线程可见性,高并发场景下,
               * 即被一个线程修改后,状态会立马让其他线程可见。
               */
              volatile int waitStatus;
      
              /**
               * 前驱节点,当前节点加入到同步队列中被设置
               */
              volatile Node prev;
      
              /**
               * 后继节点
               */
              volatile Node next;
      
              /**
               * 节点同步状态的线程
               */
              volatile Thread thread;
      
              /**
               * TODO 这个节点用在条件队列中 信号灯
               *
               * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量,
               * 也就是说节点类型 (独占和共享)和 等待队列中 的后继节点共用同一个字段。
               */
              Node nextWaiter;
      
              /**
               * 判断是否共享
               * Returns true if node is waiting in shared mode.
               */
              final boolean isShared() {
                  return nextWaiter == SHARED;
              }
      
              /**
               * 返回前驱节点
               */
              final Node predecessor() throws NullPointerException {
                  Node p = prev;
                  if (p == null)
                      throw new NullPointerException();
                  else
                      return p;
              }
      
              Node() {    // Used to establish initial head or SHARED marker
              }
      
              Node(Thread thread, Node mode) {     // Used by addWaiter
                  this.nextWaiter = mode;
                  this.thread = thread;
              }
      
              Node(Thread thread, int waitStatus) { // Used by Condition
                  this.waitStatus = waitStatus;
                  this.thread = thread;
              }
          }
    

    2.FairSync 公平锁

      static final class FairSync extends Sync {
              private static final long serialVersionUID = -3000897897090466540L;
      
              @Override
              final void lock() {
                  acquire(1);
              }
      
              /**
               * 重写aqs中的方法逻辑
               * 尝试加锁,被AQS的acquire()方法调用
               */
              @Override
              protected final boolean tryAcquire(int acquires) {
                  final Thread current = Thread.currentThread();
                  int c = getState();
      
                  // 表示当前没有任何线程加锁,可以去加锁
                  if (c == 0) {
                      /**
                       * 与非公平锁中的区别,需要先判断队列当中是否有等待的节点
                       * 如果没有则可以尝试CAS获取锁 : 使用原子操作更新 状态
                       * compareAndSetState : 依赖于 unsafe 操作执行原子比较操作
                       */
      
                      //  hasQueuedPredecessors: 判断是否头结点不等于尾结点 同时 头结点的下一个为空,或者头结点的下一个不是当前线程
                      if (!hasQueuedPredecessors() &&
                              compareAndSetState(0, acquires)) {
                          //独占线程指向当前线程
                          setExclusiveOwnerThread(current);
                          return true;
                      }
                  }
                  // 状态已经被修改过了  判断当前线程是否是获取到的那个 如果是说明在重入
                  else if (current == getExclusiveOwnerThread()) {
                      // 重入锁 添加锁数量
                      int nextc = c + acquires;
                      if (nextc < 0)
                          throw new Error("Maximum lock count exceeded");
                      setState(nextc);
                      return true;
                  }
                  return false;
              }
          }
    

    ReentrantLock 二次加锁图

    ReentrantLock 多线程公平锁加锁图

    3.NonfairSync 非公平锁

     NonfairSync 定义
     
      static final class NonfairSync extends Sync {
          private static final long serialVersionUID = 7316153563782823691L;
    
          /**
           * 加锁行为
           */
          @Override
          final void lock() {
              
              /**
               * 第一步:直接尝试加锁
               * 与公平锁实现的加锁行为一个最大的区别在于,此处不会去判断同步队列(CLH队列)中是否有排队等待加锁的节点,
               * 一上来就直接加锁(判断state是否为0,CAS修改state为1)
               * 并将独占锁持有者 exclusiveOwnerThread 属性指向当前线程
               * 如果当前有人占用锁,再尝试去加一次锁
               */
              if (compareAndSetState(0, 1)) {
                  // 尝试修改拥有线程为当前线程
                  setExclusiveOwnerThread(Thread.currentThread());
              } else {
                  //AQS定义的方法,加锁
                  acquire(1);
              }
          }
    
          /**
           * 父类AbstractQueuedSynchronizer.acquire()中调用本方法
           */
          @Override
          protected final boolean tryAcquire(int acquires) {
              return nonfairTryAcquire(acquires);
          }
      }
      
     NonfairSync 获取锁
     
       /**
       * 尝试获取非公平锁
       */
      final boolean nonfairTryAcquire(int acquires) {
          //acquires = 1
          final Thread current = Thread.currentThread();
          int c = getState();
          /**
           * 不需要判断同步队列(CLH)中是否有排队等待线程
           * 判断state状态是否为0,为0可以加锁
           */
          if (c == 0) {
              //unsafe操作,cas修改state状态
              if (compareAndSetState(0, acquires)) {
                  //独占状态锁持有者指向当前线程
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          /**
           * state状态不为0,判断锁持有者是否是当前线程,
           * 如果是当前线程持有 则state+1
           */
          else if (current == getExclusiveOwnerThread()) {
              int nextc = c + acquires;
              if (nextc < 0) // overflow
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          //加锁失败
          return false;
      }
      
     AQS定义的方法,加锁 
     
      public final void acquire(int arg) {
          // tryAcquire 实际调用的子类方法
          if (!tryAcquire(arg) &&
                  // addWaiter 首先添加一个节点在队列中 添加到尾部
                  acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
      
      
      addWaiter 添加节点:
      
          private Node addWaiter(Node mode) {
              // 1. 将当前线程构建成Node类型
              Node node = new Node(Thread.currentThread(), mode);
              // Try the fast path of enq; backup to full enq on failure
              Node pred = tail;
              // 2. 判断 当前尾节点是否为null?
              if (pred != null) {
                  // 2.2 将当前节点尾插入的方式,插入到尾部
                  // 将新创的结点的prev(前驱节点)指向原本的tail节点
                  node.prev = pred;
                  // 2.3 使用CAS将节点插入同步队列的尾部
                  if (compareAndSetTail(pred, node)) {
                      // 如果插入成功 把原本的tail的下一个节点指向 当前新建的结点 然后返回当前节点
                      pred.next = node;
                      return node;
                  }
              }
              // 把节点加入CLH同步队列 主要是 单前tail 是空的话 上面的逻辑没执行到,里面有个类似的结点指向操作
              enq(node);
              return node;
          }
          
          /**
           * 节点加入CLH同步队列
           */
          private Node enq(final Node node) {
              for (;;) {
                  Node t = tail;
                  if (t == null) { // Must initialize
                      //队列为空需要初始化,创建空的头节点
                      if (compareAndSetHead(new Node()))
                          tail = head;
                  } else {
                      // 队列中已经有值 尾节点不是空  把当前传进来的结点的 prev节点指向 当前tail节点
                      node.prev = t;
                      //set尾部节点
                      if (compareAndSetTail(t, node)) {//当前节点置为尾部
                          t.next = node; //前驱节点的next指针指向当前节点
                          return t;
                      }
                  }
              }
          }
      
      acquireQueued: 
      
           /**
           * 已经在队列当中的Thread节点,准备阻塞等待获取锁
           */
          final boolean acquireQueued(final Node node, int arg) {
              boolean failed = true;
              try {
                  boolean interrupted = false;
                  //死循环自旋
                  for (;;) {
                      //找到当前结点的前驱结点
                      final Node p = node.predecessor();
                      // 如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。
                      if (p == head && tryAcquire(arg)) {
                          //获取同步状态成功,将当前结点设置为头结点。
                          setHead(node);
                          p.next = null; // help GC
                          failed = false;
                          return interrupted;
                      }
                      /**
                       * 如果前驱节点不是Head,通过shouldParkAfterFailedAcquire判断是否应该阻塞
                       * 前驱节点信号量为-1,当前线程可以安全被parkAndCheckInterrupt用来阻塞线程
                       */
                      if (shouldParkAfterFailedAcquire(p, node) &&
                              parkAndCheckInterrupt()) {
                          interrupted = true;
                      }
                  }
              } finally {
                  if (failed) {
                      cancelAcquire(node);
                  }
              }
          }
      
      parkAndCheckInterrupt:
      
           /**
           * 阻塞当前节点,返回当前Thread的中断状态
           * LockSupport.park 底层实现逻辑调用系统内核功能 pthread_mutex_lock 阻塞线程
           */
          private final boolean parkAndCheckInterrupt() {
              LockSupport.park(this);//阻塞
              return Thread.interrupted();
          }
      
      shouldParkAfterFailedAcquire: 
          
           private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
              //判断是否应该阻塞
              // 获取前驱节点等待状态
              int ws = pred.waitStatus;
              
              // 此状态是可以被唤醒的 可以去获取锁
              if (ws == Node.SIGNAL)
                  /*
                   * 若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park
                   */
                  return true;
              if (ws > 0) {
                  /* 状态是 1 被移除,并且继续检查其他节点,如果都是取消状态 一并移除
                   * 前驱节点状态如果被取消状态,将被移除出队列
                   */
                  do {
                      node.prev = pred = pred.prev;
                  } while (pred.waitStatus > 0);
                  pred.next = node;
              } else {
                  /* 同步队列不会出现 CONDITION 
                   * 所以 当前驱节点waitStatus为 0 or PROPAGATE(可传递状态)状态时
                   * 
                   * 将其设置为SIGNAL状态,然后当前结点才可以可以被安全地park
                   */
                  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
              }
              return false;
          }
    

    ReentrantLock 图解

  • 相关阅读:
    MySQL高可用之MHA的搭建
    MySQL MGR 集群搭建(单主模式&多主模式)
    ansible-playbook定义变量与使用
    linux LVM逻辑卷管理
    Oracle 19C RAC 静默(silent)安装on RHEL7.x
    Python语言基础02-变量和运算
    Python之路,Day6
    Python 之路 Day5
    Python之路,Day4
    Python之路,Day3
  • 原文地址:https://www.cnblogs.com/ningjianwen/p/14130703.html
Copyright © 2011-2022 走看看