zoukankan      html  css  js  c++  java
  • 从ReentrantLock详解AQS原理源码解析

    Java中的大部分同步类(ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及FIFO队列模型的简单框架。

    AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

    数据结构


    在java.util.concurrent.locks.AbstractQueuedSynchronizer类中存在如下数据结构。

    // 链表结点
    static final class Node {}
    
    // head指向的是一个虚拟结点,刷多了算法就知道这样做的目的是方便对链表操作,真正的头为head.next
    private transient volatile Node head;
    
    // 尾结点
    private transient volatile Node tail;
    
    // 这个锁(共享资源)对象的状态。
    // volatile保证可见性和屏蔽指令重排
    private volatile int state;
    
    // 继承至AbstractOwnableSynchronizer类
    // 独占模式下当前锁的拥有者
    private transient Thread exclusiveOwnerThread;
    
    // 自旋锁的自旋纳秒数,用于提高应用的响应能力
    static final long spinForTimeoutThreshold = 1000L;
    
    // unsafe类
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    
    // 以下字段对应上面字段的在对象中的偏移值,在静态代码块中初始化,其值是相对于在这个类对象中的偏移量
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
    

    在AQS类中的内部类Node包含如下数据结构

    static final class Node {
    
        // 共享锁
        static final Node SHARED = new Node();
    
        // 独占锁
        static final Node EXCLUSIVE = null;
           
        // 0	               当一个Node被初始化的时候的默认值
        // CANCELLED	为  1,表示线程获取锁的请求已经取消了
        // CONDITION	为 -2,表示节点在等待队列中,节点线程等待唤醒
        // PROPAGATE	为 -3,当前线程处在SHARED情况下,该字段才会使用
        // SIGNAL	        为 -1,表示线程已经准备好了,就等资源释放了
        volatile int waitStatus;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        
        // 前驱指针
        volatile Node prev;
        
        // 后继指针
        volatile Node next;
    	
        // 该节点代表的线程对象
        volatile Thread thread;
    
        Node nextWaiter;
    }
    

    从其数据结构可以猜测出

    • AQS类中主要的存储结构是一个双向链表,称为CLH变体的虚拟双向队列(FIFO)。
    • state字段对应了这个锁(共享资源)对象的状态。
    • 线程申请锁(共享资源)时会将其包装成一个节点。Node保存了获取锁的线程信息。
    • Node.waitStatus字段保存这个线程申请锁(共享资源)的状态。
    • head指向的是一个虚拟结点,真正有效的头为head.next。
    • 请求共享资源的线程包装节点node包含两种模式,Node.SHARED表示以共享的模式等待锁、Node.EXCLUSIVE表示正在以独占的方式等待锁。

    在前文锁阻塞和唤醒是用CLH队列锁实现的,CLH:Craig、Landin and Hagersten队列,是单向链表。通过分析上面的数据结构可知,在AQS中其实现本质上是一个双向链表,AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

    Node.waitStatus包含5个状态,对应如下

    状态 含义
    0 当一个Node被初始化的时候的默认值
    CANCELLED 为 1,表示线程获取锁的请求已经取消了
    CONDITION 为 -2,表示节点在等待队列中,节点线程等待唤醒
    PROPAGATE -3,当前线程处在SHARED情况下,该字段才会使用
    SIGNAL 为 -1,表示线程已经准备好了,就等资源释放了

    源码分析


    我们从AQS的实现类ReentrantLock#lock开始分析其具体的流程。

    ReentrantLock#lock

    public void lock() {
        sync.lock();
    }
    

    直接调用了Sync类的lock()方法,Sync类在ReentrantLock中有两个实现类分别是FairSync和NonfairSync,分别对应了公平锁和非公平锁。

    • 公平锁:线程获取锁的顺序和调用lock的顺序一样,FIFO;
    • 非公平锁:线程获取锁的顺序和调用lock的顺序无关,全凭运气。

    由于ReentrantLock默认是非公平锁,我们从NonfairSync类分析。

    ReentrantLock.NonfairSync#lock

    final void lock() {
    	// cas操作尝试将state字段值修改为1
        if (compareAndSetState(0, 1))
        	// 成功的话就代表已经获取到锁,修改独占模式下当前锁的拥有者为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
        	// 获取锁失败之后的操作
            acquire(1);
    }
    

    从这可以确定我们之前的猜测

    • state字段对应了这个锁对象的状态,值为0的时候代表锁没有被线程占用,修改为1之后代表锁被占用。

    现在分析未获取到锁之后的流程

    AbstractQueuedSynchronizer#acquire

    public final void acquire(int arg) {
    	
        if (
        		// 当前线程尝试获取锁
        		!tryAcquire(arg) &&
        		// acquireQueued会把传入的结点在队列中不断去获取锁,直到获取成功或者不再需要获取(中断)。
            	acquireQueued(
            		// 在双向链表的尾部创建一个结点,值为当前线程和传入的模式
    	        	addWaiter(Node.EXCLUSIVE), 
    	        	arg
            	)
            )
            // TODO
            selfInterrupt();
    }
    

    看不懂,先查找资料了解这几个方法的作用,注释在代码中。

    ReentrantLock.NonfairSync#tryAcquire

    // 当前线程尝试获取锁
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    

    ReentrantLock.Sync#nonfairTryAcquire

    // 当前线程尝试获取锁-非公平
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 获得当前锁对象的状态
        int c = getState();
        // state为0代表当前没有被线程占用
        if (c == 0) {
        	// cas操作尝试将state字段值修改为请求的数量
            if (compareAndSetState(0, acquires)) {
            	// 直接修改当前独占模式下锁的拥有者为为当前线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果锁的占有者就是当前线程
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // state值增加相应的请求数。
            setState(nextc);
            return true;
        }
        return false;
    }
    

    ReentrantLock字面意思是可重入锁

    • 可重入锁:一个线程在获取一个锁之后,在没有释放之前仍然可以继续申请锁而不会造成阻塞,但是解锁的时候也需要相应次数的解锁操作。

    结合nonfairTryAcquire方法逻辑,可以推断出state字段在独占锁模式下还代表了锁的重入次数。

    AbstractQueuedSynchronizer#addWaiter

    // 在链表尾部创建一个结点,值为当前线程和传入的模式
    private Node addWaiter(Node mode) {
    	// 创建一个结点,值为当前线程和传入的模式
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 快速路径,是为了方便JIT优化。jvm检测到热点代码,会将其编译成本地机器码并以各种手段进行代码优化。
        Node pred = tail;
        if (pred != null) {
        	// 将新创建的node的前驱指针指向tail。
            node.prev = pred;
            // 将结点修改为队列的tail时可能会发生数据冲突,用cas操作保证线程安全。
            if (compareAndSetTail(pred, node)) {
            	// compareAndSetTail比较的地址,如果相等则将新的地址赋给该字段(而不是在源地址上替换,为什么我会这么想???)
            	// 所以此处pred引用指向的仍然是源tail的内存地址。将其后继指针指向新的tail
                pred.next = node;
                return node;
            }
        }
        // 队列为空或者cas失败(说明被别的线程已经修改)
        enq(node);
        return node;
    }
    

    这个方法主要作用是在链表尾部创建一个结点,返回新创建的结点,其主要流程为

    • 通过当前的线程和锁模式创建一个节点。
    • 节点入尾操作
      • 新节点的前驱指针指向tail
      • 使用cas操作修改新节点为tail
      • 原tail的后继指针指向新节点

    当队列为空或者cas失败(说明被别的线程已经修改)会执行enq方法兜底。

    AbstractQueuedSynchronizer#enq

    // 在队列尾部创建一个结点,值为当前线程和传入的模式,当队列为空的时候初始化。
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
            	// 创建一个空结点设置为头,真正的头为hdead.next
                if (compareAndSetHead(new Node()))
                	// 尾等于头
                    tail = head;
            } else {
            	// 这段逻辑跟addWaiter()中快速路径的逻辑一样。
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    addWaiter是对enq方法的一层封装,addWaiter首先尝试一个快速路径的在链表尾部创建一个结点,失败的时候回转入enq方法兜底,循环在链表尾部创建一个节点,直到成功为止。

    这里有个疑问,为什么要在addWaiter方法中尝试一次在enq方法中能完成的在链表尾部创建一个节点的操作呢?其实是为了方便JIT优化。jvm检测到热点代码,会将其编译成本地机器码并以各种手段进行代码优化。了解更多1了解更多2

    在链表尾插入需要

    AbstractQueuedSynchronizer#acquireQueued

    // acquireQueued会把传入的结点在队列中不断去获取锁,直到获取成功或者不再需要获取(中断)。
    final boolean acquireQueued(final Node node, int arg) {
    	// 标记是否成功拿到锁
        boolean failed = true;
        try {
        	// 标记获取锁的过程中是否中断过
            boolean interrupted = false;
            // 开始自旋,要么获取锁,要么中断
            for (;;) {
            	// 获得其前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点为head代表现在节点node在队列有效数据的第一位,就尝试获取锁
                if (p == head && tryAcquire(arg)) {
                	// 获取锁成功,把当前节点置为虚节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果存在以下情况就要判断当前node是否要被阻塞
                // 1. p为头节点且获取锁失败 2. p不为头结点
                if (shouldParkAfterFailedAcquire(p, node) &&
                	// 阻塞进程
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
            	// 取消申请锁
                cancelAcquire(node);
        }
    }
    

    AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire

    // 依赖前驱节点判断当前线程是否应该被阻塞
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	// 入参请求锁的node的前驱节点的状态
        int ws = pred.waitStatus;
        // 如果前驱节点的状态为"表示线程已经准备好了,就等资源释放了"
        // 说明前驱节点处于激活状态,入参node节点需要被阻塞
        if (ws == Node.SIGNAL)
            return true;
        // 只有CANCELLED状态对应大于0
        if (ws > 0) {
            do {
            	// 循环向前查找取消状态节点,把取消节点从队列中剔除
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        	// 设置状态非取消的前驱节点等待状态为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    ReentrantLock#lock总结

    到现在我们可以总结一下ReentrantLock#lock非公平锁方法的流程

    未获取到锁的情况下函数调用流程

    • ReentrantLock#lock
    • ReentrantLock.Sync#lock
    • ReentrantLock.NonfairSync#lock
    • AbstractQueuedSynchronizer#acquire
    • ReentrantLock.NonfairSync#tryAcquire
    • ReentrantLock.Sync#nonfairTryAcquire
    • AbstractQueuedSynchronizer#addWaiter
    • AbstractQueuedSynchronizer#acquireQueued

    描述

    • 执行ReentrantLock的Lock方法。
    • 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,cas修改state值获取锁,失败执行父类的Acquire方法。
    • 父类的Acquire方法会执行子类实现的tryAcquire方法,因为tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
    • tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。
    
    
    // 公平锁加锁时判断等待队列中是否存在有效节点的方法。
    // 返回False,当前线程可以争取共享资源;
    // 返回True,队列中存在有效节点,当前线程必须加入到等待队列中。
    public final boolean hasQueuedPredecessors() {
    	Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // 头不等于尾代表队列中存在结点返回true
        // 但是还有一种特例,就是如果现在正在执行enq方法进行队列初始化,tail = head;语句运行之后
        // 此时h == t,返回false,但是队列中
        return h != t &&
        	// 从这可以看出真正的头结点是head.next,即说明head是一个无实际数据的结点,为了方便链表操作
            ((s = h.next) == null 
            // 有效头结点与当前线程不同,返回true必须加入到等待队列
            || s.thread != Thread.currentThread());
    }
    

    即时编译器


    Java程序最初都是通过解释器进行解释执行的,当虚拟机发现某个方法或代码块的运行特别频繁,就会把这些代码认定为“热点代码”(Hot Spot Code),为了提高热点代码的执行效率,在运行时,虚拟机将会把这些代码编译成本地机器码,并以各种手段尽可能地进行代码优化,运行时完成这个任务的后端编译器被称为即时编译器。
    这里所说的热点代码主要包括两类

    • 被多次调用的方法
    • 被多次执行的循环体

    对于这两种情况,编译的目标对象都是整个方法体,而不会是单独的循环体

    未完待续

  • 相关阅读:
    ng4中碰到的问题以及原因
    微信小程序安卓固定弹窗中textarea的placeholder会被弹出去
    微信小程序movable-view移动图片和双指缩放
    微信小程序滑动删除(真机测试)
    C语言编程100例JavaScript版(0~20)
    spring boot上传图片至七牛云服务器做存储
    spring boot 打包部署到tomcat上
    Uncaught SyntaxError: Unexpected token <
    在eclipse里新建一个maven工程,使用spring boot框架
    将一个数组展为树形结构的数据并将其展示在页面上
  • 原文地址:https://www.cnblogs.com/neverth/p/13527005.html
Copyright © 2011-2022 走看看