zoukankan      html  css  js  c++  java
  • Java中AQS基本实现原理

    一、AQS概述

    AQS全名AbstractQueuedSynchronizer,意为抽象队列同步器,JUC(java.util.concurrent包)下面的Lock和其他一些并发工具类都是基于它来实现的。AQS维护了一个volatile的state和一个CLH(FIFO)双向队列。

     二、分析

    state

    state是一个由volatile修饰的int变量,它的访问方式有三种:

    • getState()
    • setState(int newState)
    • compareAndSetState(int expect, int update)
        /**
         * 由volatile修饰的state
         */
        private volatile int state;
    
        /**
         * 基于内存可见性的读
         */
        protected final int getState() {
            return state;
        }
    
        /**
         * 基于内存可见性的写
         */
        protected final void setState(int newState) {
            state = newState;
        }
    
        /**
         * 使用CAS+volatile,基于原子性与可见性的对state进行设值
         */
        protected final boolean compareAndSetState(int expect, int update) {
            // 使用Unsafe类,调用JNI方法
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    

    资源获取主要有两种形式:

    • 独占式(EXCLUSIVE)

    仅有一个线程能在同一时刻获取到资源并处理,如ReentrantLock的实现。

    • 共享式(SHARED)

    多个线程可以同时获取到资源并处理,如Semaphore/CountDownLatch等。

    AQS中大部分逻辑已经被实现,集成类只需要重写state的获取(acquire)与释放(release)方法,因为在AQS中,这些方法默认定义的实现方式都是抛出不支持操作异常,所以按需实现即可。

    其中需要继承类重写的方法有:

    • tryAcquire(int arg)

    此方法是独占式的获取资源方法,成功则返回true,失败返回false。

    • tryRelease(int arg)

    此方法是独占式的释放资源方法,成功则返回true,失败返回false。

    • tryAcquireShared(int arg)

    此方法是共享式的获取资源方法,返回负数表示失败,0表示获取成功,但是没有可用资源,正数表示获取成功,且有可用资源。

    • tryReleaseShared(int arg)

    此方法是共享式的释放资源方法,如果允许唤醒后续等待线程则返回true,不允许则返回false。

    • isHeldExclusively()

    判断当前线程是否正在独享资源,是则返回true,否则返回false。

    CLH(FIFO)队列

    AQS中是通过内部类Node来维护一个CLH队列的。源码如下:

     static final class Node {
            /** 标记共享式访问 */
            static final Node SHARED = new Node();
            /** 标记独占式访问 */
            static final Node EXCLUSIVE = null;
    
            /** 字段waitStatus的值,表示当前节点已取消等待 */
            static final int CANCELLED =  1;
            /**字段waitStatus的值,表示当前节点取消或释放资源后,通知下一个节点 */
            static final int SIGNAL    = -1;
            /** 表示正在等待触发条件 */
            static final int CONDITION = -2;
            /**
             * 表示下一个共享获取应无条件传播
             */
            static final int PROPAGATE = -3;
    
            /**
             * Status field, taking on only the values:
             *   SIGNAL:     表示当前节点取消或释放资源后,通知下一个节点
             *   CANCELLED:  表示当前节点已取消等待
             *   CONDITION:  表示正在等待触发条件
             *   PROPAGATE:  表示下一个共享获取应无条件传播
             */
            volatile int waitStatus;
    
            /**
             * 前节点
             */
            volatile Node prev;
    
            /**
             * 下一个节点
             */
            volatile Node next;
    
            /**
             * 节点对应线程
             */
            volatile Thread thread;
    
            /**
             * 下一个等待的节点
             */
            Node nextWaiter;
    
            /**
             * 是否是共享式访问
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * 返回前节点
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // 共享式访问的构造函数
            }
    
            Node(Thread thread, Node mode) {     // 用于被添加等待者使用
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // 用于Condition使用
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

    独占模式-获取资源

    使用AQS中的acquire(int arg)方法

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    该方法分为4个部分:

    • tryAcquire()

    需要自己实现的方法,如果获取到资源使用权,则返回true,反之fasle。如果获取到资源,返回true,!true为false,根据&&的短路性,则不会执行后续方法,直接跳过程序。如果未获取到资源,返回false,!false为true,则进入后续方法。

    • addWaiter()

    如果未获取到资源使用权,则首先会调用此方法。上源码:

        private Node addWaiter(Node mode) {
            // 封装当前线程和独占模式
            Node node = new Node(Thread.currentThread(), mode);
            // 获取尾部节点
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                // CAS设置尾部节点
                if (compareAndSetTail(pred, node)) {
                    // 将为节点的下一节点指向当前node
                    pred.next = node;
                    return node;
                }
            }
            // 如果尾结点为空或者设置尾结点失败
            enq(node);
            return node;
        }
    
     private Node enq(final Node node) {
            // 如果CAS设置未成功则死循环
            for (;;) {
                // 获得尾结点
                Node t = tail;
                // 如果尾节点为空,说明CLH队列为空,需要初始化
                if (t == null) { 
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // 设置当前节点的前驱节点
                    node.prev = t;
                    // CAS设置当前节点为尾结点
                    if (compareAndSetTail(t, node)) {
                        // 设置后驱节点
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    • acquiredQueued()
        final boolean acquireQueued(final Node node, int arg) {
            // 标识资源获取是否失败
            boolean failed = true;
            try {
                // 标识线程是否中断
                boolean interrupted = false;
                for (;;) {
                    // 获得当前节点的前驱节点
                    final Node p = node.predecessor();
                    // 如果前驱节点为头结点,说明快到当前节点了,尝试获取资源
                    if (p == head && tryAcquire(arg)) {
                        // 获取资源成功
                        // 设置当前节点为头结点
                        setHead(node);
                        // 取消前驱节点(以前的头部)的后节点,方便GC回收
                        p.next = null; // help GC
                        // 标识未失败
                        failed = false;
                        // 返回中断标志
                        return interrupted;
                    }
                    // 如果当前节点的前驱节点不是头结点或获取资源失败
                    // 需要用shouldParkAfterFailedAcquire函数判断是否需要阻塞该节点持有的线程
                    // 如果需要阻塞,则执行parkAndCheckInterrupt方法,并设置被中断
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                // 如果最终获取资源失败
                if (failed)
                    // 当前节点取消获取资源
                    cancelAcquire(node);
            }
        }
    
    • selfInterrupt()

    中断当前线程

        static void selfInterrupt() {
            Thread.currentThread().interrupt();
        }
    

    独占模式-释放资源

    release()  释放资源并唤醒后继线程

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                // 获取头结点
                Node h = head;
                // 头结点不为空且等待状态值不为0
                if (h != null && h.waitStatus != 0)
                    // 唤醒后续等待线程
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            // 如果等待状态值小于0
            if (ws < 0)
                // 使用CAS将waitStatus设置为0
                compareAndSetWaitStatus(node, ws, 0);
    
            Node s = node.next;
            // 如果当前节点没有后继节点或者后继节点放弃竞争资源
            if (s == null || s.waitStatus > 0) {
                s = null;
                // 从队列尾部循环直到当前节点,找到最近的且等待状态值小于0的节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            // 如果找到的后继节点不为空,则唤醒其持有的线程
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    
  • 相关阅读:
    Git原理与命令大全
    【网络安全】加解密算法最详解
    陪你阅读《区块链:从数字货币到信用社会》序一
    Splunk初识
    红帽学习记录[RHCE] ISCSI远程块储存
    DNS 域名系统与邮件服务器
    红帽学习记录[RHCE] 防火墙与网络合作
    红帽学习笔记[RHCE]网络配置与路由转发
    红帽学习笔记[RHCE]OpenLDAP 服务端与客户端配置
    红帽学习笔记[RHCSA] 第二周
  • 原文地址:https://www.cnblogs.com/yl-space/p/13403101.html
Copyright © 2011-2022 走看看