zoukankan      html  css  js  c++  java
  • AQS源码分析

    概述

    当我们提到 juc 包下的锁,就不得不联系到 AbstractQueuedSynchronizer 这个类,这个类就是大名鼎鼎的 AQS,AQS 按字面意思翻译为抽象队列同步器,调用者可以通过继承该类快速的实现同步多线程下的同步容器。不管是我们熟悉的 ReadWriteLock 亦或是 ReentrantLock,或者 CountDownLatch 与 Semaphore,甚至是线程池类 ThreadPoolExecutor 都继承了 AQS。

    在本文,将深入源码,了解 AQS 的运行机制,了解通过 AQS 实现非公平锁,公平锁,可重入锁等的原理。

    一、AQS 中的数据结构

    AQS 的底层数据结构其实是一条双向链表以及一个代表锁状态的变量 state。当加锁后,state会改变,而竞争锁的线程会被封装到节点中形成链表,并且尝试改变 state以获取锁。

    1.等待队列

    在 AQS 中有一个 Node 内部类,该类即为链表的节点类。当通过 AQS 竞争锁的时候,线程会被封装到一个对应的节点中,多个竞争不到锁的线程最终会连成一条链表,这条链表上节点代表的线程处于等待状态,因此我们称之为等待队列,也就是 CLH

    节点类中封装了竞争锁的线程的等待状态:

    • CANCELLED:1,表示当前结点已取消等待。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
    • SIGNAL:-1,表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
    • CONDITION:-2,表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
    • PROPAGATE:-3,共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点
    • 0:新节点入队时的默认状态。

    和线程池中的状态一样,Node 只有小于 0 的时候才处于正常的等待状态中,因此很多地方通过判断是否小于 0 来确定节点是否处于等待状态

    static final class Node {
        
        static final Node SHARED = new Node();
        
        static final Node EXCLUSIVE = null;
        
        // 等待状态
        volatile int waitStatus;
        
        volatile Node prev;
    
        volatile Node next;
    
        // 等待线程
        volatile Thread thread;
    
        // 下一等待节点
        Node nextWaiter;
    }
    

    2.锁状态

    private volatile int state;
    

    AQS 中提供了 state变量做为锁状态,一般来说,0 被视为无锁状态,1 被视为加锁状态,如果是可重入锁,就会大于 1。

    因此,AQS 中的加锁解锁实际上就是通过 CAS 改变 state的过程,即下列三个方法:

    protected final int getState() {
        return state;
    }
    
    protected final void setState(int newState) {
        state = newState;
    }
    
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    

    二、AQS 独占锁的加锁过程

    AQS 的同步过程其实就是同步队列节点中依次获取锁的过程。AQS 一共提供了独占和非独占两种获取资源的方法

    • acquire():以独占模式获取锁;
    • release():以独占模式释放锁;
    • acquireShared():以共享模式获取锁;
    • releaseShared():以共享模式释放锁;

    1.独占锁

    独占锁和非独占锁两者从流程上来说都差不多,只在一些实现上有区别。

    独占锁,顾名思义,即只有占有锁的线程才能操作资源,在 synchronize 底层的锁中,独占通过锁对象对象头中的指针来声明独占的线程,而在 AQS 中则通过父类 AbstractOwnableSynchronizer 提供的 exclusiveOwnerThread 变量来声明独占的线程:

    private transient Thread exclusiveOwnerThread;
    

    此外,AQS 并未提供其他具体实现。AQS 独占锁加锁的方法是 acquire(),其中涉及到 tryAcquire()方法是一个空实现,需要由子类实现并在在里面进行具体的独占判断:

    public final void acquire(int arg) {
        // 尝试获取锁
        if (!tryAcquire(arg) &&
            // 添加到等待队列
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 进入等待队列后阻塞
            selfInterrupt();
    }
    

    里面还涉及到 addWaiter(),acquireQueued()selfInterrupt()四个方法。

    2.获取锁资源

    在 AQS 中,tryAccquire() 是一个未实现的方法:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    

    他需要由具体的实现类去实现,并完成获取资源的功能。这里我们以用可重入锁 ReentrantLock 内为例(后文无声明亦同)。

    在 ReentrantLock 中,锁分为公平锁和非公平锁两种,二者的区别在于公平锁中等待队列中的线程严格按顺序获取锁,非公平锁中的线程可能不会按顺序获取锁。ReentrantLock 有一个内部类 Sync 继承了 AQS,提供基本的加锁解锁方法。

    然后分别有非公平锁 NonfairSync 类与公平锁类 FairSync 去继承 Sync,进一步区别公平锁与非公平的锁的实现逻辑。我们先看公平锁 FairSync 的tryAccquire()方法:

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 如果当前等待队列中没有线程在等待
            if (!hasQueuedPredecessors() &&
                // 尝试CAS修改state
                compareAndSetState(0, acquires)) {
                // 将当前锁设为自己独占
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果锁已经被自己获取过了,即重入
        else if (current == getExclusiveOwnerThread()) {
            // state + 1,即多获取一次锁
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // 没有获取锁
        return false;
    }
    

    而非公平锁与公平锁的tryAccquire()主要差别在于,公平锁会先看看有没有线程在等待,没有才去竞争锁,而非公平锁不会看有没有线程在等待,无论如何都会先去竞争一次锁。

    其他锁的 tryAccquire()与 ReentrantLock 的大体相同。

    3.添加节点至等待队列

    addWaiter()方法用于创建并添加等待节点。

    private Node addWaiter(Node mode) {
        // 以共享或者独占模式创建节点
        Node node = new Node(Thread.currentThread(), mode);
        // 尾插法插入节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果队列为空
        enq(node);
        return node;
    }
    

    这里涉及到一个 enq()方法,这个方法不复杂,主要是自旋初始化 AQS 中的头结点和尾节点,值得注意的是,这里的头结点实际上是一个哨兵节点,本身并无意义,当等待队列排队获取资源的时候,会直接从 head.next 开始。

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    4.在等待队列中获取锁

    这个方法主要做两件事:

    • 如果当前节点已经是队列第二个结点了,并且获取锁成功,就设置当前节点为新头结点,然后执行完毕后设置为中断;
    • 如果当前节点不是队列第二个节点,或者获取锁不成功,就挂起当前节点,等待上一节点的唤醒。
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取当前节点的上一节点
                final Node p = node.predecessor();
                // 再次尝试获取锁,如果前驱节点已经是头节点了,或者获取资源成功
                if (p == head && tryAcquire(arg)) {
                    // 将当前节点设置为头结点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                // 前驱节点不是头结点或者获取锁失败
                // 如果前驱节点需要被 park 挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 挂起当前线程
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 旧头结点已经处理完了,直接删除
            if (failed)
                cancelAcquire(node);
        }
    }
    

    这里涉及到一个 shouldParkAfterFailedAcquire()方法:

    这个方法主要是根据前驱节点的状态判断当前节点是否需要被 park 的。如果这个方法返回 true,那么说明前驱节点被设置为 SIGNAL 状态,然后进入 parkAndCheckInterrupt()方法把当前线程挂起,等待前驱节点的唤醒。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 如果前驱节点状态为SIGNAL
        if (ws == Node.SIGNAL)
            return true;
        // 如果前驱节点已经失效
        if (ws > 0) {
            // 移除全部失效节点,直到前驱节点为正常等待状态的节点为止
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 将前驱节点设置为SIGNAL,确保不影响后续节点的唤醒
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    如果上述shouldParkAfterFailedAcquire()返回 ture,那么就会接着执行 parkAndCheckInterrupt()方法挂起线程:

    private final boolean parkAndCheckInterrupt() {
        // 让当前线程等待,并中断任务
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    5.总结

    当线程使用 acquire()方法获取锁的时候:

    • 先执行tryAcquire()方法,这是个需要由子类实现的空方法,公平或者非公平在这个方法中让线程去获取锁,获得锁的线程要修改 state
    • 然后失败的线程需要执行addWaiter()方法,这个方法用于将线程封装到节点中,并以尾插法插入等待队列的链表,同时,如果等待队列没有初始化就会在此处先初始化;
    • 接着添加完成的节点执行acquireQueued()方法,此时会再次试图获取锁,如果此时还是失败,就会判断当前节点的前驱节点是否失效,如果不是就直接将前驱节点状态改为 SIGNAL ,然后执行 parkAndCheckInterrupt()方法挂起当前线程,如果是就一直找到一个正常等待的前驱节点为止,改前驱节点状态然后再挂起线程。

    三、AQS 独占锁的释放过程

    和 AQS 使用 acquire() 方法加锁的过程类似,AQS 也有一个 release()的解锁方法,他们同样需要实现类自己去实现 tryRelease()方法。

    public final boolean release(int arg) {
        // 尝试释放锁
        if (tryRelease(arg)) {
            Node h = head;
            // 如果当前头节点为空且不为初始状态
            if (h != null && h.waitStatus != 0)
                // 唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    1.释放锁

    tryAcquire()一样,AQS 不提供 tryRelease()的具体实现,而是交由子类去实现它。

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    

    我们依然以可重入锁 ReentrantLock 为例,去了解 ReentrantLock 中 tryRelease()的实现。

    虽然 ReentrantLock 中有公平锁和非公平锁两种实现,但是他们是释放过程都是一样的,都通过他们的父类,即继承 AQS 的内部类 Sync 的 tryRelease()方法来实现释放的功能:

    protected final boolean tryRelease(int releases) {
        // 可重入锁,减去一次持锁次数
        int c = getState() - releases;
        // 如果当前线程不是持有锁的线程则抛出异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 如果可重入次数为0,说明确实释放锁了
        if (c == 0) {
            free = true;
            // 独占线程设置为null
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    这个地方也很好理解,就是让 tryRelease()去执行释放锁的过程,换句话说,就是改变 state

    2.唤醒等待队列的后继节点

    unparkSuccessor()方法的主要用途是

    • 在前驱节点(其实就是等待队列的头结点)释放锁后,去唤醒等待队列中的后继节点;
    • 如果后继节点处于 CANCELLED 状态,说明该节点已经挂掉了,就从尾节点向前找到离后继节点最近的节点去唤醒,否则直接唤醒后继节点。
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            // 如果头节点状态还处于等待状态,则改回初始状态
            compareAndSetWaitStatus(node, ws, 0);
        
        Node s = node.next;
        // 如果后继节点存在并被标记为CANCELLED状态
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾节点开始,找到离node最近的处于等待状态的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒节点
            LockSupport.unpark(s.thread);
    }
    

    3.总结

    当线程使用 release()方法释放锁的时候:

    • 先执行tryRelease()方法释放资源,改变 state以释放锁
    • 再执行 unparkSuccessor()方法唤醒后继节点,如果后继节点挂了,就找到最近的下一个处于等待状态的有效节点唤醒。

    四、AQS 共享锁的加锁释放过程

    相对 AQS 独占锁,共享锁在 AQS 中以及提供好的相关的实现。共享锁通过 acquireShared()方法加锁,通过releaseShared()方法解锁。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    

    1.获取锁资源

    tryAcquireShared()也是一个空实现方法,需要由子类去实现。根据注释,我们不难理解它的作用:

    • 检查是否支持共享锁,如果是才能获取锁;
    • 根据后继等待节点的情况返回值:大于 0 说明有后继等待节点,执行完以后继续唤醒后继节点;等于 0 说明当前已是最后一个可以获取共享锁的节点,不再唤醒后继节点;小于 0 说明锁获取失败,需要进入等待队列。

    其中,针对共享锁,比较具有代表性的是读写锁 ReentrantReadWriteLock,它通过 state的高 16 位记录读锁,低 16 位记录写锁,在获取锁资源的时候,如果检测存在写锁则无法获得锁,如果是读锁则获取资源并递增读锁计数器,这部分的逻辑就是在其子类中得到的实现。

    2.唤醒后继节点

    基于上面的 tryAcquireShared()方法,doAcquireShared()要做的事情显然很明了了:

    • 如果后继节点可以以共享模式唤醒,就直接依次唤醒;
    • 否则,则跟获取独占锁的流程一样,再次尝试获取资源无果后将后将节点代表的线程挂起。
    private void doAcquireShared(int arg) {
        // 创建共享模式的节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点已经是头结点,即当前节点需要获取锁
                if (p == head) {
                    // 尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    // 唤醒后继节点
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                
                // 判断当前线程是否需要被挂起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    这里有一个 setHeadAndPropagate()方法,根据方法名可以猜出是用来设置头结点和唤醒后继共享节点的:

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        // 设置头结点
        setHead(node);
        
        // 如果后续有需要唤醒的节点,并且当前节点没有被CANCELLED
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 如果下一节点处于共享状态
            if (s == null || s.isShared())
                // 释放共享锁
                doReleaseShared();
        }
    }
    

    这里其实只做了一些条件判断,确保有后继节点并且后继节点是正常节点,核心逻辑其实是 doReleaseShared()方法:

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒节点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         // 如果后续节点不需要唤醒,则设置为PROPAGATE避免影响后继节点的唤醒
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    3.解锁过程

    解锁使用的releaseShared()方法:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    这里的 tryReleaseShared()其实跟独占锁的tryRelease()类似,即改变状态以表示释放资源,而 doReleaseShared()即上文唤醒后继节点的方法。

    4.总结

    共享锁和独占锁的根本区别在于,当头是共享模式时,它被唤醒后会直接尝试唤醒后继所有共享模式的节点,直到遇到第一个非共享模式的节点为止,而不是跟独占锁一样只唤醒后继节点。

    五、总结

    AQS 在内部为此了一个变量 state,用于记录锁状态,线程通过 CAS 修改 state即是加锁解锁过程。

    AQS 内存维护了一条双向链表,即等待队列 CLH,等待锁的线程被封装为 Node 节点连成链表,通过 LockSuppor 工具类的 park()unpark()方法切换等待状态。

    AQS 提供了独占和非独占两种锁实现方式,分别提供了 acquire()/release()acquireShared()/releaseShared()两套加锁解锁方式,同时,基于 state有衍生出可重入和非可重入锁的实现——即重入锁在state=1的情况下继续递增,解锁在 state上递减直到为 0 为止。并且,根据是否先判断等待队列中是否已存在等待线程,然后再尝试获取锁的情况,又分出了公平锁和非公平锁两种实现。

  • 相关阅读:
    Android手势(上,下,左和右的判断)
    我爱意甲
    程序员特有的9个坏习惯
    我爱英超
    VS2010快捷键总结(一)
    C#中导出Excel总结
    MessageDAL
    GDI+ 绘图总结
    .net中绑定日期时,只显示年月日的做法
    Vb线程控制
  • 原文地址:https://www.cnblogs.com/Createsequence/p/14406966.html
Copyright © 2011-2022 走看看