zoukankan      html  css  js  c++  java
  • Java 并发之AbstractQueuedSynchronizer(AQS)源码解析

    关键字CLH,Node,线程,waitStatus,CAS,中断

    目录

    图解AQS的操作细节
    0、前言
    1、基本概念
    1.1、CAS自旋
    1.2、Node
    1.3、CLH & AQS
    1.4、ReentrantLock
    2、图解AQS
    2.1、线程A单独运行
    2.2、线程B开始运行
    2.3、线程C开始运行
    2.4、线程A停止运行,线程B继续运行
    2.5.1、线程B停止运行,线程C继续运行
    2.5.2、线程C放弃竞争
    3、问题总结
    3.1、为什么在unparkSuccessor操作中从尾节点开始扫描
    3.2、公平锁中的hasQueuedPredecessors操作为什么从tail开始
    3.3、当前运行的线程在哪里
    3.4、节点的waitStatus和AQS的state的区别
    4、参考链接

    CountDownLatch ==> Java 并发之CountDownLatch 计数器 操作图解细节

    0、前言

    AQS 全称 AbstractQueuedSynchronizer,中文简称是同步器,是 jdk1.5开始放在JUC中的,是java中关于线程、同步操作的基础组件,JUC作者Doug Lea AQS简介论文,ReentrantLock,ReentrantReadWriteLock,CountDownLatch等都是基于AQS开发的复合特定场景的锁

    先介绍AQS中的基本概念,然后具体使用3个线程的操作过程一步一步分析AQS中的操作细节,配合着分析AQS源码,了解其中的原理。本文只分析互斥、非公平的方式竞争的情况

    1、基本概念

    1.1、CAS自旋

    CAS 是Compare And Swap的简称,具有单一变量的原子操作特性,对比成功后进行交换操作,他是乐观操作,期间会无限循环操作,直到对比成功,然后进行后续交互操作

    CAS 包含了三个操作数据,内存位置V、预期值A、新预期值B,如果当前内存V存放的数据和A一样,就认为比较成功,然后把当前V所在的位置设置为B

    因为会无限循环操作,所以可能导致CPU效率低下,而且运行中还会导致ABA问题,也就是A->B->A的问题,误以为此时数据未发生变化,其实中间已经发生变化。该问题在java中提供了类AtomicStampedReference解决该问题,先会查看当前引用值是否符合期望,ABA也会变成1A->2B->3A,这样很清楚的感知到发生变化了。

    1.2、Node

    Node 是AQS中的双向链表的节点信息,主要信息如下

    // 共享模式
    static final Node SHARED = new Node();
    // 互斥模式(独斥)
    static final Node EXCLUSIVE = null;
    
    // 当前线程取消竞争
    static final int CANCELLED =  1;
        
    // 后置节点等待被唤醒
    static final int SIGNAL    = -1;
        
    // 暂时不明含义
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
      
    // 和共享有关  
    static final int PROPAGATE = -3;
        
    // 最重要的信息,就是为上面的1,-1,-2,-3,以及0
    volatile int waitStatus;
    // 前置节点
    volatile Node prev;
    // 后置节点
    volatile Node next;
    // 节点存储的线程信息,会被挂起和唤醒
    volatile Thread thread;
    
    // 和condition 有关
    Node nextWaiter;
    

    1.3、CLH & AQS

    在AQS中所有没有抢占到锁的线程最后都会被存储到CLH双向链表中,然后挂起等待被唤醒

    // CLH队列的头结点,其不包含线程信息,head永远为null
    private transient volatile Node head;
    // CLH队列的尾节点,每次新加一个节点都会尾插到最后
    private transient volatile Node tail;
    // 当前锁被占据的次数,因为可以被一个线程重复占据,所以其值可以大于0
    // 没有线程占据,其值就是0
    private volatile int state;
    // 当前运行的线程,也是占据锁的线程,注意和CLH中的线程无关
    private transient Thread exclusiveOwnerThread; 
    

    1.4、ReentrantLock

    基于AQS开发的可重入、互斥锁,并且可以自行响应线程中断,而基于底层操作系统实现线程安全的synchronized关键字却不没有该功能

    Lock lock = new ReentrantLock();
    // 可以通过传入的boolean值设置其为公平方式还是不公平方式占据锁
    lock.lock();
    try { 
      // 业务代码,线程互斥
    }
    finally {
      lock.unlock(); 
    }
    

    2、图解AQS

    现在待运行的三个线程ABC,将会按照如下情况依次执行,三个线程故意设置耗时很长从而出现互斥的情况,必然出现锁竞争,自旋的情况,根据其运行情况,分析AQS的运行过程和原理,

     
    image

    2.1、线程A单独运行

    不存在任何存在的竞争问题,A线程获取锁成功,A线程进入到了运行中的情况,此时的AQS执行时,tryAcquire直接成功,看看非公平方式的tryAcquire操作源码

    ReentrantLock 类

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 获取当前线程,此处肯定是A线程了
        int c = getState();
        if (c == 0) {
            // 第一次运行,state=0
            if (compareAndSetState(0, acquires)) {
                // 1、利用CAS自旋方式,判断当前state确实为0,然后设置成acquire(1)
                // 这是原子性的操作,可以保证线程安全
                setExclusiveOwnerThread(current);
                // 设置当前执行的线程,直接返回为true
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
           // 2、当前的线程和执行中的线程是同一个,也就意味着可重入操作
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            // 表示当前锁被1个线程重复获取了nextc次
            return true;
        }
        // 否则就是返回false,表示没有尝试成功获取当前锁
        return false;
    }
    

    此时A线程直接在1处被允许,然后设置了当前锁被占据的线程,直接返回,A线程正常运行

    2.2、线程B开始运行

     
    image

    现在A线程一直在运行中,B线程开始运行,同样的B线程也要开始尝试获取当前锁,经过tryAcquire操作肯定返回false,获取当前锁失败,原因可同理分析上面1.1的代码块,现在的情形是

    A线程:继续运行
    B线程:获取锁失败,计划进入到CLH队列中

    这时候B线程执行的线路是acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),先添加一个互斥节点,然后唤醒可以执行的节点线程,这个时候的CLH队列是初始值状态,都指向null

     
    image

    先看看添加互斥节点的代码

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 新建一个节点,包装的线程是当前线程,传入的mode是互斥的
        // 当前线程是B线程
        Node pred = tail;
        // 此时尾节点tail=null
        if (pred != null) {
            // 当尾节点不为null,也就是CLH存在节点数据时
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // 又是CAS操作,判断当前pred节点是否是尾节点,如果是,设置node为尾节点,否则跳过
                pred.next = node;
                // 双向链表的基本操作,把当前节点当做尾部节点插入
                return node;
            }
        }
        // 其实上面这一段代码的重点是**快速**,假定当前不存在互斥或者冲突,能够高效的添加节点成功
        
        enq(node);
        // 这个时候B线程创建好一个节点,添加到尾部
        return node;
        // 然后返回新创建的节点
    }
    
    private Node enq(final Node node) {
        for (;;) {
            // 死循环,可用严格确保可用完整插入到队列尾部
            Node t = tail;
            if (t == null) { 
                // 1、如果当前尾部是null,这个地方就是初始化的CLH队列的地方
                // B线程肯定会执行到这个地方
                if (compareAndSetHead(new Node()))
                    // CAS自旋方式,设置当前队列头部节点为一个new Node(不包含线程信息)
                    tail = head;
                    // 头部和尾部指向同一个新建的new Node 节点
                    // 注意此时没有退出。。。。只是完成if操作,会继续for循环操作
            } else {
                // 2、第二次循环到这里,CLH队列只有一个节点new Node
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    // 利用CAS的方式,配合双写链表的连接方式,添加一个节点到尾部
                    t.next = node;
                    // 然后返回原尾部节点
                    return t;
                }
            }
        }
    }
    

    经历过上面的代码块的1、2两个步骤时的CLH队列情况是

    步骤1步骤2
     
    image
     
    image

    需要注意到head指向的节点线程为null!

    再来到acquireQueued操作,去唤醒可能执行的节点线程

    final boolean acquireQueued(final Node node, int arg) {
        // 这个时候node节点是上图中的tail节点,也就是线程B所在的节点,arg=1
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 查看当前node节点的前置节点p
                if (p == head && tryAcquire(arg)) {
                   // 如果前置节点是头部节点,B线程进行尝试获取锁
                   // 如果!是如果!
                   // 尝试获取成功了,也就是A线程执行完成,然后设置当前当前可执行的节点是B线程
                    setHead(node);
                    // 把node节点的线程设置为null,然后head指到node上
                    p.next = null; // help GC
                    // 回收操作
                    failed = false;
                    // 返回,然后将执行的将会是B线程
                    return interrupted;
                }
                // 按照我们的线程ABC的样例,上面的if肯定不会执行
                // 1、因为即使node的前节点是head,但是也无法获取到锁,因为一直被A线程占有者
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果shouldParkAfterFailedAcquire返回true,则挂起,否则不挂起线程
                    interrupted = true;
                // 2、第一次循环设置了p节点状态位-1,然后B线程循环至这里被挂起
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    // 返回true,挂起,返回false则不挂起
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        
        if (ws == Node.SIGNAL)
            // 前节点的状态=-1,返回true,直接挂起即可
            return true;
        if (ws > 0) {
            // 前置节点状态有>0,说明前驱节点取消了排队,移除掉该节点
            do {
                node.prev = pred = pred.prev;
                // 拆分为2步
                // pred = pred.prev
                // node.prev = pred
            } while (pred.waitStatus > 0);
            // 当前节点开始往前扫描,从双向队列中除去>0的节点,直到<=0
            pred.next = node;
        } else {
            // 利用CAS设置前节点状态位-1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    // 总结一下该方法用途:当前节点的前驱节点状态位-1时,挂起当前线程
    // 返回false时,也就是不需要挂起,返回true,则需要调用parkAndCheckInterrupt挂起当前线程
    
     
    image

    运行到现在,B线程挂起,A线程继续在运行

    2.3、线程C开始运行

    线程C开始运行,如下图

     
    image

    现在的线程情况是

    线程A:运行中
    线程B:挂起,暂停运行
    线程C:开始运行

    很明显,C线程会在tryacquire(1)方法中失败,然后同样通过addWaiter方法添加一个节点放在尾部,使得经过addWaiter方法之后的队列成为

     
    image

    又需要经过acquireQueued方法的处理,但是此时此时的节点是线程C,其前置节点是线程B,不是head,直接运行到shouldParkAfterFailedAcquire方法中

    此时B线程所在的节点waitStatus=0,只能通过CAS方法设置B节点waitStatus为SIGHNAL(-1),,并且之后又经过1次循环操作,最后返回true,此时队列为

     
    image

    然后同理经过parkAndCheckInterrupt,线程C也被中断了

    现在的情况是

    线程A:运行中
    线程B:挂起,暂停运行
    线程C:挂起,暂停运行

    2.4、线程A停止运行,线程B继续运行

     
    image

    现在A运行完成了,需要释放占据的锁

    ReentrantLock 类

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            // 当前运行的线程和锁住的线程不是同一个,抛出监控状态异常错误
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            // 如果为0,则意味着这时可以释放锁了
            free = true;
            setExclusiveOwnerThread(null);
            // 设置当前的占据锁的线程为null,给其他线程一个机会
        }
        setState(c);
        // 存在重入的机会,所以不一定就是释放该锁
        return free;
    }
    

    AbstractQueuedSynchronizer 类

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // 返回true,则表示可以唤醒其他挂起的线程,通知他们可以干活了
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 如果头部节点有效,则开始唤醒CLH队列的节点线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    private void unparkSuccessor(Node node) {
        // node节点是head,在我们上面已经说明了head节点的waitStatus=-1
       
        int ws = node.waitStatus;
        if (ws < 0)
            // 1、设置head节点状态为0
            compareAndSetWaitStatus(node, ws, 0);
    
        Node s = node.next;
        // 从head下一个节点开始处理
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 如果下一个节点为null,或者下一个节点状态>0(为废弃节点)
            // 从尾节点开始扫描,直到扫描到距离head最近的一个<=0的节点信息
            // 在我们当前的例子中,肯定就是线程B
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 样例中,s节点就是B线程所在的节点
            // 2、唤醒s.thread ,会触发acquireQueued的循环继续执行
            LockSupport.unpark(s.thread);
    }
    

    经过上面步骤1之后,head节点就被设置为了0

     
    image

    线程B经过LockSupport.unpark操作之后,原本挂起在acquireQueued的循环中,会继续执行

    for (;;) {
                final Node p = node.predecessor();
                // 查看当前node节点的前置节点p
                if (p == head && tryAcquire(arg)) {
                   // 如果前置节点是头部节点,B线程进行尝试获取锁
                   // 如果!是如果!
                   // 尝试获取成功了,也就是A线程执行完成,然后设置当前当前可执行的节点是B线程
                    setHead(node);
                    // 把node节点的线程设置为null,然后head指到node上
                    p.next = null; // help GC
                    // 回收操作
                    failed = false;
                    // 返回,然后将执行的将会是B线程
                    return interrupted;
                }
    

    恰好当前B线程的前一个节点p是head,而且线程A也已经释放了,没有占据锁,所以在tryAcquire操作时,可以顺利获取到锁,然后把B线程节点当做head,并且把node节点(原head节点)前置和后置都设置为null

     
    image

    其中图中的双向链表的1、2、3依次断开,原head节点被GC回收,新的CLH队列为

     
    image

    现在的情况是

    线程A:运行结束
    线程B:运行中
    线程C:挂起,暂停运行

    2.5.1、线程B停止运行,线程C继续运行

     
    image

    现在的情况是

    线程A:运行结束
    线程B:运行结束
    线程C:运行中

    按照1.4小节的套路,最后来到了unparkSuccessor 方法中

    设置当前头部节点head的waitStatus为0,然后唤醒线程C,把head节点数据全部清除掉,head和tail将会指向同一个节点,如下图,最后线程C在运行,CLH队列只剩下一个节点,head和tail同时指向该节点。

     
    image

    直到线程C运行完,CLH队列依旧如此,相当于完成了CLH队列的初始化,当下一次又来互斥线程时,又会开始创建节点

    2.5.2、线程C放弃竞争

    现在模拟的是B线程继续运行,而在CLH中的线程C放弃了竞争这种场景

     
    image

    在上述的操作中是无法直接放弃竞争的,接下来看看是为什么,直接来到线程挂起和唤醒的代码快

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
        // 利用另外的线程对当前线程触发中断请求,会直接返回当前中断状态,也就是true
        // 那么在acquireQueued 函数中 interrupted 的值被赋值为true
        // 其他有任何的改变...
    }
    

    只是用来标记曾经被中断过,但是未有任何实际上的放弃竞争的操作
    不过上面的finally代码块中有采用cancelAcquire 方法,不过一般情况下这个并不会被执行,而是一种兜底方案
    ,除非出现了一些意外情况

    那么我们该如何确保真的进行放弃竞争操作呢,不采用lock.lock方法,而是lock.lockInterruptibly()

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            // 有线程中断,直接抛出中断异常
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        // 尾插节点成功
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 重点在这里,一旦出现了异常,则立马抛出该异常情况
                    // 此时failed 字段肯定是true了
                    // 需要进入到下面的cancelAcquire 方法中去取消竞争了
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                // 开始取消竞争
                cancelAcquire(node);
        }
    }
    

    在上面的学习知道,设置节点状态为Node.CANCELLED,然后在后续的队列清理中会清除该节点的,那现在看看是如何主动取消竞争的

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
    
        node.thread = null;
        // 节点线程数据设置为null
    
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // 从当前节点的前置节点开始扫描去掉废弃的节点,这样操作不会出现并发问题
    
        Node predNext = pred.next;
    
        // 设置为取消状态!
        node.waitStatus = Node.CANCELLED;
    
        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            // node是尾部节点,而且通过CAS自旋成功,设置pred为tail
            // pred的next为nul,可以彻底和node 节点切割,node节点等待GC回收
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                // 前节点不是头部节点,而且包含的线程不为null(上面就有操作设置线程为null的情况)
                // 或者节点状态可以设置为-1
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                // 这样说么下个节点是可以串在一起的!
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
                    // CAS 操作,设置后置节点
                 // 这个操作就相当去双写链表去掉node节点   
            } else {
                unparkSuccessor(node);
                // 唤醒node的后置节点
                // 注意!此时node节点还在链表中
                // 他的真正移除是依靠node后置节点的shouldParkAfterFailedAcquire 去移除节点操作完成的
            }
            node.next = node; // help GC
        }
    }
    

    总结一下

    • 如果尾部节点或者中间节点就依靠从链表中脱离操作实现放弃竞争
    • 如果是头部的下一个节点,是依靠唤醒下一个节点后进行的链表清楚操作完成

    这样就很清楚线程是如何取消竞争操作的

    3、问题总结

    上面的case只是各种线程执行方式的一种,实际场景中包含了各种各样的运行方式,每个线程执行的时间也长短不一,现在讨论几个常见的问题

    3.1、为什么在unparkSuccessor操作中从尾节点开始扫描

    这个问题需要回头观察CLH队列是如何创建的,来到addWaiter方法

    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // 通过CAS 操作后node节点为tail
            // 并且早就设置了node.prev = pred
            // 1、也就是前置的链表已经绑定完毕,后置的链表还未绑定
            pred.next = node;
            return node;
        }
    }
    

    在代码块1处执行完成之后,还未执行绑定后置链表操作,如果出现唤醒操作,从头部开始遍历存在一定几率使得无法获取到当前最新的节点(因为后置链表还未绑定,无法通过.next获取到下一个节点信息),而从尾部开始扫描则不会导致该问题

    很巧妙的利用尾部扫描避免了可能的并发问题

    3.2、公平锁中的hasQueuedPredecessors操作为什么从tail开始

    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        // 先读取tail节点,再读取head节点
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    在思考这个问题之前,需要时刻认为CLH队列一直都在动态的变化中,head和tail随时会发生变化
    在构建CLH链表时,是先设置head,然后设置tail,那么如果有null的情况只有head!=null && tail==null
    一旦在这个时刻添加尾节点成功,head.next 的值其实已经变化了,但是如果先获取head可能存在null的情况,所以先获取tail再获取head

    3.3、当前运行的线程在哪里

    不存放在CLH队列中,样例图中也并未展示出来,其一直处于运行中,主要CLH队列的head节点线程为null

    3.4、节点的waitStatus和AQS的state的区别

    两者表示的含义完全不一样,节点的waitStatus是表示节点状态,简洁的表达线程的状态,当其值大于0就认为该节点线程放弃竞争锁

    AQS中的state表示该锁被占据的次数,某一时刻只能被一个线程一次或者多次占据,其他线程只能被挂起等待

    4、参考链接


    作者:jwfy
    链接:https://www.jianshu.com/p/282bdb57e343

  • 相关阅读:
    双系统下,Windows如何正确删除Linux系统
    关于通过adb启动Activity、activity、service以及发送broadcast的命令
    Eclipse常用快捷键集合
    关于“学习Linux用什么系统”的解答
    关于设置android:imeOptions属性无效的解决办法
    Android XML文件布局各个属性详解
    Android开发:文本控件详解——EditText(一)基本属性
    Android开发:UI相关(一)自定义样式资源
    Android开发:文本控件详解——TextView(一)基本属性
    Android开发:Android Studio开发环境配置
  • 原文地址:https://www.cnblogs.com/developing/p/12130866.html
Copyright © 2011-2022 走看看