zoukankan      html  css  js  c++  java
  • Java 显示锁 之 队列同步器AQS(六)

    1、简述

    锁时用来控制多个线程访问共享资源的方式,一般情况下,一个锁能够防止多个线程同时访问共享资源。但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁。

    在Java 5.0之前,在协调对共享对象的访问时可以使用的机制只有 synchronized 和 volatile。Java 5.0 增加了一种的新的机制:ReentrantLock。ReentrantLock并不是一种替代内置加锁的方法,而是当内置锁机制不适用时,作为一种可选择的高级功能。相比内置锁,它更具有一定的灵活性,本文将一步步学习显式锁的原理和使用。

    2、为什么新增了与内置锁相似的新的加锁机制

    在大多情况下,内置锁都能很好的工作,但在功能上存在一些局限性,例如:无法中断一个正在等待获取锁的线程,或者无法再请求获取一个锁时无限地等待下去。内置锁必须在获取锁的代码块中释放,这虽然简化了编码工作,并且与异常操作实现了很好的交互,但是无法实现非阻塞结构的加锁规则。一种更灵活的加锁机制通常能提供更好的活跃性和性能。

    3、Lock 接口

    Lock接口定义了一组抽象的加锁操作。与内置锁不同的是,Lock提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作,所有加锁和解锁的方法都是显式的。

    public interface Lock {
        /**
         * 获得锁
         */
        void lock();
    
        /**
         * 正在获取锁的线程可被中断
         */
        void lockInterruptibly() throws InterruptedException;
    
        /**
         * 尝试非阻塞获取锁,调用该方法立即返回结果,如果能获得则返回true,否则返回false
         */
        boolean tryLock();
    
        /**
         * 在指定时间内一直尝试非阻塞获取锁,会在三种情况下返回结果:
         * 1、当前线程在指定时间内获取了锁
         * 2、当前线程在指定时间内未获取锁的情况下被中断
         * 3、在指定时间内未获取锁,返回false
         */
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
        /**
         * 释放锁
         */
        void unlock();
    
        /**
         * 生成一个新的Condition实例,与当前锁时绑定的
         */
        Condition newCondition();
    }
    

    Lock接口的标准使用形式如下。这种形式比内置锁复杂一些,必须在finally中释放锁。否则如果代码中抛出异常,那么这个锁永远都无法释放。当使用加锁时,还必须考虑在try块中抛出异常的情况,如果可能使对象处于某种不一致的状态,那么就需要更多的try-catch 或 try-finally 代码块。

    如果没有使用finally来释放Lock锁,就相当于埋下了一颗定时炸弹。当定时炸弹爆炸时,将很难追踪到发生错误的位置,因为没有记录应该释放锁的位置和时间。这就是为什么显示锁Lock不能完全替代synchronized的原因,它更加危险,因为当程序的执行控制离开被保护的代码块时,不会自动清除锁。虽然在finally块中释放锁很简单,但也有可能会忘记。

    Lock lock = new ReentrantLock();
            lock.lock();
            try {
                //操作代码
            } finally {
                lock.unlock();
            }
    

    4、队列同步器(AQS)

    队列同步器 AbstractQueuedSynchronizer (以下简称同步器【AQS】),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

    同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法( getState()、 setState( int newState) 和 compareAndSetState( int expect, int update))来进行操作,因为他们能够保证状态的改变是安全的。

    同步器(AQS)是实现锁或同步组件的关键,在锁的实现中聚合了同步器,利用同步器实现锁的语义。二者之间的关系,可以这样理解:

    锁 是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节(比如:允许两个线程并行访问)。

    同步器 面向的是锁或同步组件的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程排队、等待与唤醒等的底层操作。

    锁和同步器很好的隔离了使用者和实现者所需要关注的领域

    4.1、队列同步器接口与实例

    队列同步器的设计是基于模板方法模式的,使用者需要继承同步器并重新指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。

    重写同步器方法时,需要使用同步器提供的3个方法来访问或修改同步状态:

        /**
         * 获取当前同步状态
         */
        protected final int getState() {
            return state;
        }
    
        /**
         * 设置当前同步状态
         */
        protected final void setState(int newState) {
            state = newState;
        }
    
        /**
         * 使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。
         *
         * @param expect 期望中的值
         * @param update 要设置的新值
         */
        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    

    同步器可重写的方法,包含两种,一种是获取独占锁,一种是获取共享锁,这两类方法如下:

        /**
         * 独占式获取同步状态,实现该方法需要查询当前状态,并判断同步状态是否符合预期,然后通过CAS设置同步状态
         */
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * 独占式释放同步状态,此时等待获取同步状态的线程将有机会获取同步状态
         */
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * 共享式获取同步状态,返回结果大于等于0,表示获取成功,否则获取失败
         */
        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * 共享式释放同步状态
         */
        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
        /**
         * 是否被当前线程所独占
         */
        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }
    
    

    实现自定义的同步器组件时,将会调用同步器提供的模板方法,同步器提供的模板方法分为三类:一是独占式获取与释放同步状态、二是共享式获取与释放同步状态、三是查询同步队列中等待线程的情况。自定义的同步器将使用这些提供的模板方法来实现自己的同步语义。同步器提供的模板方法如下:

        /**
         * 独占式获取同步状态
         * 如果当前线程获取同步状态成功,则该方法执行完毕,否则,当前线程将会进入
         * 同步队列等待,该方法将会调用重写的tryAcquire方法
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
        /**
         * 独占式获取同步状态
         * 与acquire 相同,但是该方法响应中断,当前线程未获取到同步状态则进入同步队列;如果当前线程被中断,则该方法会抛出异常并返回
         */
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    
        /**
         * 独占式获取同步状态
         * 该方法在 acquireInterruptibly(int arg) 基础上增加了超时限制,如果当前线程在超时时间内没有获取同步状态,那么将返回false,否则返回true
         */
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
        }
    
        /**
         * 独占式释放同步状态
         * 该方法会在释放同步状态后,将同步队列中第一个节点中的线程唤醒
         */
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        /**
         * 共享式获取同步状态
         * 与独占式的区别在于,同一时刻可以有多个线程获得同步状态
         */
        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
        /**
         * 共享式获取同步状态
         * 与方法acquireShared(int arg) 一样,只是该方法响应中断
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
        /**
         * 共享式获取同步状态
         * 在方法 acquireSharedInterruptibly(int arg) 基础上,增加了超时限制
         */
        public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
        }
    
        /**
         * 共享式释放同步状态
         */
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
        /**
         * 获取在同步队列上等待的线程的集合
         */
        public final Collection<Thread> getQueuedThreads() {
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node p = tail; p != null; p = p.prev) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
            return list;
        }
    
    

    为了更加深入的理解同步器的实现原理,我们下面通过一个独占锁的示例来感受一下。显而易见,独占锁是在同一时刻只能有一个线程获得锁,而其他获取锁的线程只能在同步队列中等待,只有持有锁的线程释放了,其他线程才能获得锁。

    /**
     * 自定义同步器,实现独占锁
     *
     * @author kaifeng
     * @date 2018/9/16
     */
    public class ExclusiveLockDemo implements Lock {
    
        /**
         * 静态内部类自定义同步器
         */
        private static class Sync extends AbstractQueuedSynchronizer {
            /**
             * 是否处于独占状态
             */
            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
    
            /**
             * 独占式获取锁
             */
            @Override
            public boolean tryAcquire(int acquires) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            /**
             * 独占式释放锁,将状态置为0
             */
            @Override
            protected boolean tryRelease(int releases) {
                if (getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            /**
             * 实例化一个 Condition,每个 Condition 都包含一个队列
             */
            Condition newCondition() {
                return new ConditionObject();
            }
        }
    
        private static final Sync sync = new Sync();
    
        @Override
        public void lock() {
            sync.acquire(1);
        }
    
        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        @Override
        public void unlock() {
            sync.release(1);
        }
    
        @Override
        public Condition newCondition() {
            return sync.newCondition();
        }
    
        public boolean isLocked() {
            return sync.isHeldExclusively();
        }
    
        public boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    }
    

    ExclusiveLockDemo 中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。

    tryAcquire(int acquires) 方法中,通过CAS 设置同步状态为 1,代表获取了同步状态

    tryRelease(int releases) 方法中,将同步状态重置为0,代表释放了同步状态

    在使用自定义同步器ExclusiveLockDemo 时,不会直接与内部同步器交互,二是通过ExclusiveLockDemo 提供的方法。

    4.2、队列同步器实现原理解析

    接下来我们将从实现的角度分析同步器是如何完成线程同步的,主要说明以下几个方面:
    同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放

    4.2.1、同步队列

    同步器依靠内部的同步队列(FIFO 双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程及等待状态等信息构造成一个节点(Node)对象并将其加入同步队列中,同时会阻塞当前线程,当同步状态释放时,会把头节点中的线程唤醒,让其再次尝试获取同步状态。

    下面是Node 对象中几个重要的属性

    static final class Node {
           
            /**
             * 等待状态,包含以下几种:
             * 1. CANCELLED 值为1,在同步队列中等待的线程超时或被中断,需要充同步队列中取消继续等待
             *
             * 2. SIGNAL 值为-1,后继节点的线程处于等待状态,当前节点的线程释放或取消了同步状态,将会通知后继节点,使后继节点线程得以运行
             *
             * 3. CONDITION 值为-2,节点在等待队列中,节点线程等待在Condition上,当其它线程对Condition调用 signal() 方法后,该节点将会从等待队列转移待同步队列中,加入到同步状态获取的竞争中
             *
             * 4. PROPAGATE 值为-3,表示下一次共享式同步状态获取将会无条件的被传下去
             * 
             * 5. 0 初始状态
             */
            volatile int waitStatus;
    
            /**
             * 前驱节点,当节点加入同步队列时被设置
             */
            volatile Node prev;
    
            /**
             * 后继节点
             */
            volatile Node next;
    
            /**
             * 获取同步状态的线程
             */
            volatile Thread thread;
    
            /**
             * 等待队列中的后继节点
             */
            Node nextWaiter;
    
        }
    

    节点对象Node 是构成同步队列的基础,同步队列拥有头节点(head)、尾节点(tail),获取同步状态失败的线程被加入同步队列,成为尾节点。同步队列的基本结构如下:

    在这里插入图片描述

    同步器将节点加入到同步队列的过程如下图:

    IT码客

    同步 队列 遵循 FIFO, 首 节点 是 获取 同步 状态 成功 的 节点, 首 节点 的 线程 在 释放 同步 状态 时, 将会 唤醒 后继 节点, 而后 继 节点 将会 在 获取 同步 状态 成功 时 将 自己 设置

    同步队列遵循FIFO,头节点是成功获取同步状态的节点,头节点的线程释放同步状态时,将会唤醒后继节点,而后继节点成功获取同步状态时将自己设置为头节点,其过程如下图:

    在这里插入图片描述

    4.2.2、独占式同步状态获取与释放

    aquire(int arg) 方法,可以获取同步状态,但是该方法不支持线程的中断,当线程获取同步状态失败,被加入同步队列后,该线程被中断,此时该线程是不会从队列中移除的。
    同步器的acquire(int arg) 方法如下:

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    该方法中涉及了四个方法的调用:

    1、tryAcquire(arg)

    首先调用自定义同步器实现的 tryAcquire( int arg) 方法,该方法保证线程安全的获取同步状态
    

    2、addWaiter(Node mode)

    如果同步状态获取失败,则构造同步节点,将当前请求锁的 Thread 构造成一个Node ,并通过 addWaiter( Node node) 方法将该节点添加到同步队列的尾部  
    

    3、acquireQueued(final Node node, int arg)

    接着调用 acquireQueued( Node node, int arg) 方法,使该节点以循环的方法获取同步状态
    

    4、selfInterrupt

    该方法由AQS实现, 用于中断当前线程
    

    注:除了获取锁的逻辑 tryAcquire(arg)由子类实现外, 其余均由AQS实现.

    4.2.2.1 节点构造及加入同步队列方法(addWaiter)

    如果执行到此方法, 说明获取锁已经失败了, 既然获取锁已经失败了, 就要将当前线程加到等待的队列中去, 因为是FIFO队列, 所以是直接加在队尾

        private Node addWaiter(Node mode) {
            //将当前线程构造成Node节点
            Node node = new Node(Thread.currentThread(), mode);
            // 如果队列不为空, 则用CAS方法将当前节点设为尾节点
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //将节点加入队列,执行到此处有两种情况:
            //1、队列为空  2、CAS失败
            enq(node);
            return node;
        }
        
    

    将节点插入队列,这里是个死循环, 即以自旋方式将节点插入队列, 失败后不停的尝试, 直到成功为止, 另外该方法也负责在队列为空时, 初始化队列

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                // 如果是空队列, 首先进行初始化
                // 这里队列不是在构造的时候初始化的, 而是延迟到需要用的时候再初始化, 以提升性能
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                // 到这里说明队列已经不是空的了, 这个时候再继续尝试将节点加到队尾
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    
    4.2.2.2 同步器的 acquireQueued 方法

    在 acquireQueued( final Node node, int arg) 方法中,当前线程在死循环中尝试获取同步状态,只有前驱节点是头节点才能尝试获取同步状态,这是为什么呢?

    第一,头节点是成功获取同步状态的节点,而头结点的线程释放了同步状态后,将会唤醒后继节点,后继节点的线程被唤醒后,需要坚持自己的前驱节点是否是头节点

    第二,FIFO队列是延时初始化的, 这种条件下当前HEAD节点可能并没有持有锁

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    // 在当前节点的前驱就是HEAD节点时, 再次尝试获取锁
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    //在获取锁失败后, 判断是否需要把当前线程挂起
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    

    节点自旋获取同步状态,如图:

    在这里插入图片描述

    4.2.2.3 独占式(acquire)同步状态获取流程

    在这里插入图片描述

    4.2.2.3 独占式(release)同步状态释放

    当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使后续接待能够继续获取同步状态。通过调用同步器的 release(int arg) 方法可以释放同步状态,该方法释放同步状态之后会唤醒其后继节点。

    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor( Node node) 方法 使用 LockSupport 来唤醒等待状态的线程。

    独占式获取及释放同步状态总结:

    在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程会被计入到队列中并在队列中自旋。停止自旋的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease( int arg) 方法释放同步状态,然后唤醒头结点的后继节点。

    4.2.3、共享式同步状态获取与释放

    共享式获取与独占式获取的区别在于同一时刻能否有多个线程同时获取到同步状态

    在这里插入图片描述

    4.2.3.1 共享式同步状态获取(acquireShared)

    通过调用同步器的acquireShared( int arg) 方法可以共享式的获取同步状态

    public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
        
         private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    //如果当前节点的前驱节点为头节点,尝试获取同步状态
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        //如果结果大于等于0,表示获取同步状态成功,从自旋过程中退出
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    同步器调用 tryAcquireShared( int arg) 方法,尝试获取同步状态,该方法的返回值为int 型,当返回值大于等于0时,表示能够获取同步状态。在共享式获取的自旋过程中,成功退出自旋的条件就是tryAcquireShared( int arg)方法的返回值大于等于0

    4.2.3.2 共享式同步状态释放(releaseShared)

    与独占式获取同步状态一样,共享式获取同步状态也需要释放,通过调用 releaseShared( int arg) 方法来释放共享式同步状态

    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
        
        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    

    该方法释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(Semaphore),它和独占式的主要却别在于 tryReleaseShared( int arg) 方法必须确保同步状态线程安全释放,一般通过循环和 CAS来保证,因为释放同步状态的操作会同时涉及多个线程。

  • 相关阅读:
    Druid 使用 Kafka 将数据载入到 Kafka
    Druid 使用 Kafka 数据加载教程——下载和启动 Kafka
    Druid 集群方式部署 —— 启动服务
    Druid 集群方式部署 —— 端口调整
    Druid 集群方式部署 —— 配置调整
    Druid 集群方式部署 —— 配置 Zookeeper 连接
    Druid 集群方式部署 —— 元数据和深度存储
    Druid 集群方式部署 —— 从独立服务器部署上合并到集群的硬件配置
    Druid 集群方式部署 —— 选择硬件
    Druid 独立服务器方式部署文档
  • 原文地址:https://www.cnblogs.com/liukaifeng/p/10052596.html
Copyright © 2011-2022 走看看