zoukankan      html  css  js  c++  java
  • AQS源码解析

    文大篇幅引用自HongJie大佬的一行一行源码分析清楚AbstractQueuedSynchronizer,这只是一篇简单的个人整理思路和总结(倒垃圾),如果觉得有些难懂的话,不要犹豫也不要疑惑,很明显是我这篇文章的问题,不是你的问题,这时你最好直接转去看HongJie大佬的原文,那个会好懂很多。还是看不懂的话建议隔一段时间再看,然后像我一样写(复制)一篇总结捋一下思路,加油!

    AQS 结构

    属性

    private transient volatile Node head;
    
    private transient volatile Node tail;
    
    // 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
    // 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
    private volatile int state;
    
    // 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
    // reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
    // if (currentThread == getExclusiveOwnerThread()) {state++}
    private transient Thread exclusiveOwnerThread;


    内部类

    Node

    static final class Node {
        // 标识节点当前在共享模式下
        static final Node SHARED = new Node();
        // 标识节点当前在独占模式下
        static final Node EXCLUSIVE = null;
    
        // ================================================ 下面的几个int常量是给waitStatus用的 ===============================================
        // 代码此线程取消了争抢这个锁
        static final int CANCELLED =  1;
        // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
        static final int SIGNAL    = -1;
        // 本文不分析condition,所以略过吧,下一篇文章会介绍这个
        static final int CONDITION = -2;
        // 同样的不分析,略过吧
        static final int PROPAGATE = -3;
        // ================================================================================================================================
    
        // 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
        // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
        // ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的.
        volatile int waitStatus;
        // ================================================================================================================================
        
        // 用于阻塞队列
        volatile Node prev;
        volatile Node next;
    
        // 这个就是线程本尊
        volatile Thread thread;
        
        // 用于条件队列
        Node nextWaiter;
    }

    获取独占锁

    lock () (摘自Reentrantlock)

    public void lock() {        
        sync.lock();     
    }

    acquire (int arg)

    // 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。
    // 否则,acquireQueued方法会将线程压到队列中
    public final void acquire(int arg) { // 首先调用tryAcquire(1)一下,名字上就知道,这个只是试一试
        // 因为有可能直接就成功了呢,也就不需要进队列排队了,
        // 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的)
        if (!tryAcquire(arg) &&
            // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
            selfInterrupt();
        }
    }        

    tryAcquire (int acquires) (实现来自ReentrantLock)

    // 尝试直接获取锁,返回值是boolean,代表是否获取到锁
    // 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // state == 0 此时此刻没有线程持有锁
        if (c == 0) {
            // 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
            // 看看有没有别人在队列中等了半天了
            if (!hasQueuedPredecessors() &&
                // 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
                // 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
                // 因为刚刚还没人的,我判断过了
            compareAndSetState(0, acquires)) {
    
                // 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 会进入这个else if分支,说明是重入了,需要操作:state=state+1
        // 这里不存在并发问题
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // 如果到这里,说明前面的if和else if都没有返回true,说明没有获取到锁
        // 回到上面一个外层调用方法继续看:
        // if (!tryAcquire(arg) 
        //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
        //     selfInterrupt();
        return false;
    }                

    tryAcquire(arg) 如果返回false,那么代码将执行   acquireQueued(addWaiter(Node.EXCLUSIVE), arg)   这个方法,首先需要执行: addWaiter(Node.EXCLUSIVE) 

    addWaiter(Node.EXCLUSIVE) 

    // 此方法的作用是把线程包装成node,同时进入到队列中
    // 参数mode此时是Node.EXCLUSIVE,代表独占模式
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);// 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
        Node pred = tail;
    
        // tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧)
        if (pred != null) {
            // 将当前的队尾节点,设置为自己的前驱 
            node.prev = pred;
            // 用CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴
            if (compareAndSetTail(pred, node)) {
                // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,
                // 上面已经有 node.prev = pred,加上下面这句,也就实现了和之前的尾节点双向连接了
                pred.next = node;
                // 线程入队了,可以返回了
                return node;
            }
        }
        // 仔细看看上面的代码,如果会到这里,
        // 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
        enq(node);
        return node;
    }

    enq (final Node node)

    // 采用自旋的方式入队
    // 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,
    // 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 之前说过,队列为空也会进来这里
            if (t == null) { // 初始化head节点
                // 细心的读者会知道原来 head 和 tail 初始化的时候都是 null 的
                // 还是一步CAS,你懂的,现在可能是很多线程同时进来呢
                if (compareAndSetHead(new Node()))
                    // 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了
    
                    // 这个时候有了head,但是tail还是null,设置一下,
                    // 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了
                    // 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return
                    // 所以,设置完了以后,继续for循环,下次就到下面的else分支了
                    tail = head;
            } else {
                // 下面几行,和上一个方法 addWaiter 是一样的,
                // 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    tryAcquire(arg) 如果返回false,那么代码将执行   acquireQueued(addWaiter(Node.EXCLUSIVE), arg)   这个方法

    acquireQueued(final Node node, int arg)

    // 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列
    // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,
    // 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
    // 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
                // 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
                // 所以当前节点可以去试抢一下锁
                // 这里我们说一下,为什么可以去试试:
                // 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
                // enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
                // 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
                // tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,
                // 要么就是tryAcquire(arg)没有抢赢别人,继续往下看
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 什么时候 failed 会为 true???
            // tryAcquire() 方法抛异常的情况
            if (failed)
                cancelAcquire(node);
        }
    }

    shouldParkAfterFailedAcquire(Node pred, Node node)

    // 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
    // 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
        if (ws == Node.SIGNAL)
            return true;
    
        // 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。
        // 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
        // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
        // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
        // 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 仔细想想,如果进入到这个分支意味着什么
            // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
            // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
            // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
            // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 这个方法返回 false,那么会再走一次 for 循序,
        //     然后再次进来此方法,此时会从第一个分支返回 true
        return false;
    }

    parkAndCheckInterrupt

    // 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
    // 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

    下面我画了张图帮大家理一下思路,转载注明出处

     附录


    释放独占锁

    unlock () (摘自Reentrantlock)

    public void unlock() {     
        sync.release(1); 
    }

    release (int arg)

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    tryRelease (int arg)(实现来自Reentrantlock)

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // 是否完全释放锁
        boolean free = false;
        // 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    unparkSuccessor (Node node)

    // 唤醒后继节点
    // 从上面调用处知道,参数node是head头结点
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        // 如果head节点当前waitStatus<0, 将其修改为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
        // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒线程
            LockSupport.unpark(s.thread);
    }

    唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 刚刚线程被挂起在这里了
        // interrupted()的内部实现是调用的当前线程的isInterrupted(),并且会重置当前线程的中断状态
        return Thread.interrupted();
    }

    条件锁

    我们借Reentrantlock来看一下条件锁

    Lock lock = new ReentrantLock();
    Condition notFull = lock.newCondition();
    Condition notEmpty = lock.newCondition();
    final ConditionObject newCondition() {
        // 实例化一个 ConditionObject
        return new ConditionObject();
    }

    ConditionObject是AQS中的一个内部类,类似于之前提到的Node

    ConditionObject 

    public class ConditionObject implements Condition, java.io.Serializable {
            // 条件队列的第一个节点
            private transient Node firstWaiter;
            // 条件队列的最后一个节点
            private transient Node lastWaiter;

     

     回顾一下Node的属性

    // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
    volatile int waitStatus; 
    // 用于阻塞队列
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    // 用于条件队列
    Node nextWaiter;

    先捋一下简单流程

    基本上,把上面那张图看懂,你也就知道 condition 的处理流程了。所以,我先简单解释下这图,然后再具体地解释代码实现。

    1. 条件队列和阻塞队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到阻塞队列中去的;
    2. 我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性 firstWaiter 和 lastWaiter;
    3. 每个 condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表;
    4. 调用condition1.signal() 触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。

    上面的 2->3->4 描述了一个最简单的流程,没有考虑中断、signalAll、还有带有超时参数的 await 方法等,不过把这里弄懂是这节的主要目的。


    条件锁的await()

    await ()(实现来自AQS中的ConditionObject内部类中)

    // 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly()
    // 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
    public final void await() throws InterruptedException {
        // 老规矩,既然该方法要响应中断,那么在最开始就判断中断状态,
    // interrupted()的内部实现是调用的当前线程的isInterrupted(),并且会重置当前线程的中断状态
    if (Thread.interrupted()) throw new InterruptedException(); // 添加到 condition 的条件队列中 Node node = addConditionWaiter(); // 释放锁,返回值是释放锁之前的 state 值 // await() 之前,当前线程是必须持有锁的,这里肯定要释放掉 int savedState = fullyRelease(node); int interruptMode = 0; // 这里退出循环有两种情况,之后再仔细分析 // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了 // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被唤醒后,将进入阻塞队列,等待获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

    addConditionWaiter() 

    将节点加入到条件队列

    // 将当前线程对应的节点入队,插入队尾
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 如果条件队列的最后一个节点取消了,将其清除出去
        // 为什么这里把 waitStatus 不等于 Node.CONDITION,就判定为该节点发生了取消排队?
        if (t != null && t.waitStatus != Node.CONDITION) {
            // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // node 在初始化的时候,指定 waitStatus 为 Node.CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
    
        // t 此时是 lastWaiter,队尾
        // 如果队列为空
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

    回到 wait 方法,节点入队了以后,会调用  int savedState = fullyRelease(node);  方法释放锁,注意,这里是完全释放独占锁(fully release),因为 ReentrantLock 是可以重入的。

    考虑一下这里的 savedState。如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。

    fullyRelease (Node node)

    // 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值
    // 对于最简单的操作:先 lock.lock(),然后 condition1.await()。
    // 那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1
    // 相应的,如果 lock 重入了 n 次,savedState == n
    // 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

    如果一个线程在不持有 lock 的基础上,就去调用 condition1.await() 方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState) 这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置 node.waitStatus = Node.CANCELLED,这个已经入队的节点之后会被后继的节点”请出去“。

    下面我们再回到await() 中,从   int savedState = fullyRelease(node);  后继续:

    // 如果不在阻塞队列中,注意了,是阻塞队列
    while (!isOnSyncQueue(node)) {
        // 线程挂起
        LockSupport.park(this);
    
        // 这里可以先不用看了,等看到它什么时候被 unpark 再说
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    isOnSyncQueue(Node node)

    // 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION
    // 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列,
    // 这个方法就是判断 node 是否已经移动到阻塞队列了
    final boolean isOnSyncQueue(Node node) {
    
        // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到
        // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中
        // 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的)
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 如果 node 已经有后继节点 next 的时候(注意是next,阻塞队列独有的,不是条件队列的),那肯定是在阻塞队列了
        if (node.next != null) 
            return true;
    
        // 下面这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列
    
        // 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。
        // 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail,
        // 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。
    
        return findNodeFromTail(node);
    }
    
    // 从阻塞队列的队尾往前遍历,如果找到,返回 true
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

    条件锁的signal

    signal ()(实现来自AQS中的ConditionObject内部类中)

    // 唤醒等待了最久的线程
    // 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列
    public final void signal() {
        // 调用 signal 方法的线程必须持有当前的独占锁
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

    doSignal(Node first)

    // 从条件队列队头往后遍历,找出第一个需要转移的 node
    // 因为前面我们说过,有些线程会取消排队,但是可能还在队列中
    private void doSignal(Node first) {
        do {
              // 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了
            // 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
          // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推
    }

    transferForSignal(Node node)

    // 将节点从条件队列转移到阻塞队列
    // true 代表成功转移
    // false 代表在 signal 之前,节点已经取消了
    final boolean transferForSignal(Node node) {
    
        // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,
        // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点
        // 否则,将 waitStatus 置为 0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        // enq(node): 自旋进入阻塞队列的队尾
        // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点
        Node p = enq(node);
        int ws = p.waitStatus;
        // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释
        // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节
            LockSupport.unpark(node.thread);
        return true;
    }

    signal 之后,回到刚刚await () 挂起的地方继续

    while (!isOnSyncQueue(node)) {
        // 线程挂起
        LockSupport.park(this);
    
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    先解释下 interruptMode。interruptMode 可以取值为 REINTERRUPT(1),THROW_IE(-1),0

    1. REINTERRUPT: 代表 await 返回的时候,需要重新设置中断状态
    2. THROW_IE: 代表 await 返回的时候,需要抛出 InterruptedException 异常
    3. 0 :说明在 await 期间,没有发生中断

    有以下几种情况会让 LockSupport.park(this); 这句返回继续往下执行:

    1. 常规路径。signal -> 转移节点到阻塞队列 -> 获取了锁(unpark)
    2. 线程中断。在 park 的时候,另外一个线程对这个线程进行了中断
    3. signal 的时候我们说过,转移以后的前驱节点取消了,或者对前驱节点的CAS操作失败了
    4. 假唤醒。这个也是存在的,和 Object.wait() 类似,都有这个问题

    线程唤醒后第一步是调用 checkInterruptWhileWaiting(node) 这个方法,此方法用于判断是否在线程挂起期间发生了中断,如果发生了中断,是 signal 调用之前中断的,还是 signal 之后发生的中断。

    下面是await () 的中断处理部分(不了解中断的请去复习中断,不然可能会有点懵逼)

    checkInterruptWhileWaiting (node)

    // 1. 如果在 signal 之前已经中断,返回 THROW_IE
    // 2. 如果是 signal 之后中断,返回 REINTERRUPT
    // 3. 没有发生中断,返回 0
    private int checkInterruptWhileWaiting(Node node) {
        // Thread.interrupted():如果当前线程已经处于中断状态,那么该方法返回 true,
        // 同时将中断状态重置为 false,所以,才有后续的 重新中断(REINTERRUPT) 的使用。
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }

    transferAfterCancelledWait(Node node)

    判断是 signal 调用之前中断的,还是 signal 之后发生的中断。

    // 只有线程处于中断状态,才会调用此方法
    // 如果需要的话,将这个已经取消等待的节点转移到阻塞队列
    // 返回 true:如果此线程在 signal 之前被取消,
    final boolean transferAfterCancelledWait(Node node) {
        // 用 CAS 将节点状态设置为 0 
        // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            // 将节点放入阻塞队列
            // 这里我们看到,即使中断了,依然会转移到阻塞队列
            enq(node);
            return true;
        }
    
        // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
        // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
        // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    从刚刚await () 的 while (!isOnSyncQueue(node))  循环后的地方继续

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    // 处理中断状态
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

    acquireQueued(node, savedState) 的返回值就是代表线程是否被中断。如果返回 true,说明被中断了,而且 interruptMode != THROW_IE,说明在 signal 之前就发生中断了,这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断。

    reportInterruptAfterWait(int interruptMode)

    处理中断状态

    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }

    带超时机制的await ()

    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        // 等待这么多纳秒
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        // 当前时间 + 等待时长 = 过期时间
        final long deadline = System.nanoTime() + nanosTimeout;
        // 用于返回 await 是否超时
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // 时间到啦
            if (nanosTimeout <= 0L) {
                // 这里因为要 break 取消等待了。取消等待的话一定要调用 transferAfterCancelledWait(node) 这个方法
                // 如果这个方法返回 true,在这个方法内,将节点转移到阻塞队列成功
                // 返回 false 的话,说明 signal 已经发生,signal 方法将节点转移了。也就是说没有超时嘛
                timedout = transferAfterCancelledWait(node);
                break;
            }
            // spinForTimeoutThreshold 的值是 1000 纳秒,也就是 1 毫秒
            // 也就是说,如果不到 1 毫秒了,那就不要选择 parkNanos 了,自旋的性能反而更好
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            // 得到剩余时间
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }

    超时的思路还是很简单的,不带超时参数的 await 是 park,然后等待别人唤醒。而现在就是调用 parkNanos 方法来休眠指定的时间,醒来后判断是否 signal 调用了,调用了就是没有超时,否则就是超时了。超时的话,自己来进行转移到阻塞队列,然后抢锁。


    获取共享锁

    捋一下简单流程

    AQS 里面的 state 是一个整数值,这边用一个 int count 参数其实初始化就是设置了这个值,所有调用了 await 方法的等待线程会挂起,然后有其他一些线程会做 state = state - 1 操作,当 state 减到 0 的同时,那个将 state 减为 0 的线程会负责唤醒 所有调用了 await 方法的线程。

    对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。

    countDown() 方法每次调用都会将 state 减 1,直到 state 的值为 0;而 await 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。await 可以被多个线程调用,读者这个时候脑子里要有个图:所有调用了 await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过来。

    await ()(摘自CountDownLatch)

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    acquireSharedInterruptibly(int arg)

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // 这也是老套路了,中断那一节说过了
        if (Thread.interrupted())
            throw new InterruptedException();
    
        // state 为初始化的值。
        // 也就是说,这个 if 返回 true,然后往里看
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    // 只有当 state == 0 的时候,这个方法才会返回 1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }

    doAcquireSharedInterruptibly (int arg)

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 1. 入队
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 同上,只要 state 不等于 0,那么这个方法返回 -1
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 2
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    经过第 1 步 addWaiter 入队以后

     由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,将 head 的 waitStatus 值设置为 -1

     

    countDown ()

    public void countDown() {
        sync.releaseShared(1);
    }

    releaseShared (int arg)

    public final boolean releaseShared(int arg) {
        // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true
        // 否则只是简单的 state = state - 1 那么 countDown() 方法就结束了
        // 将 state 减到 0 的那个操作才是最复杂的,继续往下吧
        if (tryReleaseShared(arg)) {
            // 唤醒 await 的线程
            doReleaseShared();
            return true;
        }
        return false;
    }

    doReleaseShared()

    // 调用这个方法的时候,state == 0
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
                if (ws == Node.SIGNAL) {
                    // 将 head 的 waitStatue 设置为 0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
                    continue;                
            }
            if (h == head)                   
                break;
        }
    }

    之后被唤醒的线程会回到await() 的阻塞的地方继续

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); // 2. 这里是下一步
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 1. 唤醒后这个方法返回
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    接下来,线程会进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
    
        // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4
        // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了
                doReleaseShared();
        }
    }

    就这样,醒来的线程唤醒下一个线程,下一个线程唤醒下下个线程

    如果你能看到这里,那么恭喜你,我要送你一句话:


  • 相关阅读:
    详解Twitter开源分布式自增ID算法snowflake(附演算验证过程)
    分布式自增ID算法-Snowflake详解
    关于分布式唯一ID,snowflake的一些思考及改进(完美解决时钟回拨问题)
    分布式ID增强篇--优化时钟回拨问题
    专题:性能调优之工具---perf
    Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别,消息队列有什么优点和缺点
    Zookeeper框架Curator使用
    Zookeeper 3.5启动时 8080端口被占用
    Qt状态机框架(状态机就开始异步的运行了,也就是说,它成为了我们应用程序事件循环的一部分了)
    Qt插件开发入门(两种方法:High-Level API接口,Low-Level API接口)
  • 原文地址:https://www.cnblogs.com/fatmanhappycode/p/12269340.html
Copyright © 2011-2022 走看看