zoukankan      html  css  js  c++  java
  • AQS源码的简单理解

    概念

    AQS全称 AbstractQueuedSynchronizer。

    AQS是一个并发包的基础组件,用来实现各种锁,各种同步组件的。它包含了state变量、加锁线程、等待队列等并发中的核心组件。

    ReentrantLock、Semaphore、CountDownLatch、FutrueTask,这些都是基于AQS构建的。

    而AQS是基于volatile变量的读/写和CAS( 也就是compareAndSet()方法 )实现的。

    volatile可以保证并发中的可见性,还可以禁止指令重排序。CAS,用于管理对共享数据的并发访问。

    ReentrantLock和AQS的关系

    在ReentrantLock中,state代表了加锁的状态。初始状态下,这个state的值是0。

    另外,这个AQS内部还有一个关键变量,用来记录当前加锁的是哪个线程,初始化状态下,这个变量是null。

    线程A跑过来调用ReentrantLock的lock()方法尝试进行加锁,这个加锁的过程,直接就是用CAS操作将state值从0变为1。

    如果之前没人加过锁,那么state的值肯定是0,此时线程A就可以加锁成功。

    一旦线程1加锁成功了之后,就可以设置当前加锁线程是自己。

    主要流程

    重要的变量

    • state
      状态变量。
    /**
     * The synchronization state.
     */
    private volatile int state;
    

    等待队列的节点Node

    "CLH" lock queue,代表“等待锁的线程组成的队列”,以下简称线程队列。

    线程队列,通常被用来处理并发的情况,它通过双向队列(FIFO)来完成同步状态。

    每个线程都会被封装成一个Node节点放到同步队列中。

    队列的每个Node节点保存了当前线程的同步状态,等待状态,上一个节点和下一个节点等。

    static final class Node {
            //共享模式
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            //独占模式
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
            // 线程的等待状态 表示线程已经被取消
            static final int CANCELLED =  1;
            // 线程的等待状态 表示后继线程需要被唤醒
            static final int SIGNAL    = -1;
            // 线程的等待状态 表示线程在Condtion上
            static final int CONDITION = -2;
            // 表示下一个acquireShared需要无条件的传播
            static final int PROPAGATE = -3;
    
            /**
             *   等待状态有以下几种:
             *   
             *   SIGNAL:     当前节点的后继节点处于等待状态时,如果当前节点的同步状态被释放或者取消,
             *               必须唤起它的后继节点
             *         
             *   CANCELLED:  一个节点由于超时或者中断需要在CLH队列中取消等待状态,被取消的节点不会再次等待
             *               
             *   CONDITION:  当前节点在等待队列中,只有当节点的状态设为0的时候该节点才会被转移到同步队列
             *               
             *   PROPAGATE:  下一次的共享模式同步状态的获取将会无条件的传播
     
             *   waitStatus的初始值时0,使用CAS来修改节点的状态
             */
            volatile int waitStatus;
    
            /**
             * Link to predecessor node that current node/thread relies on
             * for checking waitStatus. Assigned during enqueuing, and nulled
             * out (for sake of GC) only upon dequeuing.  Also, upon
             * cancellation of a predecessor, we short-circuit while
             * finding a non-cancelled one, which will always exist
             * because the head node is never cancelled: A node becomes
             * head only as a result of successful acquire. A
             * cancelled thread never succeeds in acquiring, and a thread only
             * cancels itself, not any other node.
             */
            volatile Node prev;
    
            /**
             * Link to the successor node that the current node/thread
             * unparks upon release. Assigned during enqueuing, adjusted
             * when bypassing cancelled predecessors, and nulled out (for
             * sake of GC) when dequeued.  The enq operation does not
             * assign next field of a predecessor until after attachment,
             * so seeing a null next field does not necessarily mean that
             * node is at end of queue. However, if a next field appears
             * to be null, we can scan prev's from the tail to
             * double-check.  The next field of cancelled nodes is set to
             * point to the node itself instead of null, to make life
             * easier for isOnSyncQueue.
             */
            volatile Node next;
    
            /**
             * The thread that enqueued this node.  Initialized on
             * construction and nulled out after use.
             */
            volatile Thread thread;
    
            /**
             * Link to next node waiting on condition, or the special
             * value SHARED.  Because condition queues are accessed only
             * when holding in exclusive mode, we just need a simple
             * linked queue to hold nodes while they are waiting on
             * conditions. They are then transferred to the queue to
             * re-acquire. And because conditions can only be exclusive,
             * we save a field by using special value to indicate shared
             * mode.
             */
            Node nextWaiter;
    
            /**
             * Returns true if node is waiting in shared mode.
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * Returns previous node, or throws NullPointerException if null.
             * Use when predecessor cannot be null.  The null check could
             * be elided, but is present to help the VM.
             *
             * @return the predecessor of this node
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

    acquire()

    线程在独占模式下获取state。

    子类继承AQS后,经常会用到acquire() 、 release()。

    acquire()获取状态、维护状态。

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //通过interrupt() 方法改变中断状态
            selfInterrupt();
    }
    

    release()

    线程在独占模式下释放state。

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    unparkSuccessor()

    唤醒Node的后继节点。

        /**
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            //通过CAS改变等待状态。
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    
    

    compareAndSetState()

    通过CAS设置状态变量。当state值为指定的参数expect时,将其修改为另一个指定的值。

    /**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    

    ConditionObject

    而另一个内部类ConditionObject实现了Condition接口,并且实现了其中的await(),signal(),signalALL()等方法。
    ConditionObject主要是为并发编程中的同步提供了等待通知的实现方式,可以在不满足某个条件的时候挂起线程等待。直到满足某个条件的时候在唤醒线程。

    参考资料:

    https://juejin.im/post/5c07e59cf265da617464a09c (大白话聊聊对AQS的理解)

    http://ifeve.com/introduce-abstractqueuedsynchronizer/

    https://blog.csdn.net/qq_30572275/article/details/80297047

  • 相关阅读:
    SQLite Java Wrapper/JDBC Driver(收集)
    JAVA 并行运行(收集)
    log4net使用方法(转载)
    WMI服务故障,VBS脚本无法运行错误
    ArcEngine中UID使用资料收集
    使用 ArcGIS Engine Runtime 制作安装包(转载)
    Eclipse安装WindowBuilder Pro(转载)
    C#操作SQL Server数据库
    自动化测试 (三) Web自动化测试原理
    HTTP协议 (六) 状态码详解
  • 原文地址:https://www.cnblogs.com/expiator/p/12052125.html
Copyright © 2011-2022 走看看