zoukankan      html  css  js  c++  java
  • 从ReentrantLock详解AQS原理源码解析

    Java中的大部分同步类(ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及FIFO队列模型的简单框架。

    AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

    数据结构


    在java.util.concurrent.locks.AbstractQueuedSynchronizer类中存在如下数据结构。

    // 链表结点
    static final class Node {}
    
    // head指向的是一个虚拟结点,刷多了算法就知道这样做的目的是方便对链表操作,真正的头为head.next
    private transient volatile Node head;
    
    // 尾结点
    private transient volatile Node tail;
    
    // 这个锁(共享资源)对象的状态。
    // volatile保证可见性和屏蔽指令重排
    private volatile int state;
    
    // 继承至AbstractOwnableSynchronizer类
    // 独占模式下当前锁的拥有者
    private transient Thread exclusiveOwnerThread;
    
    // 自旋锁的自旋纳秒数,用于提高应用的响应能力
    static final long spinForTimeoutThreshold = 1000L;
    
    // unsafe类
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    
    // 以下字段对应上面字段的在对象中的偏移值,在静态代码块中初始化,其值是相对于在这个类对象中的偏移量
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
    

    在AQS类中的内部类Node包含如下数据结构

    static final class Node {
    
        // 共享锁
        static final Node SHARED = new Node();
    
        // 独占锁
        static final Node EXCLUSIVE = null;
           
        // 0	               当一个Node被初始化的时候的默认值
        // CANCELLED	为  1,表示线程获取锁的请求已经取消了
        // CONDITION	为 -2,表示节点在等待队列中,节点线程等待唤醒
        // PROPAGATE	为 -3,当前线程处在SHARED情况下,该字段才会使用
        // SIGNAL	        为 -1,表示线程已经准备好了,就等资源释放了
        volatile int waitStatus;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        
        // 前驱指针
        volatile Node prev;
        
        // 后继指针
        volatile Node next;
    	
        // 该节点代表的线程对象
        volatile Thread thread;
    
        Node nextWaiter;
    }
    

    从其数据结构可以猜测出

    • AQS类中主要的存储结构是一个双向链表,称为CLH变体的虚拟双向队列(FIFO)。
    • state字段对应了这个锁(共享资源)对象的状态。
    • 线程申请锁(共享资源)时会将其包装成一个节点。Node保存了获取锁的线程信息。
    • Node.waitStatus字段保存这个线程申请锁(共享资源)的状态。
    • head指向的是一个虚拟结点,真正有效的头为head.next。
    • 请求共享资源的线程包装节点node包含两种模式,Node.SHARED表示以共享的模式等待锁、Node.EXCLUSIVE表示正在以独占的方式等待锁。

    在前文锁阻塞和唤醒是用CLH队列锁实现的,CLH:Craig、Landin and Hagersten队列,是单向链表。通过分析上面的数据结构可知,在AQS中其实现本质上是一个双向链表,AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

    Node.waitStatus包含5个状态,对应如下

    状态 含义
    0 当一个Node被初始化的时候的默认值
    CANCELLED 为 1,表示线程获取锁的请求已经取消了
    CONDITION 为 -2,表示节点在等待队列中,节点线程等待唤醒
    PROPAGATE -3,当前线程处在SHARED情况下,该字段才会使用
    SIGNAL 为 -1,表示线程已经准备好了,就等资源释放了

    源码分析


    我们从AQS的实现类ReentrantLock#lock开始分析其具体的流程。

    ReentrantLock#lock

    public void lock() {
        sync.lock();
    }
    

    直接调用了Sync类的lock()方法,Sync类在ReentrantLock中有两个实现类分别是FairSync和NonfairSync,分别对应了公平锁和非公平锁。

    • 公平锁:线程获取锁的顺序和调用lock的顺序一样,FIFO;
    • 非公平锁:线程获取锁的顺序和调用lock的顺序无关,全凭运气。

    由于ReentrantLock默认是非公平锁,我们从NonfairSync类分析。

    ReentrantLock.NonfairSync#lock

    final void lock() {
    	// cas操作尝试将state字段值修改为1
        if (compareAndSetState(0, 1))
        	// 成功的话就代表已经获取到锁,修改独占模式下当前锁的拥有者为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
        else
        	// 获取锁失败之后的操作
            acquire(1);
    }
    

    从这可以确定我们之前的猜测

    • state字段对应了这个锁对象的状态,值为0的时候代表锁没有被线程占用,修改为1之后代表锁被占用。

    现在分析未获取到锁之后的流程

    AbstractQueuedSynchronizer#acquire

    public final void acquire(int arg) {
    	
        if (
        		// 当前线程尝试获取锁
        		!tryAcquire(arg) &&
        		// acquireQueued会把传入的结点在队列中不断去获取锁,直到获取成功或者不再需要获取(中断)。
            	acquireQueued(
            		// 在双向链表的尾部创建一个结点,值为当前线程和传入的模式
    	        	addWaiter(Node.EXCLUSIVE), 
    	        	arg
            	)
            )
            // TODO
            selfInterrupt();
    }
    

    看不懂,先查找资料了解这几个方法的作用,注释在代码中。

    ReentrantLock.NonfairSync#tryAcquire

    // 当前线程尝试获取锁
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    

    ReentrantLock.Sync#nonfairTryAcquire

    // 当前线程尝试获取锁-非公平
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 获得当前锁对象的状态
        int c = getState();
        // state为0代表当前没有被线程占用
        if (c == 0) {
        	// cas操作尝试将state字段值修改为请求的数量
            if (compareAndSetState(0, acquires)) {
            	// 直接修改当前独占模式下锁的拥有者为为当前线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果锁的占有者就是当前线程
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // state值增加相应的请求数。
            setState(nextc);
            return true;
        }
        return false;
    }
    

    ReentrantLock字面意思是可重入锁

    • 可重入锁:一个线程在获取一个锁之后,在没有释放之前仍然可以继续申请锁而不会造成阻塞,但是解锁的时候也需要相应次数的解锁操作。

    结合nonfairTryAcquire方法逻辑,可以推断出state字段在独占锁模式下还代表了锁的重入次数。

    AbstractQueuedSynchronizer#addWaiter

    // 在链表尾部创建一个结点,值为当前线程和传入的模式
    private Node addWaiter(Node mode) {
    	// 创建一个结点,值为当前线程和传入的模式
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 快速路径,是为了方便JIT优化。jvm检测到热点代码,会将其编译成本地机器码并以各种手段进行代码优化。
        Node pred = tail;
        if (pred != null) {
        	// 将新创建的node的前驱指针指向tail。
            node.prev = pred;
            // 将结点修改为队列的tail时可能会发生数据冲突,用cas操作保证线程安全。
            if (compareAndSetTail(pred, node)) {
            	// compareAndSetTail比较的地址,如果相等则将新的地址赋给该字段(而不是在源地址上替换,为什么我会这么想???)
            	// 所以此处pred引用指向的仍然是源tail的内存地址。将其后继指针指向新的tail
                pred.next = node;
                return node;
            }
        }
        // 队列为空或者cas失败(说明被别的线程已经修改)
        enq(node);
        return node;
    }
    

    这个方法主要作用是在链表尾部创建一个结点,返回新创建的结点,其主要流程为

    • 通过当前的线程和锁模式创建一个节点。
    • 节点入尾操作
      • 新节点的前驱指针指向tail
      • 使用cas操作修改新节点为tail
      • 原tail的后继指针指向新节点

    当队列为空或者cas失败(说明被别的线程已经修改)会执行enq方法兜底。

    AbstractQueuedSynchronizer#enq

    // 在队列尾部创建一个结点,值为当前线程和传入的模式,当队列为空的时候初始化。
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
            	// 创建一个空结点设置为头,真正的头为hdead.next
                if (compareAndSetHead(new Node()))
                	// 尾等于头
                    tail = head;
            } else {
            	// 这段逻辑跟addWaiter()中快速路径的逻辑一样。
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    addWaiter是对enq方法的一层封装,addWaiter首先尝试一个快速路径的在链表尾部创建一个结点,失败的时候回转入enq方法兜底,循环在链表尾部创建一个节点,直到成功为止。

    这里有个疑问,为什么要在addWaiter方法中尝试一次在enq方法中能完成的在链表尾部创建一个节点的操作呢?其实是为了方便JIT优化。jvm检测到热点代码,会将其编译成本地机器码并以各种手段进行代码优化。了解更多1了解更多2

    在链表尾插入需要

    AbstractQueuedSynchronizer#acquireQueued

    // acquireQueued会把传入的结点在队列中不断去获取锁,直到获取成功或者不再需要获取(中断)。
    final boolean acquireQueued(final Node node, int arg) {
    	// 标记是否成功拿到锁
        boolean failed = true;
        try {
        	// 标记获取锁的过程中是否中断过
            boolean interrupted = false;
            // 开始自旋,要么获取锁,要么中断
            for (;;) {
            	// 获得其前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点为head代表现在节点node在队列有效数据的第一位,就尝试获取锁
                if (p == head && tryAcquire(arg)) {
                	// 获取锁成功,把当前节点置为虚节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果存在以下情况就要判断当前node是否要被阻塞
                // 1. p为头节点且获取锁失败 2. p不为头结点
                if (shouldParkAfterFailedAcquire(p, node) &&
                	// 阻塞进程
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
            	// 取消申请锁
                cancelAcquire(node);
        }
    }
    

    AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire

    // 依赖前驱节点判断当前线程是否应该被阻塞
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	// 入参请求锁的node的前驱节点的状态
        int ws = pred.waitStatus;
        // 如果前驱节点的状态为"表示线程已经准备好了,就等资源释放了"
        // 说明前驱节点处于激活状态,入参node节点需要被阻塞
        if (ws == Node.SIGNAL)
            return true;
        // 只有CANCELLED状态对应大于0
        if (ws > 0) {
            do {
            	// 循环向前查找取消状态节点,把取消节点从队列中剔除
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        	// 设置状态非取消的前驱节点等待状态为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    ReentrantLock#lock总结

    到现在我们可以总结一下ReentrantLock#lock非公平锁方法的流程

    未获取到锁的情况下函数调用流程

    • ReentrantLock#lock
    • ReentrantLock.Sync#lock
    • ReentrantLock.NonfairSync#lock
    • AbstractQueuedSynchronizer#acquire
    • ReentrantLock.NonfairSync#tryAcquire
    • ReentrantLock.Sync#nonfairTryAcquire
    • AbstractQueuedSynchronizer#addWaiter
    • AbstractQueuedSynchronizer#acquireQueued

    描述

    • 执行ReentrantLock的Lock方法。
    • 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,cas修改state值获取锁,失败执行父类的Acquire方法。
    • 父类的Acquire方法会执行子类实现的tryAcquire方法,因为tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
    • tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。
    
    
    // 公平锁加锁时判断等待队列中是否存在有效节点的方法。
    // 返回False,当前线程可以争取共享资源;
    // 返回True,队列中存在有效节点,当前线程必须加入到等待队列中。
    public final boolean hasQueuedPredecessors() {
    	Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // 头不等于尾代表队列中存在结点返回true
        // 但是还有一种特例,就是如果现在正在执行enq方法进行队列初始化,tail = head;语句运行之后
        // 此时h == t,返回false,但是队列中
        return h != t &&
        	// 从这可以看出真正的头结点是head.next,即说明head是一个无实际数据的结点,为了方便链表操作
            ((s = h.next) == null 
            // 有效头结点与当前线程不同,返回true必须加入到等待队列
            || s.thread != Thread.currentThread());
    }
    

    即时编译器


    Java程序最初都是通过解释器进行解释执行的,当虚拟机发现某个方法或代码块的运行特别频繁,就会把这些代码认定为“热点代码”(Hot Spot Code),为了提高热点代码的执行效率,在运行时,虚拟机将会把这些代码编译成本地机器码,并以各种手段尽可能地进行代码优化,运行时完成这个任务的后端编译器被称为即时编译器。
    这里所说的热点代码主要包括两类

    • 被多次调用的方法
    • 被多次执行的循环体

    对于这两种情况,编译的目标对象都是整个方法体,而不会是单独的循环体

    未完待续

  • 相关阅读:
    线性代数思维导图——3.向量
    微分中值定理的基础题型总结
    构造函数
    Python课程笔记(七)
    0241. Different Ways to Add Parentheses (M)
    0014. Longest Common Prefix (E)
    0013. Roman to Integer (E)
    0011. Container With Most Water (M)
    0010. Regular Expression Matching (H)
    0012. Integer to Roman (M)
  • 原文地址:https://www.cnblogs.com/neverth/p/13527005.html
Copyright © 2011-2022 走看看