zoukankan      html  css  js  c++  java
  • 【JDK1.8】JUC——AbstractQueuedSynchronizer

    一、前言

    上一篇中,我们对LockSupport进行了阅读,因为它是实现我们今天要分析的AbstractQueuedSynchronizer(简称AQS)的基础,重新用一下最开始的图:

    juc

    可以看到,在ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock中都用到了继承自AQS的Sync内部类,正如AQS的java doc中一开始描述:

    Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.

    为实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量,事件等)提供框架。

    AQS根据模式的不同:独占(EXCLUSIVE)和共享(SHARED)模式。

    • 独占:只有一个线程能执行。如ReentrantLock。
    • 共享:多个线程可同时执行。如Semaphore,可以设置指定数量的线程共享资源。

    对应的类根据不同的模式,来实现对应的方法。


    二、结构概览

    试想一下锁的应用场景,当线程试图请求资源的时候,先调用lock,如果获得锁,则得以继续执行,而没有获得,则排队阻塞,直到锁被其他线程释放,听起来就像是一个列队的结构。而实际上AQS底层就是一个先进先出的等待队列

    队列采用了链表的结构,node作为基本结构,主要有以下几个成员变量:

    static final class Node {
        //用来表明当前节点的等待状态,主要有下面几个:
        // CANCELLED: 1, 表示当前的线程被取消
        // SIGNAL: -1, 表示后继节点需要运行,也就是unpark
        // CONDITION: -2, 表示线程在等待condition
        // PROPAGATE: -3, 表示后续的acquireShared能够得以执行,在共享模式中用到,后面会说
        // 0, 初始状态,在队列中等待
        volatile int waitStatus;
        // 指向前一个node
        volatile Node prev;
        // 指向后一个node
        volatile Node next;
        // 指向等待的那个线程
        volatile Thread thread;
        // 在condition中用到
        Node nextWaiter;
    }
    

    在AQS中,用head,tail来记录了队列的头和尾,方便快速操作队列:

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        private transient volatile Node head;
        private transient volatile Node tail;
        // 同步状态
        private volatile int state;
    }
    

    AQS的基本框架就是:state作为同步资源状态,当线程请求锁的时候,根据state数值判断能否获得锁。不能,则加入队列中等待。当持有锁的线程释放的时候,根据队列里的顺序来决定谁先获得锁。


    三、源码阅读

    独占模式典型的实现就是ReentrantLock,其具体流程如下:

    独占模式下对应的lock-unlock就是acquire-release。整个过程如上图所示。我们先来看一下acquire方法:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    1. 调用tryAcquire(),该方法会在独占模式下尝试请求获取对象状态。具体的实现由实现类去决定。
    2. 如果tryAcquire()失败,即返回false,则调用addWaiter函数,将当前线程标记为独占模式,加入队列的尾部。
    3. 调用acquireQueued(),让线程在队列中等待获取资源,一直获取到资源后才返回。如果在等阿迪过程中被中断过,则返回true,否则返回false
    4. 如果线程被中断过,在获取锁之后,调用中断

    3.1 tryAcquire(int arg)

    下面来具体看一下各个方法:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    

    前面说过了,AQS提供的是框架,其具体的实现由实现类来完成,tryAcquire就是其中之一,需要子类自己实现的方法,那既然要自己实现,为什么不加abstract关键字,因为前面提到过,只有独占模式的实现类才需要实现这个方法,像Semaphore,CountDownLatch等共享模式的类不需要用到这个方法。如果加了关键字,那么这些类还要实现,显得很鸡肋。


    3.2 addWaiter(Node mode)

    private Node addWaiter(Node mode) {
        // 将当前线程封装进node
        Node node = new Node(Thread.currentThread(), mode);
        
        Node pred = tail;
        // 插入队列尾部,并维持节点前后关系
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 上一步如果失败,在enq中继续处理
        enq(node);
        return node;
    }
    

    逻辑相对简单,其中compareAndSetTail采用Unsafe类来实现。那么下面的enq()方法是具体做了什么呢?

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 队列初始化
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            // 重复执行插入直到return
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    enq()方法为了防止在addWaiter中,节点插入队列失败没有return,或者队列没有初始化,在for循环中反复执行,确保插入成功,返回节点。


    3.3 acquireQueued(final Node node, int arg)

    到目前为止,走到acquireQueued()调用了前两个方法,意味着获取资源失败,将节点加入了等待队列,那么下面要做的就是阻塞当前的线程,等待资源被是否后,再次唤醒线程来取得资源。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取当前节点的前一个节点
                final Node p = node.predecessor();
                // 前一个节点是头结点,且获取到了资源
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 不符合上面的条件,那么只能被park,等待被唤醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    acquireQueued当中,用for循环来让线程等待,直至获得资源return。而return的条件就是当前的节点是第二个节点,且头结点已经释放了资源。

    再来看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法

    先来说一下parkAndCheckInterrupt:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    调用LockSupport.park,阻塞当前线程,当线程被重新唤醒后,返回是否被中断过。

    再来重点看一下shouldParkAfterFailedAcquire:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前一个节点的状态
        int ws = pred.waitStatus;
        // 如果前一个节点的状态是signal,前面提到表明会unpark下一个节点,则true
        if (ws == Node.SIGNAL)
            return true;
        // 如果ws > 0 即CANCELLED,则向前找,直到找到正常状态的节点。
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 维护正常状态
            pred.next = node;
        // 将前一个节点设置为SIGNAL
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    shouldParkAfterFailedAcquire的主要作用就是将node放置在SIGNAL状态的前节点下,确保能被唤醒,在调用该方法后,CANCELLED状态的节点因为没有引用执行它将被GC。

    那么问题来了,什么时候节点会被设置为CANCELLED状态

    答案就在try-finally的cancelAcquire(node)当中。当在acquireQueued取锁的过程中,抛出了异常,则会调用cancelAcquire。将当前节点的状态设置为CANCELLED。


    3.4 cancelAcquire(Node node)

    我们先来看一下它的源码:

    private void cancelAcquire(Node node) {
        // node为空,啥都不干
        if (node == null)
            return;
        node.thread = null;
    
        // while查找,直到找到非CANCELLED的节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
    
        // 获取非CANCELLED的节点的下一个节点,predNext肯定是CANCELLED
        Node predNext = pred.next;
    
        // 设置当前节点为CANCELLED状态
        node.waitStatus = Node.CANCELLED;
    
        // 如果节点在队列尾部,直接移除自己就可以了
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            // 重新维护剩下的链表关系
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 唤醒node的下一个节点
                unparkSuccessor(node);
            }
            // help GC
            node.next = node; 
        }
    }
    

    总结来说,cancelAcquire就是用来维护链表正常状态的关系,直接看代码认识起来可能还比较模糊,放图:

    几个注意点:

    1. 如果node为第二个节点的时候,pred == head,唤醒下一个节点next_node,next_node线程会继续在acquireQueued的for循环中执行,调用shouldParkAfterFailedAcquire会重新维护状态,排除node节点
    2. 调用if里的逻辑后,可以看到next的prev还指向node,会导致node无法被gc,这一点不用担心,当next调用setHead被设置为head的时候,next的prev会被设置为null,这样node就会被gc
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    

    以上部分就是acquire的所有部分,建议忘记的园友们可以回到上面重新看一下流程图,再接着稳固一遍。


    3.5 release(int arg)

    下面开始release的源码解析,相对于acquire来说要简单一些:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    与acquire一样,tryRelease由实现类自己实现,如果为true,则unpark队列头部的下一个节点。

    private void unparkSuccessor(Node node) {
        // 清楚小于0的状态
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        // 如果下一个节点是CANCELLED,则从尾部向头部找距离node最近的非CANCELLED节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // unpark找到的节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    至此acuqire-release的部分就此结束了,至于共享模式的代码大同小异,在后面分析信号量的时候会再提及~


    四、总结

    AQS应该是整个JUC中各个类涉及最多的了,其重要性可想而知,在了解其实现原理后,有助于我们分析其他的代码。最后谢谢各位园友观看,如果有描述不对的地方欢迎指正,与大家共同进步!



    参考:Java并发之AQS详解

  • 相关阅读:
    InApp PurchaseVerifying Store Receipts[6]
    InApp PurchaseTesting a Store [7]
    App Store Review Guidelines
    JAVA基础之一维数组和多维数组
    定位标记
    JSTL常用标签汇总
    struts1.2原理
    struts1.2中的ActionForm
    jdbc连接
    ActionForm与bean的区别
  • 原文地址:https://www.cnblogs.com/joemsu/p/9001136.html
Copyright © 2011-2022 走看看