zoukankan      html  css  js  c++  java
  • JUC锁框架源码阅读-AbstractQueuedSynchronizer

    前言

    最近在读canal-adpter源码的时候,源码里通过AQS+zookeeper实现了分布式锁。尝试看了一下,看不懂通过搜索锁类继承的父类才发现是AQS所以过来阅读源码。

    阅读方式:直接看对应方法 点击<>可以跟代码

    什么是AQS

    AQS是JUC锁框架中最重要的类,通过它来实现独占锁和共享锁的。比如ReentrantLock、countDownLatch、ReentrantReadWriteLock 都是依靠AQS实现

    什么是CAS 

    参考https://www.cnblogs.com/LQBlog/p/11607351.html#autoid-1-0-0

    CLH队列(线程同步队列)

    在没有获取到锁的情况下是线程需要阻塞的,需要阻塞的线程是通过CLH队列来存储,CLH并不是直接存储线程,而是通过内部类Node来存储的

    内部类Node

    static final class Node {
            // 共享模式的标记
            static final Node SHARED = new Node();
            // 独占模式的标记
            static final Node EXCLUSIVE = null;
    
            // waitStatus变量的值,标志着线程被取消
            static final int CANCELLED =  1;
            // waitStatus变量的值,标志着后继线程(即队列中此节点之后的节点)需要被阻塞.(用于独占锁)
            static final int SIGNAL    = -1;
            // waitStatus变量的值,标志着线程在Condition条件上等待阻塞.(用于Condition的await等待)
            static final int CONDITION = -2;
            // waitStatus变量的值,标志着下一个acquireShared方法线程应该被允许。(用于共享锁)
            static final int PROPAGATE = -3;
    
            // 标记着当前节点的状态,默认状态是0, 小于0的状态都是有特殊作用,大于0的状态表示已取消 通过volatile保证了线程的可见性
            volatile int waitStatus;
    
            // prev和next实现一个双向链表
            volatile Node prev;
            volatile Node next;
    
            // 该节点拥有的线程
            volatile Thread thread;
    
            // 可能有两种作用:1. 表示下一个在Condition条件上等待的节点
            // 2. 表示是共享模式或者独占模式,注意第一种情况节点一定是共享模式
            Node nextWaiter;
    
            // 是不是共享模式
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            // 返回前一个节点prev,如果为null,则抛出NullPointerException异常
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            // 用于创建链表头head,或者共享模式SHARED
            Node() {
            }
    
            // 使用在addWaiter方法中
            Node(Thread thread, Node mode) {
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            // 使用在Condition条件中
            Node(Thread thread, int waitStatus) {
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    View Code

    <7>predecessor

     // 返回前一个节点prev,如果为null,则抛出NullPointerException异常
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }

    操作CLH队列的相关方法

    package com.lq.aqs;
    
    /**
     * @Project redis-in-action
     * @PackageName com.lq.aqs
     * @ClassName AbstractQueuedSynchronizer
     * @Author qiang.li
     * @Date 2021/8/26 4:38 下午
     * @Description TODO
     */
    public class AbstractQueuedSynchronizer {
        // CLH队列头
        private transient volatile Node head;
    
        // CLH队列尾
        private transient volatile Node tail;
        /**
         * 通过CAS函数设置head值,仅仅在enq方法中调用
         */
        private final boolean compareAndSetHead(Node update) {
            return unsafe.compareAndSwapObject(this, headOffset, null, update);
        }
        // 重新设置队列头head,它只在acquire系列的方法中调用
        private void setHead(Node node) {
            head = node;
            // 线程也没有意义了,因为该线程已经获取到锁了
            node.thread = null;
            // 前一个节点已经没有意义了
            node.prev = null;
        }
        /**
         * 通过CAS函数设置tail值,仅仅在enq方法中调用
         */
        private final boolean compareAndSetTail(Node expect, Node update) {
            return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
        }
        // 向队列尾插入新节点,如果队列没有初始化,就先初始化。返回原先的队列尾节点
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                // t为null,表示队列为空,先初始化队列
                if (t == null) {
                    // 采用CAS函数即原子操作方式,设置队列头head值。
                    // 如果成功,再将head值赋值给链表尾tail。如果失败,表示head值已经被其他线程,那么就进入循环下一次
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // 新添加的node节点的前一个节点prev指向原来的队列尾tail
                    node.prev = t;
                    // 采用CAS函数即原子操作方式,设置新队列尾tail值。
                    if (compareAndSetTail(t, node)) {
                        // 设置老的队列尾tail的下一个节点next指向新添加的节点node
                        t.next = node;
                        return t;
                    }
                }
            }
        }
        // 通过给定的模式mode(独占或者共享)为当前线程创建新节点,并插入队列中并返回新创建的节点。
        private Node addWaiter(Node mode) {
            // 为当前线程创建新的节点
            Node node = new Node(Thread.currentThread(), mode);
            Node pred = tail;
            // 如果队列已经创建,就将新节点插入队列尾。
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // 如果队列没有创建,通过enq方法创建队列,并插入新的节点。
            enq(node);
            return node;
        }
        /**
         * 根据前一个节点pred的状态,来判断当前线程是否应该被阻塞
         * @param pred : node节点的前一个节点
         * @param node
         * @return 返回true 表示当前线程应该被阻塞,之后应该会调用parkAndCheckInterrupt方法来阻塞当前线程
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                // 如果前一个pred的状态是Node.SIGNAL,那么直接返回true,当前线程应该被阻塞
                return true;
            if (ws > 0) {
                // 如果前一个节点状态是Node.CANCELLED(大于0就是CANCELLED),
                // 表示前一个节点所在线程已经被唤醒了,要从CLH队列中移除CANCELLED的节点。
                // 所以从pred节点一直向前查找直到找到不是CANCELLED状态的节点,并把它赋值给node.prev,
                // 表示node节点的前一个节点已经改变。
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                // 此时前一个节点pred的状态只能是0或者PROPAGATE,不可能是CONDITION状态
                // CONDITION(这个是特殊状态,只在condition列表中节点中存在,CLH队列中不存在这个状态的节点)
                // 将前一个节点pred的状态设置成Node.SIGNAL,这样在下一次循环时,就是直接阻塞当前线程
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
        /**
         * 想要获取锁的 acquire系列方法,都会这个方法来获取锁
         * 循环通过tryAcquire方法不断去获取锁,如果没有获取成功,
         * 就有可能调用parkAndCheckInterrupt方法,让当前线程阻塞
         * @param node 想要获取锁的节点
         * @param arg
         * @return 返回true,表示在线程等待的过程中,线程被中断了
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                // 表示线程在等待过程中,是否被中断了
                boolean interrupted = false;
                // 通过死循环,直到node节点的线程获取到锁,才返回
                for (;;) {
                    // 获取node的前一个节点
                    final Node p = node.predecessor();
                    // 如果前一个节点是队列头head,并且尝试获取锁成功
                    // 那么当前线程就不需要阻塞等待,继续执行
                    if (p == head && tryAcquire(arg)) {
                        // 将节点node设置为新的队列头
                        setHead(node);
                        // help GC
                        p.next = null;
                        // 不需要调用cancelAcquire方法
                        failed = false;
                        return interrupted;
                    }
                    // 当前一个节点的状态是Node.SIGNAL时,就会调用parkAndCheckInterrupt方法,阻塞node线程
                    // node线程被阻塞,有两种方式唤醒,
                    // 1.是在unparkSuccessor(Node node)方法,会唤醒被阻塞的node线程,返回false
                    // 2.node线程被调用了interrupt方法,线程被唤醒,返回true
                    // 在这里只是简单地将interrupted = true,没有跳出for的死循环,继续尝试获取锁
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                // failed为true,表示发生异常,非正常退出
                // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不需要唤醒了。
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        /**
         * 阻塞当前线程,线程被唤醒后返回当前线程中断状态
         */
        private final boolean parkAndCheckInterrupt() {
            // 通过LockSupport.park方法,阻塞当前线程
            LockSupport.park(this);
            // 当前线程被唤醒后,返回当前线程中断状态
            return Thread.interrupted();
        }
    
        /**
         * 当前线程发出中断通知
         */
        static void selfInterrupt() {
            Thread.currentThread().interrupt();
        }
    }
    View Code

    <1>addWaiter

       // 通过给定的模式mode(独占或者共享)为当前线程创建新节点,并插入队列中并返回新创建的节点。
        private Node addWaiter(Node mode) {
            // 为当前线程创建新的节点
            Node node = new Node(Thread.currentThread(), mode);
            Node pred = tail;
            // 如果队列已经创建,就尝试将节点插入队列尾。
            if (pred != null) {
                node.prev = pred;
                //<9>因为存在并发所以这里是通过CAS 这里有可能重试 如果失败则交给下面10处理
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // <10>针对队列没创建 或者向队列尾插入新的节点存在并发的情况下交给enq设置
            enq(node);
            return node;
        }

    <2>acquireQueued

      /**
         * 想要获取锁的 acquire系列方法,都会这个方法来获取锁
         * 循环通过tryAcquire方法不断去获取锁,如果没有获取成功,
         * 就有可能调用parkAndCheckInterrupt方法,让当前线程阻塞
         * @param node 想要获取锁的节点
         * @param arg
         * @return 返回true,表示在线程等待的过程中,线程被中断了
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                // 表示线程在等待过程中,是否被中断了
                boolean interrupted = false;
                // 通过自旋,直到node节点的线程获取到锁,才返回
                for (;;) {
                    // <7>获取node的前一个节点
                    final Node p = node.predecessor();
                    // 如果前一个节点是队列头head,那么调用子类实现实现tryAcquire尝试获取一次锁
                    // 如果获取成功那么当前线程就不需要阻塞等待,继续执行
                    if (p == head && tryAcquire(arg)) {
                        //<4>将节点node设置为新的队列头
                        setHead(node);
                        // help GC
                        p.next = null;
                        // 不需要调用cancelAcquire方法
                        failed = false;
                        return interrupted;
                    }
                    /**
                     *<5>shouldParkAfterFailedAcquire
                     *     1.如果当前节点的前一个节点状态是Node.SIGNAL 就会直接返回true走parkAndCheckInterrupt阻塞当前线程
                     *     2.如果当前节点的前一个节点大于0是大于0就是CANCEL则重新整理当前队列直到前一个节点不是CANCEL为止
                     *     3.如果前一个节点状态不是Node.SIGNAL同时又不大于0则将前一个节点设置为SINGLE 下次重试就会直接返回true 调用parkAndCheckInterrupt 阻塞当前线程
                     * <6>parkAndCheckInterrupt
                     *     1.阻塞当前线程
                     */
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                // <11>failed为true,表示发生异常,非正常退出
                // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不需要唤醒了。
                if (failed)
                    cancelAcquire(node);
            }
        }

    <3>selfInterrupt

     /**
         * 当前线程发出中断通知
         */
        static void selfInterrupt() {
            Thread.currentThread().interrupt();
        }

    <4>setHead

     // 重新设置队列头head,它只在acquire系列的方法中调用
        private void setHead(Node node) {
            head = node;
            // 线程也没有意义了,因为该线程已经获取到锁了
            node.thread = null;
            // 前一个节点已经没有意义了
            node.prev = null;
        }

    <5>shouldParkAfterFailedAcquire

     /**
         * 根据前一个节点pred的状态,来判断当前线程是否应该被阻塞
         * @param pred : node节点的前一个节点
         * @param node
         * @return 返回true 表示当前线程应该被阻塞,之后应该会调用parkAndCheckInterrupt方法来阻塞当前线程
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                // 如果前一个pred的状态是Node.SIGNAL,那么直接返回true,当前线程应该被阻塞
                return true;
            if (ws > 0) {
                // 如果前一个节点状态是Node.CANCELLED(大于0就是CANCELLED),
                // 表示前一个节点所在线程已经被唤醒了,要从CLH队列中移除CANCELLED的节点。
                // 所以从pred节点一直向前查找直到找到不是CANCELLED状态的节点,并把它赋值给node.prev,
                // 表示node节点的前一个节点已经改变。
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                // 此时前一个节点pred的状态只能是0或者PROPAGATE,不可能是CONDITION状态
                // CONDITION(这个是特殊状态,只在condition列表中节点中存在,CLH队列中不存在这个状态的节点)
                // 将前一个节点pred的状态设置成Node.SIGNAL,这样在下一次循环时,就是直接阻塞当前线程
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

    <6>parkAndCheckInterrupt

     /**
         * 阻塞当前线程,线程被唤醒后返回当前线程中断状态
         */
        private final boolean parkAndCheckInterrupt() {
            // 通过LockSupport.park方法,阻塞当前线程
            LockSupport.park(this);
            // 当前线程被唤醒后,返回当前线程中断状态
            return Thread.interrupted();
        }

    <8>compareAndSetHead

      /**
         * 通过CAS函数设置head值,仅仅在enq方法中调用
         */
        private final boolean compareAndSetHead(Node update) {
            return unsafe.compareAndSwapObject(this, headOffset, null, update);
        }

    <9>compareAndSetTail

    /**
         * 通过CAS函数设置tail值,仅仅在enq方法中调用
         */
        private final boolean compareAndSetTail(Node expect, Node update) {
            return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
        }

    <10>enq

    // 向队列尾插入新节点,如果队列没有初始化,就先初始化。返回原先的队列尾节点
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                // t为null,表示队列为空,先初始化队列
                if (t == null) {
                    // 采用CAS函数即原子操作方式,设置队列头head值。
                    // <>如果成功,再将head值赋值给链表尾tail。如果失败,表示head值已经被其他线程,那么就进入循环下一次
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // 新添加的node节点的前一个节点prev指向原来的队列尾tail
                    node.prev = t;
                    // 采用CAS函数即原子操作方式,设置新队列尾tail值。
                    if (compareAndSetTail(t, node)) {
                        // 设置老的队列尾tail的下一个节点next指向新添加的节点node
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    <11>cancelAcquire

        // 将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不需要唤醒了。
        private void cancelAcquire(Node node) {
            // 如果node为null,就直接返回
            if (node == null)
                return;
    
            //
            node.thread = null;
    
            // 跳过那些已取消的节点,在队列中找到在node节点前面的第一次状态不是已取消的节点
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            // 记录pred原来的下一个节点,用于CAS函数更新时使用
            Node predNext = pred.next;
    
            // Can use unconditional write instead of CAS here.
            // After this atomic step, other Nodes can skip past us.
            // Before, we are free of interference from other threads.
            // 将node节点状态设置为已取消Node.CANCELLED;
            node.waitStatus = Node.CANCELLED;
    
            // 如果node节点是队列尾节点,那么就将pred节点设置为新的队列尾节点
            if (node == tail && compareAndSetTail(node, pred)) {
                // 并且设置pred节点的下一个节点next为null
                compareAndSetNext(pred, predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
    //<12> unparkSuccessor(node); } node.next
    = node; // help GC } }

    <12>unparkSuccessor

    // 唤醒node节点的下一个非取消状态的节点所在线程(即waitStatus<=0)
        private void unparkSuccessor(Node node) {
            // 获取node节点的状态
            int ws = node.waitStatus;
            // 如果小于0,就将状态重新设置为0,表示这个node节点已经完成了
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            // 下一个节点
            Node s = node.next;
            // 如果下一个节点为null,或者状态是已取消,那么就要寻找下一个非取消状态的节点
            if (s == null || s.waitStatus > 0) {
                // 先将s设置为null,s不是非取消状态的节点
                s = null;
                // 从队列尾向前遍历,直到遍历到node节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    // 因为是从后向前遍历,所以不断覆盖找到的值,这样才能得到node节点后下一个非取消状态的节点
                    if (t.waitStatus <= 0)
                        s = t;
            }
            // 如果s不为null,表示存在非取消状态的节点。那么调用LockSupport.unpark方法,唤醒这个节点的线程
            if (s != null)
                LockSupport.unpark(s.thread);
        }

    <13>doAcquireShared

     /**
         * 获取共享锁,获取失败,则会阻塞当前线程,直到获取共享锁返回
         * @param arg the acquire argument
         */
        private void doAcquireShared(int arg) {
            // <1>为当前线程创建共享锁节点node
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    //<7>获取节点的前一个节点
                    final Node p = node.predecessor();
                    // 如果节点node前一个节点是同步队列头节点。就会调用tryAcquireShared方法尝试获取共享锁 因为前一个节点可能正好释放
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        // 如果返回值大于0,表示获取共享锁成功
                        if (r >= 0) {
    //<14> setHeadAndPropagate(node, r); p.next
    = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } //<5> 如果前一个节点p的状态是Node.SIGNAL,就是调用parkAndCheckInterrupt方法阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // failed为true,表示发生异常, // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不需要唤醒了 if (failed) cancelAcquire(node); } }

    <14>setHeadAndPropagate

     // 重新设置CLH队列头,如果CLH队列头的下一个节点为null或者共享模式,
        // 那么就要唤醒共享锁上等待的线程
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head;
            // <4>设置新的同步队列头head
            setHead(node);
            /**
             * 如果自己获取成功因为是共享锁,则循环将后续阻塞的队列 尝试唤醒并获取锁
             * propagate > 0 表示还有可获取数量
             */
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                    (h = head) == null || h.waitStatus < 0) {
                // 获取新的CLH队列头的下一个节点s
                Node s = node.next;
                // <15>如果节点s是空或者共享模式节点,那么就要唤醒共享锁上等待的线程 让他们尝试获取锁
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }

    <15>doReleaseShared

     // 会唤醒等待共享锁的线程
        private void doReleaseShared() {
            for (;;) {
                // 将同步队列头赋值给节点h
                Node h = head;
                // 如果节点h不为null,且不等于同步队列尾
                if (h != null && h != tail) {
                    // 得到节点h的状态
                    int ws = h.waitStatus;
                    // 如果状态是Node.SIGNAL,就要唤醒节点h后继节点的线程
                    if (ws == Node.SIGNAL) {
                        // 将节点h的状态设置成0,如果设置失败,就继续循环,再试一次。
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        // <12>唤醒节点h后继节点的线程
                        unparkSuccessor(h);
                    }
                    // 如果节点h的状态是0,就设置ws的状态是PROPAGATE。
                    else if (ws == 0 &&
                            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                // 如果同步队列头head节点发生改变,继续循环,
                // 如果没有改变,就跳出循环
                if (h == head)
                    break;
            }
        }

    <16>fullyRelease

     /**
         * 释放当前线程占有的锁,并唤醒CLH队列一个等待线程
         * 如果失败就抛出异常,设置node节点的状态是Node.CANCELLED
         * @return
         */
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
               // 获取锁的记录状态
                int savedState = getState();
                // 释放当前线程占有的锁
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }

    <17>addConditionWaiter

     private Node addConditionWaiter() {
            //获得尾节点
            Node t = lastWaiter;
            // 如果Condition队列尾节点的状态不是Node.CONDITION
            if (t != null && t.waitStatus != Node.CONDITION) {
                // 清除Condition队列中,状态不是Node.CONDITION的节点,
                // 并且可能会重新设置firstWaiter和lastWaiter 相当于清除队列
                //<22>相当于重新整理队列
                unlinkCancelledWaiters();
                // 重新将Condition队列尾赋值给t
                t = lastWaiter;
            }
            // 为当前线程创建一个状态为Node.CONDITION的节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            // 如果t为null,表示Condition队列为空,将node节点赋值给链表头
            if (t == null)
                firstWaiter = node;
            else
                // 将新节点node插入到Condition队列尾
                t.nextWaiter = node;
            // 将新节点node设置为新的Condition队列尾
            lastWaiter = node;
            return node;
        }

    <18>fullyRelease

     /**
         * 释放当前线程占有的锁,并唤醒CLH队列一个等待线程
         * 如果失败就抛出异常,设置node节点的状态是Node.CANCELLED
         * @return
         */
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
               // 获取锁的记录状态
                int savedState = getState();
                // <点击跳转到方法处>释放当前线程占有的锁
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }

    <19>isOnSyncQueue

     // 节点node是不是在CLH队列中
        final boolean isOnSyncQueue(Node node) {
            // 如果node的状态是Node.CONDITION,或者node没有前一个节点prev,
            // 那么返回false,节点node不在同步队列中
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            // 如果node有下一个节点next,那么它一定在同步队列中
            if (node.next != null) // If has successor, it must be on queue
                return true;
            // <20>上面都不满足遍历CLH队列 尝试同步队列中查找节点node 判断是否在CLH队列中
            return findNodeFromTail(node);
        }

    <20>findNodeFromTail

      // 在同步队列中从后向前查找节点node,如果找到返回true,否则返回false
        private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }

    <21>checkInterruptWhileWaiting

     /**
             * 如果线程没有发起了中断请求,返回0.
             * 如果线程发起了中断请求,且中断请求在signalled(即调用signal或signalAll)之前返回THROW_IE
             * 中断请求在signalled之后返回REINTERRUPT
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }

    <22>unlinkCancelledWaiters

    private void unlinkCancelledWaiters() {
                // condition队列头赋值给t
                Node t = firstWaiter;
                // 这个trail节点,只是起辅助作用
                Node trail = null;
                while (t != null) {
                    //得到下一个节点next。当节点是condition时候,nextWaiter表示condition队列的下一个节点
                    Node next = t.nextWaiter;
                    // 如果节点t的状态不是CONDITION,那么该节点就要从condition队列中移除
                    if (t.waitStatus != Node.CONDITION) {
                        // 将节点t的nextWaiter设置为null
                        t.nextWaiter = null;
                        // 如果trail为null,表示原先的condition队列头节点实效,需要设置新的condition队列头
                        if (trail == null)
                            firstWaiter = next;
                        else
                            // 将节点t从condition队列中移除,因为改变了引用的指向,从condition队列中已经找不到节点t了
                            trail.nextWaiter = next;
                        // 如果next为null,表示原先的condition队列尾节点也实效,重新设置队列尾节点
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        // 遍历到的有效节点
                        trail = t;
                    // 将next赋值给t,遍历完整个condition队列
                    t = next;
                }
            }

    <23>reportInterruptAfterWait

     /**
         * 如果interruptMode是THROW_IE,就抛出InterruptedException异常
         * 如果interruptMode是REINTERRUPT,则当前线程再发出中断请求
         * 否则就什么都不做
         */
        private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

    <24> doSignal

      // 将Condition队列中的first节点插入到CLH队列中
        private void doSignal(Node first) {
            do {
                //将first节点从队列投移除, 将他下面的节点设置为新的头
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                // 取消first节点nextWaiter引用
                first.nextWaiter = null;
                //<25>first加入到CHL队列
            } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
        }

    <25>transferForSignal

     // 返回true表示节点node插入到同步队列中,返回false表示节点node没有插入到同步队列中
        final boolean transferForSignal(Node node) {
            // 如果节点node的状态不是Node.CONDITION,或者更新状态失败,
            // 说明该node节点已经插入到同步队列中,所以直接返回false
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            // <10>将节点node插入到同步队列中,p是原先同步队列尾节点,也是node节点的前一个节点
            Node p = enq(node);
            int ws = p.waitStatus;
            // 如果前一个节点是已取消状态,或者不能将它设置成Node.SIGNAL状态。
            // 就说明节点p之后也不会发起唤醒下一个node节点线程的操作,
            // 所以这里直接调用 LockSupport.unpark(node.thread)方法,唤醒节点node所在线程
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }

    <26>doSignalAll

       /**
         * 将condition队列中所有的节点都插入到同步队列中
         * @param first condition队列头节点
         */
        private void doSignalAll(Node first) {
            // 表示将condition队列设置为空
            lastWaiter = firstWaiter = null;
            do {
                // 得到condition队列的下一个节点
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                // 将节点first插入到同步队列中
                transferForSignal(first);
                first = next;
                // 循环遍历condition队列中所有的节点 将他们放到CLH队列
            } while (first != null);
        }

    <27>doAcquireNanos

      /**
         * 尝试在一定的时间nanosTimeout内获取锁,超时了就返回false
         */
        private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            // 计算截止时间
            final long deadline = System.nanoTime() + nanosTimeout;
            // 为当前线程创建节点node,并插入到队列中
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (; ; ) {
                    final Node p = node.predecessor();
                    // 如果前一个节点是队列头head,并且尝试获取锁成功
                    // 将该节点node设置成队列头head
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
    
                    }
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // 计算剩余时间
                nanosTimeout = deadline - System.nanoTime();
                // 剩余时间小于等于0,就直接返回false,获取锁失败
                if (nanosTimeout <= 0L)
                    return false;
                // <5>当p节点的状态是Node.SIGNAL时,调用LockSupport.parkNanos阻塞当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    // 当前线程阻塞nanosTimeout时间 核心就在不是一直等待持有锁的线程释放唤醒,而是阻塞一定时间 
                    LockSupport.parkNanos(this, nanosTimeout);
                // 如果当前线程中断标志位是true,抛出InterruptedException异常
                if (Thread.interrupted())
                    throw new InterruptedException();
            } finally {
                // failed为true,表示发生异常,
                // 则将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不需要唤醒了
                if (failed)
                    cancelAcquire(node);
            }
        }

     <28>doAcquireSharedInterruptibly

    跟doReleaseShared 几乎一样,只是信号量是使用的这个方法

      /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
                throws InterruptedException {
            //创建共享节点 并插入队列
            final AbstractQueuedSynchronizer.Node node = addWaiter(AbstractQueuedSynchronizer.Node.SHARED);
            boolean failed = true;
            try {
                //自旋
                for (;;) {
                    //<1>获得节点的上一个节点
                    final AbstractQueuedSynchronizer.Node p = node.predecessor();
                    //如果他的节点就是节点头 尝试获取锁 因为并发情况节点头可能释放了
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            //<14>重新设节点头 并尝试唤醒其他等待节点
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //<5>如果前一个节点p的状态是Node.SIGNAL,就是调用<6>parkAndCheckInterrupt方法阻塞当前线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
    
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    //<11> 此时前一个节点pred的状态只能是0或者PROPAGATE,不可能是CONDITION状态
                    // CONDITION(这个是特殊状态,只在condition列表中节点中存在,CLH队列中不存在这个状态的节点)
                    // 将前一个节点pred的状态设置成Node.SIGNAL,这样在下一次循环时,就是直接阻塞当前线程
                    cancelAcquire(node);
            }
        }

    <29> hasQueuedPredecessors

       /**
         * 判断是否需要排队 false 为空队列 true为需要排队
         * @return
         */
        public final boolean hasQueuedPredecessors() {
           
            //获得尾节点
            AbstractQueuedSynchronizer.Node t = tail;
            //获得头节点
            AbstractQueuedSynchronizer.Node h = head;
            //中间变量s
            AbstractQueuedSynchronizer.Node s;
            /**
             *  h != t 表示队列就一个节点
             *  如果队列不止一节点,同时队列下一个节点不是当前线程 就表示需要排队
             */
            return h != t &&
                    ((s = h.next) == null || s.thread != Thread.currentThread());
        }

    AQS独占锁

    加锁

    一次只能有一个线程可以获取锁。当获取锁的线程释放了锁其他线程才可以获得锁

    java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire

     public final void acquire(int arg) {
            /**
             *tryAcquire  模板模式 是抽象方法由子类实现获取锁步骤
             *<1>addWaiter   如果加锁失败 创建节点并放入队列尾
             *<2>acquireQueued 通过新创建的节点 判断是重试获取锁。还是阻塞线程
             */
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(node.EXCLUSIVE), arg))
                //<3>发出线程中断通知
                selfInterrupt();
        }

    1.先调用由子类实现的tryAcquire 具体获取锁的方法

    2.如果没有获取到调用addWaiter 创建一个node节点并插入CLH队列

    3.调用acquireQueued 自旋尝试获取锁,只有当当前节点的前一个节点是head节点才尝试获取锁(因为前面可能有其他线程排队),否则阻塞 等待唤醒 唤醒之后再继续自旋

    4.自旋过程中也要判断Thread.interrupted() 线程是否中断

    5.如果被唤醒后线程已经中断,将当前节点状态设置为0表示已完成, 从队列尾部前遍历,找到当前节点前一个未取消的节点,唤醒线程 让该线程重复3步骤

    含有时间限制的加锁

        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            //如果线程已经中断 直接抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
            //<27>尝试获取锁,如果没有获取到 则调用doAcquireNanos 阻塞指定时间
            return tryAcquire(arg) ||
                    doAcquireNanos(arg, nanosTimeout);
        }

    1.先判断线程是否中断,如果中断 则直接抛出异常

    2.调用子类tryAcquire获取独占锁的方法

    3.如果没有获取成功,doAcquireNanos

    4.doAcquireNanos 跟前面独占锁方式是一样的。唯一的不同是自旋里面判断等待锁的时间是否超时了。如果超了直接返回false,如果没有超则调用 LockSupport.parkNanos(this, nanosTimeout);等待指定时间

    5.那么除了其他线程唤醒,还有可能就是超时时间到了自动唤醒,其他线程唤醒了。如果没有获取成功,则判断是否超时 没有超时则继续阻塞指定时间

    释放锁

      // 在独占锁模式下,释放锁的操作
        public final boolean release(int arg) {
            // 调用tryRelease子类方法,尝试去释放锁,由子类具体实现
            if (tryRelease(arg)) {
                Node h = head;
                // <12>如果队列头节点的状态不是0,那么队列中就可能存在需要唤醒的等待节点。
                // 还记得我们在acquireQueued(final Node node, int arg)获取锁的方法中,如果节点node没有获取到锁,
                // 那么我们会将节点node的前一个节点状态设置为Node.SIGNAL,然后调用parkAndCheckInterrupt方法
                // 将节点node所在线程阻塞。
                // 在这里就是通过unparkSuccessor方法,进而调用LockSupport.unpark(s.thread)方法,唤醒被阻塞的线程
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    1.调用子类实现的释放锁的逻辑

    2.如果释放成功,则通过head往下找有效的节点 节点状态非cancel的节点进行唤醒

    共享锁

    加锁

      // 获取共享锁
        public final void acquireShared(int arg) {
            //模板模式 抽象方法 由子类实现 尝试去获取共享锁,如果返回值小于0表示获取共享锁失败 
            if (tryAcquireShared(arg) < 0)
                //<13>调用doAcquireShared方法尝试获取共享锁
                doAcquireShared(arg);
        }

    1.先调用子类获取共享锁

    2.如果获取事变调用doAcquireShared 往CLH队列插入一个共享的Node节点

    3.如果Node的前一个节点是Head节点则尝试获取共享锁(队列排队 前面都获取了自身才能获取)

    4.如果获取失败: 

          1.如果获取失败,则遍历当前节点的前一个节点,如果前一个节点是无效节点则,进行队列整理,遍历当前节点的前一节点,剔除无效状态的节点(节点状态为Cancel的)

          2.然后重新自旋,如果前一节点是Single则触发阻塞

    5.如果获取成功:

                     1.因为是共享锁,当前节点获取成功,则尝试遍历当前节点的下一节有效节点唤醒尝试获取锁(如果下一节点获取成功也会继续此操作)

    释放锁

     // 释放共享锁
        public final boolean releaseShared(int arg) {
            // 模板模式 抽象方法尝试释放共享锁
            if (tryReleaseShared(arg)) {
                //<15> 唤醒等待共享锁的线程
                doReleaseShared();
                return true;
            }
            return false;
        }

    1.先调用子类的释放共享锁的方法

    2.如果释放成功,则判断head节点是否是Signle如果是则代表后续节点是阻塞状态,则尝试唤醒后继节点线程

    Condition

    注 condition为 AQS的内部类

    await

       /**
         * 让当前持有锁的线程阻塞等待,并释放锁。如果有中断请求,则抛出InterruptedException异常
         * @throws InterruptedException
         */
        public final void await() throws InterruptedException {
            // 如果当前线程中断标志位是true,就抛出InterruptedException异常
            if (Thread.interrupted())
                throw new InterruptedException();
            // <17>为当前线程创建新的Node节点,并且将这个节点插入到Condition队列中了
            Node node = addConditionWaiter();
            // <18>释放当前线程占有的锁,并唤醒CLH队列一个等待线程 这里可以看出 要await需要获取独占锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // <19>如果节点node不在同步队列中(注意不是Condition队列)
            while (!isOnSyncQueue(node)) {
                // 阻塞当前线程,那么怎么唤醒这个线程呢?
                // 首先我们必须调用signal或者signalAll将这个节点node加入到同步队列。
                // 只有这样unparkSuccessor(Node node)方法,才有可能唤醒被阻塞的线程
                LockSupport.park(this);
                // <21>如果当前线程产生中断请求,就跳出循环
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // <2>如果节点node已经在同步队列中了,获取同步锁,只有得到锁才能继续执行,否则线程继续阻塞等待
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // <22>清除Condition队列中状态不是Node.CONDITION的节点
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            // <23>是否要抛出异常,或者发出中断请求
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

    1.整理wait队列,将节点状态非CONDITION的移除

    2.创建Node节点状态为CONDITION插入队列尾部

    3.释放当前线程占有的独占锁。如果释放失败则会抛出异常(调用await之前需要先获取独占锁)

    4.循环判断是否在CLH同步队列中(当节点在CLH在同步节点中才结束循环),如果不在则阻塞当前线程,等待唤醒

    5.如果CLH在同步队列中 则尝试获取同步锁,获取失败则在CLH获取锁的自旋里面阻塞

    Signal

      // 如果condition队列不为空,将condition队列头节点插入到同步队列中
        public final void signal() {
            // 子类实现 如果当前线程不是独占锁线程,就抛出IllegalMonitorStateException异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
    
            // 将Condition队列头赋值给节点first
            Node first = firstWaiter;
            //表示wait队列不为空
            if (first != null)
                // <24>将Condition队列中的first节点插入到CLH队列中
                doSignal(first);
        }

    1.从wait节点头节点往下遍历,找到有效节点,并加入CLH队列等待获取锁 只会唤醒一个

    signalAll

      // 将condition队列中所有的节点都插入到同步队列中
        public final void signalAll() {
            //子类实现 如果当前线程不是独占锁线程,就抛出IllegalMonitorStateException异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //获取第一个Waiter节点
            Node first = firstWaiter;
            //<26>表示队列不为空
            if (first != null)
                doSignalAll(first);
        }

     1.遍历wait节点。将所有节点都加入CLH队列尝试获取锁

  • 相关阅读:
    KVM 重命名虚机
    甲醛了解
    递归函数,匿名函数
    函数
    zabbix监控URL
    zabbix自动发现
    vim常用命令总结
    saltstack常用命令
    zabbix监控Apache
    nginx配置详解
  • 原文地址:https://www.cnblogs.com/LQBlog/p/15190407.html
Copyright © 2011-2022 走看看