zoukankan      html  css  js  c++  java
  • 【Java并发专题之五】juc-locks锁框架之AQS

    AQS(AbstractQueuedSynchronizer)框架提供了一套通用的机制来管理同步状态、阻塞/唤醒线程、管理同步队列。

    一、同步机制
    AQS框架描述了一个什么样的锁?

    一个公司有10个人,这个公司只有一个厕所,而且是一个单人使用的厕所,厕所有门,门上有一个显示器:
    当厕所无人使用时,显示器显示:
        厕所状态:空闲中
        使用人:无
    
    当小明上厕所时,显示器显示:
        厕所状态:使用中
        使用人:小明
    
    当小强上厕所时,显示器显示:
        厕所状态:使用中
        使用人:小强
    ...

    1、锁的使用人
    AbstractOwnableSynchronizer父类里定义了一个字段-拥有锁的线程,以及相应的set、get方法,相当于上面厕所使用人:

    private transient Thread exclusiveOwnerThread;
    
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }

    2、锁状态
    AbstractOwnableSynchronizer的子类AbstractQueuedSynchronizer里定义了锁状态,相当于上面的厕所的厕所状态:

    /**同步状态 锁状态*/
    private volatile int state;
    
    /**获取State*/
    protected final int getState() {
        return state;
    }
    
    /**设置State*/
    protected final void setState(int newState) {
        state = newState;
    }
    
    /**
     * CAS修改状态:当前值和期望值相同时 原子性的修改更新值
     * 返回true说明修改成功  返回false说明当前值和期望值不相同
     */
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    锁就是线程竞争的资源,AQS使用int变量state来表示这个资源状态(同步状态),这与synchronize内部锁(底层锁mutex)是不一样的,这里叫同步状态比锁更合适一些。
    独占模式:比如ReentrantLock,初始state=0,某一个线程获取到同步状态state+1,释放同步状态state-1;
    共享模式:比如Semaphore/CountDownLatch,多个线程都会获取到同步状态;

    二、内部类Node
    Node类被设计作为同步队列(CLH)和条件队列(Condition)的节点,条件队列只限独占模式下使用。

    源码:

    static final class Node {
            /** 静态属性:标记节点为共享模式*/
            static final Node SHARED = new Node();
            /** 静态属性:标记节点为独占模式*/
            static final Node EXCLUSIVE = null;
    
            /** 静态属性:waitStatus的值 表明线程已取消 当前线程因为超时或者中断被取消。这是一个终结态,也就是状态到此为止。*/
            static final int CANCELLED =  1;
            /** 静态属性:waitStatus的值  表明后继节点中的线程需要唤醒 用于独占模式
             * 当前线程的后继线程被阻塞或者即将被阻塞,。当前线程释放锁或者取消后需要唤醒后继线程
             * 这个状态一般都是后继线程来设置前驱节点的。
             */
            static final int SIGNAL    = -1;
            /** 静态属性:waitStatus的值 当前节点需要进入等待队列 用于独占模式*/
            static final int CONDITION = -2;
            /**静态属性:waitStatus的值 用于共享模式
             * 比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作
             * 用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。
             * 在一个节点成为头节点之前,是不会跃迁为此状态的
             */
            static final int PROPAGATE = -3;
            /**
             * Status field, taking on only the values:
             *   SIGNAL、CANCELLED、CONDITION:、PROPAGATE、0
             *   初始化状态0用于同步队列;初始化-2用于等待队列,并通过CAS方法来修改
             */
            volatile int waitStatus;
    
            /**当前节点的前一个节点,用于维持同步队列(CLH)*/
            volatile Node prev;
    
            /**当前节点的下一个节点,用于维持同步队列(CLH)*/
            volatile Node next;
    
            /**当前节点包裹的线程*/
            volatile Thread thread;
    
            /**
             * 如果是共享模式:nextWaiter=SHARED
             * 如果是独占模式:nextWaiter为等待队列的下一个节点
             */
            Node nextWaiter;
    
            /** 根据nextWaiter判断当前节点是否是共享模式*/
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
             /** 返回当前节点的前一个节点*/
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            /**无参构造器 用来确立初始头节点或共享模式 */
            Node() {    // Used to establish initial head or SHARED marker
            }
            /**用在addWaiter方法里 向CLH队列添加新节点的构造器*/
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
            /**用在Condition里向等待队列添加新节点的构造器*/
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }

    1、模式

     使用Node类型的静态字来标记模式类型:

    共享模式:Node.SHARED
    独占模式:Node.EXCLUSIVE

    用于标识节点模式的字段和方法:

    Node nextWaiter;
    
    /** 根据nextWaiter判断当前节点是否是共享模式*/
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    (1)使用无参构造器Node()创建的Node,nextWaiter默认是null,所以默认是独占模式;
    (2)使用构造器Node(Thread thread, Node mode),将模式赋值给nextWaiter,这时就会按指定模式来创建节点;

    2、waitStatus

    用来标识Node节点状态,有5种状态:

    0:节点初始状态
    Node.CANCELLED(1):节点的终止状态,说明节点的中的线程因为超时、中断被取消执行了,所以该节点没用了需要从队列中移除;
    Node.SIGNAL(-1):信号状态,其后继节点应当阻塞,当该节点取消或释放锁后,应唤醒后继节点;
    Node.CONDITION(-2):加入条件队列的节点的初始状态;
    Node.PROPAGATE(-3):传播状态,用于共享模式;

    五种状态的转换关系:

    3、nextWaiter
    (1)标识节点模式;
    (2)条件队列里作为下一个节点的引用,借此构成一个单向队列;

    4、prev
    同步队列中一个节点对前驱节点的引用。
    重要作用,通过prev处理取消状态的节点,如果节点取消,那么它后续节点需要连接到非取消状态的前驱节点。

    5、next
    同步队列中一个节点对后继节点的引用。
    重要作用,通过next可以实现对后继结点的阻塞。

    三、同步队列(CLH)
    对于竞争同步状态失败的线程如何处理呢?AQS内部维护了一个CLH队列,双向链表,严格实现FIFO先来先服务:能成功获取到锁的必定是首节点,将获取锁失败的线程放入队列尾部暂存,依次获取锁。
    注意:CLH队列适合SMP架构,不适合NUMP架构(采用MCS队列锁)。
    1、队列结构

    (1)head节点:AQS里定义一个字段来引用同步队列的头节点,强调的是引用,不是一个包含线程的Node节点

    private transient volatile Node head;
    
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    /**
     * 通过CAS函数设置head值,仅仅在enq方法中调用
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    (2)tail节点:AQS里定义一个字段来引用同步队列的尾节点,强调的是引用,不是一个包含线程的Node节点

    private transient volatile Node tail;
    /**
     * 通过CAS函数设置tail值,仅仅在enq方法中调用
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    (3)head和tail节点值的变化

    初始化:head = tail = null
    n1入队:head = tail = n1
    n2入队:head = n1,tail = n2

    2、队列操作

    (1)入队操作:CLH队列是FIFO队列,故新的节点到来的时候,是要插入到当前队列的尾节点之后。试想一下,当一个线程成功地获取了同步状态,其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个CAS方法,它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。入队操作示意图大致如下:

    /**按照模式来创建节点 并将节点加入同步队列*/
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // 快速尝试
            Node pred = tail;
            if (pred != null) {//先来判断尾节点是否为空
                node.prev = pred;//新节点的前驱节点为tail
                if (compareAndSetTail(pred, node)) {//通过CAS在队尾插入当前节点 将新节点设置为tail 这一步可能为失败  返回false
                    pred.next = node;//原先tail后继节点为新节点
                    return node;//返回新增node
                }
            }
            //初始情况或者在快速尝试失败后插入节点
            enq(node);
            return node;
        }
    /**
         * 入队:同步队列添加节点
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                //尾节点为空说明队列为空 需要先设置头节点 然后设置尾节点(头尾相同)
                if (t == null) { // Must initialize 初始化head和tail  pre和next没有初始化
                    if (compareAndSetHead(new Node()))//可能会失败
                        tail = head;
                } else {
                    /*
                     * AQS的精妙就是体现在很多细节的代码,比如需要用CAS往队尾里增加一个元素
                     * 此处的else分支是先在CAS的if前设置node.prev = t,而不是在CAS成功之后再设置。
                     * 一方面是基于CAS的双向链表插入目前没有完美的解决方案,另一方面这样子做的好处是:
                     * 保证每时每刻tail.prev都不会是一个null值,否则如果node.prev = t
                     * 放在下面if的里面,会导致一个瞬间tail.prev = null,这样会使得队列不完整。
                     */
                    node.prev = t;//设置新节点的pre为当前尾节点
                    if (compareAndSetTail(t, node)) {//将新节点设置为尾节点
                        t.next = node;//CAS设置tail为node,成功后把老的tail也就是t连接到node。
                        return t;
                    }
                }
            }
        }

    (2)出队操作:因为遵循FIFO规则,所以能成功获取到AQS同步状态的必定是首节点,首节点的线程在释放同步状态时,会唤醒后续节点,而后续节点会在获取AQS同步状态成功的时候将自己设置为首节点。设置首节点是由获取同步成功的线程来完成的,由于只能有一个线程可以获取到同步状态,所以设置首节点的方法不需要像入队这样的CAS操作,只需要将首节点设置为原首节点的后续节点同时断开原节点、后续节点的引用即可。出队操作示意图大致如下:

     

    四、独占锁
    1、获取锁:当多个线程一起获取锁的时候,只有一个线程能获取到锁,其他线程必须在当前位置阻塞等待。
    流程总结:

    代码分析:
    (1)acquire方法,获取锁:

    /**
         *  获取独占锁。如果没有获取到,线程就会阻塞等待,直到获取锁。不会响应中断异常
         */
        public final void acquire(int arg) {
            // 1. 先调用tryAcquire方法,尝试获取独占锁,返回true,表示获取到锁,不需要执行acquireQueued方法。
            // 2. 调用acquireQueued方法,先调用addWaiter方法为当前线程创建一个节点node,并插入队列中,
            // 然后调用acquireQueued方法去获取锁,如果不成功,就会让当前线程阻塞,当锁释放时才会被唤醒。
            // acquireQueued方法返回值表示在线程等待过程中,是否有另一个线程调用该线程的interrupt方法,发起中断。
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    (2)tryAcquire方法,尝试获取锁,举例ReentrantLock中FairSync的tryAcquire方法:

    // 尝试获取锁,与非公平锁最大的不同就是调用hasQueuedPredecessors()方法
            // hasQueuedPredecessors方法返回true,表示等待线程队列中有一个线程在当前线程之前,
            // 根据公平锁的规则,当前线程不能获取锁。
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();// 获取锁的记录状态
                if (c == 0) {// 如果c==0表示当前锁是空闲的
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires))
                    {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 判断当前线程是不是独占锁的线程
                else if (current == getExclusiveOwnerThread())
                {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc); // 更改锁的记录状态
                    return true;
                }
                return false;
            }

    (3)acquireQueued方法,自旋获取锁,如果没有获取到,就让当前线程阻塞等待:

    /**
         * 想要获取锁的 acquire系列方法,都会这个方法来获取锁
         * 循环通过tryAcquire方法不断去获取锁,如果没有获取成功,
         * 就有可能调用parkAndCheckInterrupt方法,让当前线程阻塞
         * @param node 想要获取锁的节点
         * @param arg
         * @return 返回true,表示在线程等待的过程中,线程被中断了
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                // 表示线程在等待过程中,是否被中断了
                boolean interrupted = false;
                for (;;) {// 通过死循环,直到node节点的线程获取到锁,才返回
                    //获取node的前一个节点
                    final Node p = node.predecessor();
                    // 如果前一个节点是队列头head,并且尝试获取锁成功
                    // 那么当前线程就不需要阻塞等待,继续执行
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);// 将节点node设置为新的队列头
                        p.next = null; //去掉后继引用方便GC help GC
                        failed = false;// 不需要调用cancelAcquire方法
                        return interrupted;
                    }
                    // 当p节点的状态是Node.SIGNAL时,就会调用parkAndCheckInterrupt方法,阻塞node线程
                    // node线程被阻塞,有两种方式唤醒,
                    // 1.是在unparkSuccessor(Node node)方法,会唤醒被阻塞的node线程,返回false
                    // 2.node线程被调用了interrupt方法,线程被唤醒,返回true
                    // 在这里只是简单地将interrupted = true,没有跳出for的死循环,继续尝试获取锁
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                // failed为true,表示发生异常,非正常退出
                // 则将node节点的状态设置成CANCELLED,表示node节点中的线程已取消,不需要唤醒了。
                if (failed)
                    cancelAcquire(node);
            }
        }

    (4)shouldParkAfterFailedAcquire方法,返回值决定是否要阻塞当前线程

    /**
         * 根据前一个节点pred的状态,来判断当前线程是否应该被阻塞
         * @param pred : node节点的前一个节点
         * @param node
         * @return 返回true 表示当前线程应该被阻塞,之后应该会调用parkAndCheckInterrupt方法来阻塞当前线程
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            //如果前一个pred的状态是Node.SIGNAL,那么直接返回true,当前线程应该被阻塞
            if (ws == Node.SIGNAL)
                return true;
            if (ws > 0) {
                // 如果前一个节点状态是Node.CANCELLED(大于0就是CANCELLED),
                // 表示前一个节点所在线程已经被唤醒了,要从CLH队列中移除CANCELLED的节点。
                // 所以从pred节点一直向前查找直到找到不是CANCELLED状态的节点,并把它赋值给node.prev,
                // 表示node节点的前一个节点已经改变。
                do {
                    node.prev = pred = pred.prev;
                    //pred = pred.prev;
                    //node.prev = pred;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                // 此时前一个节点pred的状态只能是0或者PROPAGATE,不可能是CONDITION状态
                // CONDITION(这个是特殊状态,只在condition列表中节点中存在,CLH队列中不存在这个状态的节点)
                // 将前一个节点pred的状态设置成Node.SIGNAL,这样在下一次循环时,就是直接阻塞当前线程
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

    (5)parkAndCheckInterrupt方法,阻塞当前线程,线程被唤醒后返回当前线程中断状态:

    /**
         * 阻塞当前线程,线程被唤醒后返回当前线程中断状态
         */
        private final boolean parkAndCheckInterrupt() {
            //通过LockSupport.park方法,阻塞当前线程
            LockSupport.park(this);
            // 当前线程被唤醒后,返回当前线程中断状态
            return Thread.interrupted();
        }

    LockSupport.park作用就会停止自旋,阻塞在parkAndCheckInterrupt(),LockSupport.unpark唤醒的时候从parkAndCheckInterrupt()位置继续运行自旋。

    (6)cancelAcquire方法,将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不需要唤醒了:

    /**
         *将node节点的状态设置成CANCELLED,表示node节点所在线程已取消,不需要唤醒了。
         */
        private void cancelAcquire(Node node) {
            // 如果node为null,就直接返回
            if (node == null)
                return;
    
            node.thread = null;
    
            // 跳过那些已取消的节点,在队列中找到在node节点前面的第一次状态不是已取消的节点
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            //记录pred原来的下一个节点,用于CAS函数更新时使用
            Node predNext = pred.next;
    
            //将node节点状态设置为已取消Node.CANCELLED;
            node.waitStatus = Node.CANCELLED;
    
            /*
            如果node节点是队列尾节点,那么就将pred节点设置为新的队列尾节点
             * 如果CAS将tail从node置为pred节点了
             * 则剩下要做的事情就是尝试用CAS将pred节点的next更新为null以彻底切断pred和node的联系。
             * 这样一来就断开了pred与pred的所有后继节点,这些节点由于变得不可达,最终会被回收掉。
             * 由于node没有后继节点,所以这种情况到这里整个cancel就算是处理完毕了。
             *
             * 这里的CAS更新pred的next即使失败了也没关系,说明有其它新入队线程或者其它取消线程更新掉了。
             */
            if (node == tail && compareAndSetTail(node, pred)) {
                compareAndSetNext(pred, predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                //如果node还有后继节点,这种情况要做的事情是把pred和后继非取消节点拼起来。
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    /*
                     * 如果node的后继节点next非取消状态的话,则用CAS尝试把pred的后继置为node的后继节点
                     * 这里if条件为false或者CAS失败都没关系,这说明可能有多个线程在取消,总归会有一个能成功的。
                     */
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                    /*
                     * 这时说明pred == head或者pred状态取消或者pred.thread == null
                     * 在这些情况下为了保证队列的活跃性,需要去唤醒一次后继线程。
                     * 举例来说pred == head完全有可能实际上目前已经没有线程持有锁了,
                     * 自然就不会有释放锁唤醒后继的动作。如果不唤醒后继,队列就挂掉了。
                     *
                     * 这种情况下看似由于没有更新pred的next的操作,队列中可能会留有一大把的取消节点。
                     * 实际上不要紧,因为后继线程唤醒之后会走一次试获取锁的过程,
                     * 失败的话会走到shouldParkAfterFailedAcquire的逻辑。
                     * 那里面的if中有处理前驱节点如果为取消则维护pred/next,踢掉这些取消节点的逻辑。
                     */
                    unparkSuccessor(node);
                }
    
                /*
                 * 取消节点的next之所以设置为自己本身而不是null,
                 * 是为了方便AQS中Condition部分的isOnSyncQueue方法,
                 * 判断一个原先属于条件队列的节点是否转移到了同步队列。
                 *
                 * 因为同步队列中会用到节点的next域,取消节点的next也有值的话,
                 * 可以断言next域有值的节点一定在同步队列上。
                 *
                 * 在GC层面,和设置为null具有相同的效果。
                 */
                node.next = node; // help GC
            }
        }

    2、释放锁:获取锁的线程释放锁资源,而且还必须能唤醒正在等待锁资源的一个线程

    代码分析:
    (1)release方法
    调用tryRelease方法释放锁资源,返回true表示锁资源完全释放了,返回false表示还持有锁资源。
    如果锁资源完全被释放了,就要唤醒等待锁资源的线程。调用unparkSuccessor方法唤醒一个等待线程
    注:CLH队列头节点h为null,表示队列为空,没有节点。节点h的状态是0,表示CLH队列中没有被阻塞的线程。

    /**
         * 在独占锁模式下,释放锁的操作
         */
        public final boolean release(int arg) {
            //调用tryRelease方法,尝试去释放锁,由子类具体实现
            //调用tryRelease方法释放锁资源,返回true表示锁资源完全释放了,返回false表示还持有锁资源。
            if (tryRelease(arg)) {
                /*
                 * 此时的head节点可能有3种情况:
                 * 1. null (AQS的head延迟初始化+无竞争的情况)
                 * 2. 当前线程在获取锁时new出来的节点通过setHead设置的
                 * 3. 由于通过tryRelease已经完全释放掉了独占锁,有新的节点在acquireQueued中获取到了独占锁,并设置了head
    
                 * 第三种情况可以再分为两种情况:
                 * (一)时刻1:线程A通过acquireQueued,持锁成功,set了head
                 *          时刻2:线程B通过tryAcquire试图获取独占锁失败失败,进入acquiredQueued
                 *          时刻3:线程A通过tryRelease释放了独占锁
                 *          时刻4:线程B通过acquireQueued中的tryAcquire获取到了独占锁并调用setHead
                 *          时刻5:线程A读到了此时的head实际上是线程B对应的node
                 * (二)时刻1:线程A通过tryAcquire直接持锁成功,head为null
                 *          时刻2:线程B通过tryAcquire试图获取独占锁失败,入队过程中初始化了head,进入acquiredQueued
                 *          时刻3:线程A通过tryRelease释放了独占锁,此时线程B还未开始tryAcquire
                 *          时刻4:线程A读到了此时的head实际上是线程B初始化出来的傀儡head
                 */
                Node h = head;
                //CLH队列头节点h为null,表示队列为空,没有节点。节点h的状态是0,表示CLH队列中没有被阻塞的线程
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);//如果锁资源完全被释放了,就要唤醒等待锁资源的线程。调用unparkSuccessor方法唤醒一个等待线程
                return true;
            }
            return false;
        }

    (2)tryRelease方法

    ReentrantLock中Sync的tryRelease方法实现:可重入锁,一个线程可以重复多次进入该线程已经锁住的代码快,锁住多少次,就需要释放多少次。

    protected final boolean tryRelease(int releases) {
                //c表示新的锁的记录状态
                int c = getState() - releases;
                //如果当前线程不是独占锁的线程,就抛出IllegalMonitorStateException异常
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                // 设置独占锁的线程为null
                setState(c);
                return free;
            }

    (3)unparkSuccessor方法

    LockSupport.unpark唤醒node节点的下一个非取消状态的节点所在线程,唤醒线程也是有顺序的,就是添加到CLH队列线程的顺序。

    /**唤醒当前节点的后继结点 唤醒后状态为0*/
        //唤醒node节点的下一个非取消状态的节点所在线程(即waitStatus<=0)
        private void unparkSuccessor(Node node) {
            /*
             * 尝试将node的等待状态置为0,这样的话,后继争用线程可以有机会再尝试获取一次锁。
             */
            int ws = node.waitStatus;
            //如果小于0,就将状态重新设置为0,表示这个node节点已经完成了
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * 这里的逻辑就是如果node.next存在并且状态不为取消,则直接唤醒s即可
             * 否则需要从tail开始向前找到node之后最近的非取消节点。
             *
             * 这里为什么要从tail开始向前查找也是值得琢磨的:
             * 如果读到s == null,不代表node就为tail,参考addWaiter以及enq函数中的我的注释。
             * 不妨考虑到如下场景:
             * 1. node某时刻为tail
             * 2. 有新线程通过addWaiter中的if分支或者enq方法添加自己
             * 3. compareAndSetTail成功
             * 4. 此时这里的Node s = node.next读出来s == null,但事实上node已经不是tail,它有后继了!
             */
            Node s = node.next;
            // 如果下一个节点为null,或者状态是已取消,那么就要寻找下一个非取消状态的节点
            if (s == null || s.waitStatus > 0) {
                s = null;//先将s设置为null,s不是非取消状态的节点
                // 从队列尾向前遍历,直到遍历到node节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                   // 因为是从后向前遍历,所以不断覆盖找到的值,这样才能得到node节点后下一个非取消状态的节点
                    if (t.waitStatus <= 0)
                        s = t;
            }
            // 如果s不为null,表示存在非取消状态的节点。那么调用LockSupport.unpark方法,唤醒这个节点的线程
            if (s != null)
                LockSupport.unpark(s.thread);
        }

    独占锁示例,这篇文章通过示例和图解讲的很清楚。

    五、共享锁
    共享锁可能被多个线程共同持有.
    1、获取
    (1)acquireShared方法

    //获取共享锁
        public final void acquireShared(int arg) {
            //尝试去获取共享锁,如果返回值小于0表示获取共享锁失败
            if (tryAcquireShared(arg) < 0)
                //调用doAcquireShared方法去获取共享锁
                doAcquireShared(arg);
        }

    (2)tryAcquireShared方法

    ReentrantReadWriteLock中Sync的tryAcquireShared方法实现:

    protected final int tryAcquireShared(int unused) {
                /*
                 * Walkthrough:
                 * 1. If write lock held by another thread, fail.
                 * 2. Otherwise, this thread is eligible for
                 *    lock wrt state, so ask if it should block
                 *    because of queue policy. If not, try
                 *    to grant by CASing state and updating count.
                 *    Note that step does not check for reentrant
                 *    acquires, which is postponed to full version
                 *    to avoid having to check hold count in
                 *    the more typical non-reentrant case.
                 * 3. If step 2 fails either because thread
                 *    apparently not eligible or CAS fails or count
                 *    saturated, chain to version with full retry loop.
                 */
                //当前线程是第一个获取读锁(共享锁)的线程
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }

    (3)doAcquireShared方法

    /**
         * 获取共享锁,获取失败,则会阻塞当前线程,直到获取共享锁返回
         * @param arg the acquire argument
         */
        private void doAcquireShared(int arg) {
            // 为当前线程创建共享锁节点node
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    //如果节点node前一个节点是同步队列头节点。就会调用tryAcquireShared方法尝试获取共享锁
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        //如果返回值大于0,表示获取共享锁成功
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    (4)setHeadAndPropagate方法

    // 重新设置CLH队列头,如果CLH队列头的下一个节点为null或者共享模式,
        // 那么就要唤醒共享锁上等待的线程
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);//设置新的同步队列头head
            /*
             * Try to signal next queued node if:
             *   Propagation was indicated by caller,
             *     or was recorded (as h.waitStatus either before
             *     or after setHead) by a previous operation
             *     (note: this uses sign-check of waitStatus because
             *      PROPAGATE status may transition to SIGNAL.)
             * and
             *   The next node is waiting in shared mode,
             *     or we don't know, because it appears null
             *
             * The conservatism in both of these checks may cause
             * unnecessary wake-ups, but only when there are multiple
             * racing acquires/releases, so most need signals now or soon
             * anyway.
             */
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())//如果节点s是空或者共享模式节点,那么就要唤醒共享锁上等待的线程
                    doReleaseShared();
            }
        }

    2、释放

    (1)releaseShared方法

    //释放共享锁
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }

    (2)tryReleaseShared方法

    举例 ReentrantReadWriteLock中Sync的tryReleaseShared方法实现

    protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                // 当前线程是第一个获取读锁(共享锁)的线程
                if (firstReader == current) {
                    // 将firstReaderHoldCount减一,如果就是1,那么表示该线程需要释放读锁(共享锁),
                    // 将firstReader设置为null
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    // 获取当前线程的HoldCounter变量
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    // 将rh变量的count减一,
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        // count <= 0表示当前线程就没有获取到读锁(共享锁),这里释放就抛出异常。
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                    // 因为读锁是利用高16位储存的,低16位的数据是要屏蔽的,
                    // 所以这里减去SHARED_UNIT(65536),相当于减一
                    // 表示一个读锁已经释放
                    int nextc = c - SHARED_UNIT;
                    // 利用CAS函数重新设置state值
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }

    (3)doReleaseShared方法

    // 释放共享锁 会唤醒等待共享锁的线程
        private void doReleaseShared() {
            /*
             * 自旋
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    // 如果状态是Node.SIGNAL,就要唤醒节点h后继节点的线程
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 将节点h的状态设置成0,如果设置失败,就继续循环,再试一次。
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    // 如果节点h的状态是0,就设置ws的状态是PROPAGATE。
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                // 如果同步队列头head节点发生改变,继续循环,
                // 如果没有改变,就跳出循环
                if (h == head)                   // loop if head changed
                    break;
            }
        }

    CountDownLatch(倒计时锁、共享锁) 这篇文章有示例和图解,讲的很清楚.

    ReentrantReadWriteLock使用示例

    StampedLock使用示例

    六、ConditionObject


    Condition接口是为了解决线程之间相互等待的问题,注意Condition对象只能在独占锁中才能使用。

    Condition可看做是Obejct类的wait()、notify()、notifyAll()方法的替代品,与Lock配合使用。当线程执行condition对象的await方法时,当前线程会立即释放锁,并进入对象的等待区,等待其它线程唤醒或中断。
    JUC在实现Conditon对象时,其实是通过实现AQS框架,来实现了一个Condition等待队列。

    (1)内部存在一个Condition队列,存储着所有在此Condition条件等待的线程。
    (2)await系列方法:让当前持有锁的线程释放锁,并唤醒一个在CLH队列上等待锁的线程,再为当前线程创建一个node节点,插入到Condition队列(注意不是插入到CLH队列中)
    (3)signal系列方法:其实这里没有唤醒任何线程,而是将Condition队列上的等待节点插入到CLH队列中,所以当持有锁的线程执行完毕释放锁时,就会唤醒CLH队列中的一个线程,这个时候才会唤醒线程。

    1、await系列方法

    (1)await方法

         /**
             * 让当前持有锁的线程阻塞等待,并释放锁。如果有中断请求,则抛出InterruptedException异常
             * @throws InterruptedException
             */
            public final void await() throws InterruptedException {
                // 如果当前线程中断标志位是true,就抛出InterruptedException异常
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 为当前线程创建新的Node节点,并且将这个节点插入到Condition队列中了
                Node node = addConditionWaiter();
                // 释放当前线程占有的锁,并唤醒CLH队列一个等待线程
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                // 如果节点node不在同步队列中(注意不是Condition队列)
                while (!isOnSyncQueue(node)) {
                    // 阻塞当前线程,那么怎么唤醒这个线程呢?
                    // 首先我们必须调用signal或者signalAll将这个节点node加入到同步队列。
                    // 只有这样unparkSuccessor(Node node)方法,才有可能唤醒被阻塞的线程
                    LockSupport.park(this);
                    // 如果当前线程产生中断请求,就跳出循环
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // 如果节点node已经在同步队列中了,获取同步锁,只有得到锁才能继续执行,否则线程继续阻塞等待
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                // 清除Condition队列中状态不是Node.CONDITION的节点
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                // 是否要抛出异常,或者发出中断请求
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }

    方法流程:

    1. addConditionWaiter方法:为当前线程创建新的Node节点,并且将这个节点插入到Condition队列中了
    2. fullyRelease方法:释放当前线程占有的锁,并唤醒CLH队列一个等待线程
    3. isOnSyncQueue 方法:如果返回false,表示节点node不在CLH队列中,即没有调用过 signal系列方法,所以调用LockSupport.park(this)方法阻塞当前线程。
    4. 如果跳出while循环,表示节点node已经在CLH队列中,那么调用acquireQueued方法去获取锁。
    5. 清除Condition队列中状态不是Node.CONDITION的节点

    (2)addConditionWaiter方法

    为当前线程创建新的Node节点,并且将这个节点插入到Condition队列中了

        private Node addConditionWaiter() {
                Node t = lastWaiter;
                // 如果Condition队列尾节点的状态不是Node.CONDITION
                if (t != null && t.waitStatus != Node.CONDITION) {
                    // 清除Condition队列中,状态不是Node.CONDITION的节点,
                    // 并且可能会重新设置firstWaiter和lastWaiter
                    unlinkCancelledWaiters();
                    // 重新将Condition队列尾赋值给t
                    t = lastWaiter;
                }
                // 为当前线程创建一个状态为Node.CONDITION的节点
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                // 如果t为null,表示Condition队列为空,将node节点赋值给链表头
                if (t == null)
                    firstWaiter = node;
                else
                    // 将新节点node插入到Condition队列尾
                    t.nextWaiter = node;
                // 将新节点node设置为新的Condition队列尾
                lastWaiter = node;
                return node;
            }

    (3)fullyRelease方法

    释放当前线程占有的锁,并唤醒CLH队列一个等待线程

        /**
         * 释放当前线程占有的锁,并唤醒CLH队列一个等待线程
         * 如果失败就抛出异常,设置node节点的状态是Node.CANCELLED
         * @return
         */
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                // 释放当前线程占有的锁
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }

    (4)isOnSyncQueue方法

    节点node是不是在CLH队列中

        // 节点node是不是在CLH队列中
        final boolean isOnSyncQueue(Node node) {
            // 如果node的状态是Node.CONDITION,或者node没有前一个节点prev,
            // 那么返回false,节点node不在同步队列中
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            // 如果node有下一个节点next,那么它一定在同步队列中
            if (node.next != null) // If has successor, it must be on queue
                return true;
            // 从同步队列中查找节点node
            return findNodeFromTail(node);
        }
    
        // 在同步队列中从后向前查找节点node,如果找到返回true,否则返回false
        private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }

    (5)acquireQueued方法

    获取独占锁,见上面。

    (6)unlinkCancelledWaiters 方法

    清除Condition队列中状态不是Node.CONDITION的节点

        private void unlinkCancelledWaiters() {
                // condition队列头赋值给t
                Node t = firstWaiter;
                // 这个trail节点,只是起辅助作用
                Node trail = null;
                while (t != null) {
                    //得到下一个节点next。当节点是condition时候,nextWaiter表示condition队列的下一个节点
                    Node next = t.nextWaiter;
                    // 如果节点t的状态不是CONDITION,那么该节点就要从condition队列中移除
                    if (t.waitStatus != Node.CONDITION) {
                        // 将节点t的nextWaiter设置为null
                        t.nextWaiter = null;
                        // 如果trail为null,表示原先的condition队列头节点实效,需要设置新的condition队列头
                        if (trail == null)
                            firstWaiter = next;
                        else
                            // 将节点t从condition队列中移除,因为改变了引用的指向,从condition队列中已经找不到节点t了
                            trail.nextWaiter = next;
                        // 如果next为null,表示原先的condition队列尾节点也实效,重新设置队列尾节点
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        // 遍历到的有效节点
                        trail = t;
                    // 将next赋值给t,遍历完整个condition队列
                    t = next;
                }
            }

    (7)reportInterruptAfterWait方法

        /**
             * 如果interruptMode是THROW_IE,就抛出InterruptedException异常
             * 如果interruptMode是REINTERRUPT,则当前线程再发出中断请求
             * 否则就什么都不做
             */
            private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }

    2、signal系列方法

    (1)signal方法

    如果condition队列不为空,就调用doSignal方法将condition队列头节点插入到CLH队列中。

           // 如果condition队列不为空,将condition队列头节点插入到同步队列中
            public final void signal() {
                // 如果当前线程不是独占锁线程,就抛出IllegalMonitorStateException异常
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
    
                // 将Condition队列头赋值给节点first
                Node first = firstWaiter;
                if (first != null)
                    //  将Condition队列中的first节点插入到CLH队列中
                    doSignal(first);
            }

    (2)doSignal方法

            // 将Condition队列中的first节点插入到CLH队列中
            private void doSignal(Node first) {
                do {
                    // 原先的Condition队列头节点取消,所以重新赋值Condition队列头节点
                    // 如果新的Condition队列头节点为null,表示Condition队列为空了
                    // ,所以也要设置Condition队列尾lastWaiter为null
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    // 取消first节点nextWaiter引用
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }

    (3)transferForSignal方法

          // 返回true表示节点node插入到同步队列中,返回false表示节点node没有插入到同步队列中
        final boolean transferForSignal(Node node) {
            // 如果节点node的状态不是Node.CONDITION,或者更新状态失败,
            // 说明该node节点已经插入到同步队列中,所以直接返回false
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            // 将节点node插入到同步队列中,p是原先同步队列尾节点,也是node节点的前一个节点
            Node p = enq(node);
            int ws = p.waitStatus;
            // 如果前一个节点是已取消状态,或者不能将它设置成Node.SIGNAL状态。
            // 就说明节点p之后也不会发起唤醒下一个node节点线程的操作,
            // 所以这里直接调用 LockSupport.unpark(node.thread)方法,唤醒节点node所在线程
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    • 状态不是 Node.CONDITION的节点,是不能从Condition队列中插入到CLH队列中。直接返回false
    • 调用enq方法,将节点node插入到同步队列中,p是原先同步队列尾节点,也是node节点的前一个节点
    • 如果前一个节点是已取消状态,或者不能将它设置成Node.SIGNAL状态。那么就要LockSupport.unpark(node.thread)方法唤醒node节点所在线程。

    (4)signalAll 方法

         // 将condition队列中所有的节点都插入到同步队列中
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }

    (5)doSignalAll方法

    循环遍历整个condition队列,调用transferForSignal方法,将节点插入到CLH队列中。

        /**
             * 将condition队列中所有的节点都插入到同步队列中
             * @param first condition队列头节点
             */
            private void doSignalAll(Node first) {
                // 表示将condition队列设置为空
                lastWaiter = firstWaiter = null;
                do {
                    // 得到condition队列的下一个节点
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    // 将节点first插入到同步队列中
                    transferForSignal(first);
                    first = next;
                    // 循环遍历condition队列中所有的节点
                } while (first != null);
            }

    Condition示例

    参考:

    Java并发编程-看懂AQS的前世今生
    JUC锁框架_AbstractQueuedSynchronizer详细分析
    AbstractQueuedSynchronizer源码解读

    *JUC线程框架深度解析

    *透彻理解Java并发编程

  • 相关阅读:
    心情不好的时候
    离骚
    沁园春.雪
    顾炎武《精卫》
    韩愈《祝融峰》
    Python量化交易的简单介绍
    H5页面跳转到小程序代码
    小程序上拉加载,下拉刷新
    第一阶段:Python开发基础 day36 并发编程之Process的join用法和其他用法
    课后练习 第一阶段:Python开发基础 day38 多线程相关小练习
  • 原文地址:https://www.cnblogs.com/cac2020/p/12098693.html
Copyright © 2011-2022 走看看