zoukankan      html  css  js  c++  java
  • java并发:AQS

    AQS是一个FIFO的双向队列,其内部通过head和tail记录队首和队尾元素,队列元素的类型为 Node。

    Node

    Node 中的 thread变量用来存放进入 AQS 队列的线程;

    Node 中的 SHARED 用来标记该线程是获取共享资源时被阻塞挂起后放入 AQS 队列的;EXCLUSIVE 用来标记该线程是获取独占资源时被阻塞挂起后放入 AQS 队列的;

    waitStatus 记录当前线程等待状态,可以为:CANCELLED (线程被取消了)、SIGNAL (线程需要被唤醒)、CONDITION (线程在条件队列里面等待〉、PROPAGATE (释放共享资源时需要通知其他节点);

    prev 记录当前节点的前驱节点;next 记录当前节点的后继节点

    ConditionObject

    AQS 的内部类 ConditionObject 结合锁实现线程同步。

     

    ConditionObject 可以直接访问 AQS对象内部的变量,比如 state 和 AQS 队列。

    ConditionObject 是条件变量,每个条件变量在内部维护了一个条件队列 (单向链表队列),这个条件队列和 AQS 队列不是一回事。

    此处的队列是用来存放调用条件变量的 await 方法后被阻塞的线程,队列的头、尾元素分别为 firstWaiter 和 lastWaiter。

    Note:

    调用条件变量的 await()方法就相当于调用共享变量的 wait()方法,调用条件变量的 signal方法就相当于调用共享变量的 notify()方法,调用条件变量的 signa!All ( )方法就相当于调用共享变量的 notifyAll()方法。

    至此,相信大家都已经知道条件变量是什么了,它能用来做什么。 

    示例

    示例中(2)处使用Lock 对象的 newCondition ()方法创建了一个 ConditionObject 变量,该变量就是 Lock锁对应的一个条件变量。

    示例中(3)处获取了独占锁,示例中(4)处则调用了条件变量的 await ()方法阻塞挂起了当前线程。

    当其他线程调用条件变量的 signal方法时,被阻塞的线程才会从 await处返回。

    Note:

    在调用条件变量的 signal 和 await方法前必须先获取条件变量对应的锁,如果在没有获取到锁之前调用了条件变量的 await方法则会抛出 java.lang.IllegalMonitorStateException异常。

    一个 Lock对象可以创建多个条件变量。

    AQS 只提供了 ConditionObject 的实现,并没有提供 newCondition 函数,该函数用来 new 一个 ConditionObject对象;需要由 AQS 的子类来实现 newCondition 函数。 

    小结:

    下图反映了前面描述的关系:

    详解:

    当多个线程同时调用 lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为 Node 节点插入到 lock 锁对应的 AQS 阻塞队列里面,并做自旋 CAS 尝试获取锁。

    如果获取到锁的线程调用了对应条件变量的 await()方法,则该线程会释放获取到的锁,并被转换为 Node 节点插入到条件变量对应的条件队列里面。

    此时因调用 lock.lock() 方法被阻塞到 AQS 队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的 await ()方法则该线程也会被放入条件变量的条件队列里面。

    当另外一个线程调用条件变量的 signal()或者 signa!All()方法时,会把条件队列里面的一个或者全部 Node节点移动到 AQS 的阻塞队列里面,等待时机获取锁。

    state

    在 AQS 中维持了一个状态值 state,可以通过 getState、setState、compareAndSetState 函数修改其值,代码如下:

        /**
         * The synchronization state.
         */
        private volatile int state;
    
        /**
         * Returns the current value of synchronization state.
         * This operation has memory semantics of a {@code volatile} read.
         * @return current state value
         */
        protected final int getState() {
            return state;
        }
    
        /**
         * Sets the value of synchronization state.
         * This operation has memory semantics of a {@code volatile} write.
         * @param newState the new state value
         */
        protected final void setState(int newState) {
            state = newState;
        }
    
        /**
         * Atomically sets synchronization state to the given updated
         * value if the current state value equals the expected value.
         * This operation has memory semantics of a {@code volatile} read
         * and write.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that the actual
         *         value was not equal to the expected value.
         */
        protected final boolean compareAndSetState(int expect, int update) {
            return STATE.compareAndSet(this, expect, update);
        }

    对于 AQS 来说,线程同步的关键是对 state 进行操作。

    根据 state 是否属于一个线程,操作 state 的方式分为独占方式和共享方式。

    独占方式

    使用独占方式获取的资源是与具体线程绑定的,也就是说如果一个线程获取到了资源,则进行标记,其他线程尝试操作 state 获取资源时会发现当前该资源的持有者不是自己,于是在获取失败后被阻塞。

    例子:独占锁 ReentrantLock 的实现

    当一个线程获取了Reer rantLock的锁后,在 AQS 内部会使用 CAS 操作把状态值 state 从0 变为 1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则把状态值从 l 变为 2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己不是该锁的持有者就会被放入 AQS 阻塞队列后挂起。

    在独占方式下获取和释放资源的方法为:

        /**
         * Acquires in exclusive mode, ignoring interrupts.  Implemented
         * by invoking at least once {@link #tryAcquire},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquire} until success.  This method can be used
         * to implement method {@link Lock#lock}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
        /**
         * Acquires in exclusive mode, aborting if interrupted.
         * Implemented by first checking interrupt status, then invoking
         * at least once {@link #tryAcquire}, returning on
         * success.  Otherwise the thread is queued, possibly repeatedly
         * blocking and unblocking, invoking {@link #tryAcquire}
         * until success or the thread is interrupted.  This method can be
         * used to implement method {@link Lock#lockInterruptibly}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @throws InterruptedException if the current thread is interrupted
         */
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
        /**
         * Releases in exclusive mode.  Implemented by unblocking one or
         * more threads if {@link #tryRelease} returns true.
         * This method can be used to implement method {@link Lock#unlock}.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryRelease} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @return the value returned from {@link #tryRelease}
         */
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    共享方式

    使用共享方式获取资源与具体线程不相关;当多个线程去请求资源时通过 CAS 方式竞争。

    当一个线程获取到资源后,另一个线程尝试获取资源时,如果当前资源能满足它的需要,则当前线程只需要使用 CAS 方式进行获取即可。

    例子:Semaphore 信号量

    当一个线程通过 acquire()方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋 CAS 获取信号量。

    在共享方式下获取和释放资源的方法为:

        /**
         * Acquires in shared mode, ignoring interrupts.  Implemented by
         * first invoking at least once {@link #tryAcquireShared},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquireShared} until success.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquireShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         */
        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
        /**
         * Acquires in shared mode, aborting if interrupted.  Implemented
         * by first checking interrupt status, then invoking at least once
         * {@link #tryAcquireShared}, returning on success.  Otherwise the
         * thread is queued, possibly repeatedly blocking and unblocking,
         * invoking {@link #tryAcquireShared} until success or the thread
         * is interrupted.
         * @param arg the acquire argument.
         * This value is conveyed to {@link #tryAcquireShared} but is
         * otherwise uninterpreted and can represent anything
         * you like.
         * @throws InterruptedException if the current thread is interrupted
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
        /**
         * Releases in shared mode.  Implemented by unblocking one or more
         * threads if {@link #tryReleaseShared} returns true.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryReleaseShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         * @return the value returned from {@link #tryReleaseShared}
         */
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }

    Note:

    AQS是锁和同步容器的基础框架,AQS并没有提供可用的tryAcquire和tryRelease方法。

    tryAcquire和 tryRelease需要由具体的子类来实现。

    子类在实现 tryAcquire和 tryRelease时要根据具体场景使用 CAS算法尝试修改 state状态值, 成功则返回 true,否则返回 false。

    子类还需要定义在调用 acquire 和 release 方法时状态值 state 的增减代表什么含义。 

    例子:

    继承自 AQS 实现的独占锁 ReentrantLock,定义当 status 为 0 时表示锁空闲,为 1 时表示锁己经被占用。

    故其在重写 tryAcquire 时,需要使用 CAS 算法查看当前 state 是否为 0,如果为 0 则使用 CAS 设置为 1,并设置当前锁的持有者为当前线程,而后返回true;如果 CAS 失败则返回 false。

    同理,tryAcquireShared 和 tryReleaseShared 也需要由具体的子类来实现。

    例子:

    继承自 AQS 实现的读写锁 ReentrantReadWriteLock,读锁在重写 tryAcquireShared 时,首先查看写锁是否被其他线程持有,如果是则直接返回 false; 否则使用 CAS 递增 state 的高 16 位。

    (在 ReentrantReadWriteLock 中, state 的高 16 位为获取读锁的次数) 

     

    基于 AQS 实现的锁除了需要重写上面介绍的方法外,还需要重写 isHeldExclusively 方法,来判断锁是被当前线程独占还是被共享。


    问题:带有 Interruptibly关键字的函数和不带该关键字的函数有什么区别? 

    不带 Intenuptibly 关键字的方法意思是不对中断进行响应。

    详解:线程在调用不带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,则该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是说不对中断进行响应,忽略中断。

    带 Interruptibly关键字的方法要对中断进行响应。

    详解:线程在调用带 Interruptibly 关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,则该线程抛出 InterruptedException 异常而返回。

  • 相关阅读:
    练习jQuery
    Highcharts的应用步骤
    CSS中的数量查询
    何时使用 Em 与 Rem
    不错的教学网站
    HTML5中新增的语义化标签,及在IE5.5~9(IE9已经开始支持部分HTML5新标签了)支持这些新标签的兼容性处理。
    【洛谷P4139】上帝与集合的正确用法
    【洛谷P1357】花园
    【洛谷P1939】矩阵加速(数列)
    【洛谷P1962】斐波那契数列
  • 原文地址:https://www.cnblogs.com/studyLog-share/p/15127560.html
Copyright © 2011-2022 走看看