zoukankan      html  css  js  c++  java
  • Java并发编程--AQS详解

    1. 概述

    AQS(AbstractQueuedSynchronizer)提供了原子式管理同步状态/阻塞和唤醒线程功能以及队列模型的简单框架, 许多同步类实现都依赖于它,
    如常用的ReentrantLock/Semaphore/CountDownLatch等.

    2. 框架

    它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列). AQS 中的队列是CLH(Craig, Landin and Hagersten) 单向链表的变体--虚拟双向队列(FIFO).
    双向链表中, 第一个节点为虚节点, 其实并不存储任何信息, 只是占位. 真正的第一个有数据的节点, 是在第二个节点开始的.

    AQS定义两种资源共享方式: Exclusive(独占, 只有一个线程能执行, 如:ReentrantLock)和Share(共享, 多个线程可同时执行, 如:Semaphore/CountDownLatch). 不同的自定义同步器争用共享资源的方式也不同. 自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可, 至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等)AQS已经在顶层实现好了.

    ReentrantLock为例, state初始化为0, 表示未锁定状态. A线程lock()时, 会调用tryAcquire()独占该锁并将state+1. 此后, 其他线程再tryAcquire()时就会失败, 直到 A 线程unlock()到state=0(即释放锁)为止, 其它线程才有机会获取该锁.当然, 释放锁之前, A 线程自己是可以重复获取此锁的(state会累加), 这就是可重入的概念. 但要注意, 获取多少次就要释放多么次,这样才能保证state是能回到零态的.

    再以CountDownLatch以例, 任务分为 N 个子线程去执行, state也初始化为 N(注意N要与线程个数一致). 这N个子线程是并行执行的,每个子线程执行完后会countDown()一次, state会CAS减1, 等到所有子线程都执行完后(即state=0), 会unpark()主调用线程, 然后主调用线程就会从await()函数返回, 继续后续动作.

    一般来说, 自定义同步器要么是独占方法, 要么是共享方式, 他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可. 但AQS也支持自定义同步器同时实现独占和共享两种方式, 如ReentrantReadWriteLock. 

    3. 源码详解

    3.1 Node

    Node是对每一个等待获取资源的线程的封装, 其包含了需要同步的线程本身及其等待状态, 如是否被阻塞、是否等待唤醒、是否已经被取消等.

    3.1.1 Node的方法和属性值

    • waitStatus: 当前节点在队列中的状态
    • thread: 表示处于该节点的线程
    • prev: 前驱指针
    • predecessor(): 返回前驱节点, 没有的话抛出npe
    • nextWaiter: 指向下一个处于 CONDITION 状态的节点
    • next: 后继指针

    3.1.2 waitStatus

    • CANCELLED(1): 表示当前结点已取消调度. 当timeout或被中断(响应中断的情况下), 会触发变更为此状态, 进入该状态后的结点将不会再变化.
    • SIGNAL(-1): 表示后继结点在等待当前结点唤醒. 后继结点入队时, 会将前继结点的状态更新为SIGNAL.
    • CONDITION(-2): 表示结点等待在Condition上, 当其他线程调用了Condition的signal()方法后, CONDITION状态的结点将从等待队列转移到同步队列中, 等待获取同步锁.
    • PROPAGATE(-3): 共享模式下, 前继结点不仅会唤醒其后继结点, 同时也可能会唤醒后继的后继结点.
    • 0: 新结点入队时的默认状态.

    3.2 获取资源

    3.2.1 acquire(int)

    此方法是独占模式下线程获取共享资源的顶层入口. 如果获取到资源, 线程直接返回, 否则进入等待队列, 直到获取到资源为止, 且整个过程忽略中断的影响.

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    函数流程如下:

    1. tryAcquire()尝试直接去获取资源, 如果成功则直接返回(这里体现了非公平锁, 每个线程获取锁时会尝试直接抢占加塞一次, 而CLH队列中可能还有别的线程在等待);
    2. addWaiter()将该线程加入等待队列的尾部, 并标记为独占模式;
    3. acquireQueued()使线程阻塞在等待队列中获取资源, 一直获取到资源后才返回. 如果在整个等待过程中被中断过, 则返回true, 否则返回false.
    4. 如果线程在等待过程中被中断过, 它是不响应的. 只是获取资源后才再进行自我中断selfInterrupt(), 将中断补上.

    3.2.2 tryAcquire(int)

    此方法尝试去获取独占资源. 如果获取成功, 则直接返回true, 否则直接返回false. AQS只是一个框架, 具体资源的获取/释放方式交由自定义同步器去实现.

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }

    这里之所以没有定义成abstract, 是因为独占模式下只用实现tryAcquire-tryRelease, 而共享模式下只用实现tryAcquireShared-tryReleaseShared. 如果都定义成abstract, 那么每个模式也要去实现另一模式下的接口. 说到底Doug Lea还是站在开发者的角度, 尽量减少不必要的工作量.

    3.2.3 addWaiter(Node)

    此方法用于将当前线程加入到等待队列的队尾, 并返回当前线程所在的结点.

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // 尝试快速加入队尾, 失败则使用 enq 把节点加入队尾
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

    3.2.4 enq(Node)

    此方法用于将node加入队尾.

        private Node enq(final Node node) {
            for (;;) { // 自旋, 直到成功加入队尾
                Node t = tail;
                if (t == null) { // 队列为空, 初始化队列
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) { // 把当前节点加入队尾
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    3.2.5 acquireQueued(Node, int)

    通过tryAcquire()和addWaiter(), 该线程获取资源失败, 已经被放入等待队列尾部了. 线程下一步进入等待状态, 直到其他线程彻底释放资源后唤醒它.

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true; // 标记是否成功获取资源
            try {
                boolean interrupted = false; // 标记等待过程中是否被中断过
                for (;;) { // 自旋
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) { // 前驱节点为 head, 则可以尝试获取资源.
                        setHead(node); // 将 head 指向该节点
                        p.next = null; // 切断之前的 head 与链表的关联, help for GC.
                        failed = false;
                        return interrupted;
                    }
                    // 判断在获取资源失败后是否进入 waiting 状态, 直到被 unpark().
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    3.2.6 shouldParkAfterFailedAcquire(Node, Node)

    此方法主要用于检查前驱节点状态, 从而决定当前线程是否可以进入 waiting 状态.

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus; // 拿到前驱节点的状态
            if (ws == Node.SIGNAL)
                // 已经设置了前驱节点释放资源后通知自己一下
                return true;
            if (ws > 0) {
                // 前驱节点已经放弃获取资源, 那就一直往前找, 直到找到最近一个正常等待的节点, 并排在它的后边.
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                 // 如果前驱节点正常, 那就把前驱的状态设置成 SIGNAL, 让它释放资源后通知自己一下.
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

    3.2.7 parkAndCheckInterrupt()

    此方法就是让线程去休息, 真正进入等待状态. park()会让当前线程进入waiting状态. 在此状态下, 有两种途径可以唤醒该线程: 被unpark()或被interrupt().

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }

    3.3 释放资源

    3.3.1 release(int)

    此方法是独占模式释放资源

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    3.3.2 tryRelease(int)

        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }

    3.3.3 unparkSuccessor(Node)

    唤醒等待队列中的下一个线程

        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                // 从队尾开始找一直到队首, 找到队列最靠前需要唤醒的节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }

    参考文档:

    从ReentrantLock的实现看AQS的原理及应用

    Java并发之AQS详解

  • 相关阅读:
    Java基础教程(20)--数字和字符串
    Java基础教程(19)--Object类
    python 选择和循环结构
    购物清单
    第五次安卓作业(计算器和增删改查)
    第四次安卓作业(用户添加)
    第三次安卓作业(用户登录)
    第二次安卓作业(九宫格)
    5.22作业
    5.29作业
  • 原文地址:https://www.cnblogs.com/xxoome/p/14263666.html
Copyright © 2011-2022 走看看