zoukankan      html  css  js  c++  java
  • 并发编程之:AQS源码解析

    大家好,我是小黑,一个在互联网苟且偷生的农民工。

    在Java并发编程中,经常会用到锁,除了Synchronized这个JDK关键字以外,还有Lock接口下面的各种锁实现,如重入锁ReentrantLock,还有读写锁ReadWriteLock等,他们在实现锁的过程中都是依赖与AQS来完成核心的加解锁逻辑的。那么AQS具体是什么呢?

    提供一个框架,用于实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量,事件等)。 该类被设计为大多数类型的同步器的有用依据,这些同步器依赖于单个原子int值来表示状态。 子类必须定义改变此状态的受保护方法,以及根据该对象被获取或释放来定义该状态的含义。 给定这些,这个类中的其他方法执行所有排队和阻塞机制。 子类可以保持其他状态字段,但只以原子方式更新int使用方法操纵值getState() , setState(int)和compareAndSetState(int, int)被跟踪相对于同步。

    上述内容来自JDK官方文档。

    简单来说,AQS是一个先进先出(FIFO)的等待队列,主要用在一些线程同步场景,需要通过一个int类型的值来表示同步状态。提供了排队和阻塞机制。

    类图结构

    image-20210904200925904

    从类图可以看出,在ReentrantLock中定义了AQS的子类Sync,可以通过Sync实现对于可重入锁的加锁,解锁。

    AQS通过int类型的状态state来表示同步状态。

    AQS中主要提供的方法:

    acquire(int) 独占方式获取锁

    acquireShared(int) 共享方式获取锁

    release(int) 独占方式释放锁

    releaseShared(int) 共享方式释放锁

    独占锁和共享锁

    关于独占锁和共享锁先给大家普及一下这个概念。

    独占锁指该锁只能同时被一个线程持有;

    共享锁指该锁可以被多个线程同时持有。

    举个生活中的例子,比如我们使用打车软件打车,独占锁就好比我们打快车或者专车,一辆车只能让一个客户打到,不能两个客户同时打到一辆车;共享锁就好比打拼车,可以有多个客户一起打到同一辆车。

    AQS内部结构

    我们简单通过一张图先来了解下AQS的内部结构。其实就是有一个队列,这个队列的头结点head代表当前正在持有锁的线程,后续的其他节点代表当前正在等待的线程。

    image-20210904201020029


    接下来我们通过源码来看看AQS的加锁和解锁过程。先来看看独占锁是如何进行加解锁的。

    独占锁加锁过程

    ReentrantLock lock = new ReentrantLock();
    lock.lock();
    
    public void lock() {
        // 调用sync的lock方法
        sync.lock();
    }
    

    可以看到在ReentrantLock的lock方法中,直接调用了sync这个AQS子类的lock方法。

    final void lock() {
        // 获取锁
        acquire(1);
    }
    public final void acquire(int arg) {
        // 1.先尝试获取,如果获取成功,则直接返回,代表加锁成功
        if (!tryAcquire(arg) &&
            // 2.如果获取失败,则调用addWaiter在等待队列中增加一个节点
            // 3. 调用acquireQueued告诉前一个节点,在解锁之后唤醒自己,然后线程进入等待状态
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 如果在等待过程中被中断,则当前线程中断
            selfInterrupt();
    }
    

    在获取锁时,基本可以分为3步:

    1. 尝试获取,如果成功则返回,如果失败,执行下一步;
    2. 将当前线程放入等待队列尾部;
    3. 标记前面等待的线程执行完之后唤醒当前线程。
    /**
     * 尝试获取锁(公平锁实现)
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
    	// 获取state,初始值为0,每次加锁成功会+1,解锁成功-1
        int c = getState();
        // 当前没有线程占用
        if (c == 0) { 
            // 判断是否有其他线程排队在本线程之前
            if (!hasQueuedPredecessors() &&
                // 如果没有,通过CAS进行加锁
                compareAndSetState(0, acquires)) {
                // 将当前线程设置为AQS的独占线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果当前线程是正在独占的线程(已持有锁,重入)
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;  
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            // state+1
            setState(nextc);
            return true;
        }
        return false;
    }
    
    private Node addWaiter(Node mode) {
        // 创建一个当前线程的Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 如果等待队列的尾节点!=null
        if (pred != null) {
            // 将本线程对应节点的前置节点设置为原来的尾节点
            node.prev = pred;
            // 通过CAS将本线程节点设置为尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //尾节点为空,或者在CAS时失败,则通过enq方法重新加入到尾部。(本方法内部采用自旋)
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 尾节点为空,代表等待队列还没有被初始化过
            if (t == null) { 
                // 创建一个空的Node对象,通过CAS赋值给Head节点,如果失败,则重新自旋一次,如果成功,将Head节点赋值给尾节点
                if (compareAndSetHead(new Node()))
                    tail = head; 
            } else {
                // 尾节点不为空的情况,说明等待队列已经被初始化过,将当前节点的前置节点指向尾节点
                node.prev = t;
                // 将当前节点CAS赋值给尾节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        // 标识是否加锁失败
        boolean failed = true;
        try {
            // 是否被中断
            boolean interrupted = false;
            for (;;) {
                // 取出来当前节点的前一个节点
                final Node p = node.predecessor();
                // 如果前一个节点是head节点,那么自己就是老二,这个时候再尝试获取一次锁
                if (p == head && tryAcquire(arg)) {
                    // 如果获取成功,把当前节点设置为head节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false; // 标识加锁成功
                    return interrupted;
                }
                // shouldParkAfterFailedAcquire 检查并更新前置节点p的状态,如果node节点应该阻塞就返回true
                // 如果返回false,则自旋一次。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 当前线程阻塞,在阻塞被唤醒时,判断是否被中断
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed) // 如果加锁成功,则取消获取锁
                cancelAcquire(node);
        }
    }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // ws == -1
            /*
    		 * 这个节点已经设置了请求释放的状态,所以它可以在这里安全park.
             */
            return true;
        if (ws > 0) {
            /*
             * 前置节点被取消了,跳过前置节点重试
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 将前置节点的状态设置为请求释放
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    在整个加锁过程可以通过下图更清晰的理解。

    image-20210904201038010

    独占锁解锁过程

    public void unlock() {
        sync.release(1);
    }
    

    同样解锁时也是直接调用AQS子类sync的release方法。

    public final boolean release(int arg) {
        // 尝试解锁
        if (tryRelease(arg)) {
            Node h = head;
            // 解锁成功,如果head!=null并且head.ws不等0,代表有其他线程排队
            if (h != null && h.waitStatus != 0)
                // 唤醒后续等待的节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    解锁过程如下:

    1. 先尝试解锁,解锁失败则直接返回false。(理论上不会解锁失败,因为正在执行解锁的线程一定是持有锁的线程)
    2. 解锁成功之后,如果有head节点并且状态不是0,代表有线程被阻塞等待,则唤醒下一个等待的线程。
    protected final boolean tryRelease(int releases) {
        // state - 1
        int c = getState() - releases;
        // 如果当前线程不是独占AQS的线程,但是这时候又来解锁,这种情况肯定是非法的。
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { // 如果状态归零,代表锁释放了,将独占线程设置为null
            free = true;
            setExclusiveOwnerThread(null);
        }
    	// 将减1之后的状态设置为state
        setState(c);
        return free;
    }
    
    private void unparkSuccessor(Node node) {
        /*
         * 如果节点的ws小于0,将ws设置为0
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * 从等待队列的尾部往前找,直到第二个节点,ws<=0的节点。
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 如果存在符合条件的节点,unpark唤醒这个节点的线程。
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    共享锁加锁过程

    为了实现共享锁,AQS中专门有一套和排他锁不同的实现,我们来看一下源码具体是怎么做的。

    public void lock() {
        sync.acquireShared(1);
    }
    
    public final void acquireShared(int arg) {
        // tryAcquireShared 尝试获取共享锁许可,如果返回负数标识获取失败
        // 返回0表示成功,但是已经没有多余的许可可用,后续不能再成功,返回正数表示后续请求也可以成功
        if (tryAcquireShared(arg) < 0)
           //  申请失败,则加入到共享等待队列
            doAcquireShared(arg);
    }
    

    tryAcquireShared尝试获取共享许可,本方法需要在子类中进行实现。不同的实现类实现方式不一样。

    下面的代码是ReentrentReadWriteLock中的实现。

     protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        // 当前有独占线程正在持有许可,并且独占线程不是当前线程,则返回失败(-1)
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        // 没有独占线程,或者独占线程是当前线程。
        // 获取已使用读锁的个数
        int r = sharedCount(c);
      	// 判断当前读锁是否应该阻塞 
        if (!readerShouldBlock() &&
            // 已使用读锁小于最大数量
            r < MAX_COUNT &&
            // CAS设置state,每次加SHARED_UNIT标识共享锁+1
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) { // 标识第一次加读锁
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 重入加读锁
                firstReaderHoldCount++;
            } else {
                // 并发加读锁,记录当前线程的读的次数,HoldCounter中是一个ThreadLocal。
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return 1;
        }
        // 否则自旋尝试获取共享锁
        return fullTryAcquireShared(current);
    }
    

    本方法可以总结为三步:

    1. 如果有写线程独占,则失败,返回-1
    2. 没有写线程或者当前线程就是写线程重入,则判断是否读线程阻塞,如果不用阻塞则CAS将已使用读锁个数+1
    3. 如果第2步失败,失败原因可能是读线程应该阻塞,或者读锁达到上限,或者CAS失败,则调用fullTryAcquireShared方法。
    private void doAcquireShared(int arg) {
        // 加入同步等待队列,指定是SHARED类型
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 取到当前节点的前一个节点
                final Node p = node.predecessor();
                // 如果前一个节点是头节点,则当前节点是第二个节点。
                if (p == head) {
                    // 因为是FIFO队列,所以当前节点这时可以再尝试获取一次。
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取成功,把当前节点设置为头节点。并且判断是否需要唤醒后面的等待节点。
                        // 如果条件允许,就会唤醒后面的节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 如果前置节点不是头结点,说明当前节点线程需要阻塞等待,并告知前一个节点唤醒
                // 检查并更新前置节点p的状态,如果node节点应该阻塞就返回true
                // 当前线程被唤醒之后,会从parkAndCheckInterrupt()执行
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed) 
                cancelAcquire(node);
        }
    }
    

    共享锁释放过程

    public void unlock() {
        sync.releaseShared(1);
    }
    
    public final boolean releaseShared(int arg) {
        //tryReleaseShared()尝试释放许可,这个方法在AQS中默认抛出一个异常,需要在子类中实现
        if (tryReleaseShared(arg)) {
            // 唤醒线程,设置传播状态 WS
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    AQS是很多并发场景下同步控制的基石,其中的实现相对要复杂很多,还需要多看多琢磨才能完全理解。本文也是和大家做一个初探,给大家展示了核心的代码逻辑,希望能有所帮助。


    好的,本期内容就到这里,我们下期见;关注公众号【小黑说Java】更多干货。
    image

  • 相关阅读:
    正则表达式
    字节流和字符流小练习
    File汇总
    java一不容易就容易错的知识点汇总
    a++和++a区别
    线程安全的3种方式
    bs4和css选择器的基本使用
    清点作业情况
    cookie和session的使用
    用post请求制作翻译
  • 原文地址:https://www.cnblogs.com/heiz123/p/15227652.html
Copyright © 2011-2022 走看看