zoukankan      html  css  js  c++  java
  • Java AbstractQueuedSynchronizer(AQS)

    AbstractQueuedSynchronizer 为 java.util.concurrent.locks 包下的一个抽象类,简称 AQS抽象队列同步器)。

    并发包(JUC)中的 ReentrantLock、Semaphore、ReentrantReadWriteLock、SynchronousQueue、FutureTask 等,底层都是基于 AQS 来实现的。

    一、使用

    1.AQS 采用了模板模式

    自定义同步器时需要重写下面几个方法。

    // 该线程是否正在独占资源(是否在独占模式下被线程占用)。只有用到 condition 才需要去实现它。
    boolean isHeldExclusively();
    
    // 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
    boolean tryAcquire(int arg);
    
    // 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
    // 成功后,等待中的其他线程此时将有机会获取到资源。
    boolean tryRelease(int arg);
    
    // 共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    int tryAcquireShared(int arg);
    
    // 共享方式。尝试释放资源,成功则返回 true,失败则返回 false。
    boolean tryReleaseShared(int);

    默认每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。

    AQS 类中的其它方法都是 final 修饰,无法被其它类使用。

    2.AQS 对资源的共享方式

    一般自定义同步器,要么是独占(tryAcquire-tryRelease)方式,要么是共享(tryAcquireShared-tryReleaseShared)方式。(AQS 也支持同时实现,如 ReentrantReadWriteLock)

    • Exclusive(独占):只有一个线程能执行,如 ReentrantLock。独占又可分为公平锁和非公平锁:

      • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁

      • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的

    • Share(共享):多个线程可同时执行,如 Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock。

    3.自定义同步器(独占式)

    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    
    public class MyMutex {
        private final Sync sync = new Sync();
    
        // 当前状态为 0 时获取锁,然后进行 CAS 设置同步状态
        // 未获取到当前线程则会进入同步队列等待
        public void lock() {
            sync.acquire(1);
        }
    
        // 释放锁,将状态设置为 0
        public void unlock() {
            sync.release(1);
        }
    
        // 是否处于被当前线程占有状态
        public boolean isLocked() {
            return sync.isHeldExclusively();
        }
    
        private static final class Sync extends AbstractQueuedSynchronizer {
            @Override
            protected boolean tryAcquire(int arg) {
                // 首先判断状态是否=0,如果状态=0,就将 status 设置为 1
                if (compareAndSetState(0, 1)) {
                    // 将当前线程赋值给独占模式的 onwer
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            @Override
            protected boolean tryRelease(int arg) {
                // 判断当前是获得资源的线程
                if (Thread.currentThread() != getExclusiveOwnerThread() || getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                // 没有线程拥有这个锁
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            @Override
            protected boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread() && getState() == 1;
            }
        }
    }
    View Code

    测试,用 30 个线程,每个线程对 i 自加 10000 次,同步正常的话,最终结果应为 300000

    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class Demo {
        private static volatile int count;
        private static MyMutex mutex = new MyMutex();
        private static CyclicBarrier barrier = new CyclicBarrier(31);
        private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(30, 30, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(30));
    
        public static void main(String[] args) throws Exception {
            // 未加锁
            for (int i = 0; i < 30; i++) {
                // 向线程池提交任务
                threadPool.execute(() -> {
                    try {
                        for (int j = 0; j < 10000; j++) {
                            count++;
                        }
                        barrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
            barrier.await();
            System.out.println("未加锁,count=" + count);
    
            // 重置
            barrier.reset();
            count = 0;
    
            // 加锁
            for (int i = 0; i < 30; i++) {
                // 向线程池提交任务
                threadPool.execute(() -> {
                    try {
                        for (int j = 0; j < 10000; j++) {
                            mutex.lock();
                            count++;
                            mutex.unlock();
                        }
                        barrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
            barrier.await();
            System.out.println("加锁后,count=" + count);
    
            threadPool.shutdown();
        }
    }
    View Code

    二、源码

    AQS 内部实现主要是状态变量 state 和一个 FIFO(先进先出)队列来完成(这个内置的同步队列称为 CLH 队列,其实就是个双端双向链表)。

    1.状态变量 state

    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
        // The current owner of exclusive mode synchronization
        private transient Thread exclusiveOwnerThread;
    
    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        // 等待队列头部节点
        private transient volatile Node head;
        // 等待队列尾部节点
        private transient volatile Node tail;
        // 状态,大于等于 1 阻塞线程。类似当前窗口已有人在办理业务,排队的其它人需要等待
        private volatile int state;
    
        // CAS 方式更新
        protected final boolean compareAndSetState(int expect, int update) {
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }

    2.CLH 队列,由 AQS 中的一个静态内部类 Node 实现

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        static final class Node {
            static final Node SHARED = new Node(); // 标记:表示节点正在 共享 模式中等待
            static final Node EXCLUSIVE = null; // 标记:表示节点正在 独占 模式下等待
    
            static final int CANCELLED = 1; // 线程被取消
            static final int SIGNAL = -1; // 后继线程需要唤醒
            static final int CONDITION = -2; // 等待 Condition 唤醒
            static final int PROPAGATE = -3; // 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点
            volatile int waitStatus; // 当前节点在队列中的状态,初始为 0,状态是上面的几种
    
            volatile Node prev; // 当前结点的前驱节点,当节点加入同步队列时被设置(尾部添加)
            volatile Node next; // 当前结点的后继节点
            volatile Thread thread; // 与当前结点关联的线程
    
            /**
             * 既可以作为同步队列节点使用,也可以作为 Condition 的等待队列节点使用。
             * 在作为同步队列节点使用时,nextWaiter 可能有两个值:EXCLUSIVE、SHARED 标识当前节点是独占模式还是共享模式。
             * 在作为 Condition 的等待队列节点使用时,nextWaiter 保存后继节点。
             */
            Node nextWaiter;
    
            final boolean isShared() { // 是否在共享模式下等待
                return nextWaiter == SHARED;
            }
    
            final Node predecessor() throws NullPointerException { // 返回前驱节点
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() { // 用于建立初始队列头或 共享 标记
            }

    图示

    两者关系

    3.使用 ReentrantLock 类来看 AQS 的运行流程

    以为公平锁为例

    ReentrantLock lock = new ReentrantLock();
    
    new Thread(() -> {
        try {
            Thread.sleep(1234);
            lock.lock();
            System.out.println("AAA");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }, "A").start();
    
    new Thread(() -> {
        try {
            Thread.sleep(1234);
            lock.lock();
            System.out.println("BBB");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }, "B").start();
    
    new Thread(() -> {
        try {
            Thread.sleep(1234);
            lock.lock();
            System.out.println("CCC");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }, "C").start();
    View Code

    第一个线程获取锁(可以获取到锁)

    public class ReentrantLock implements Lock, java.io.Serializable {
        public void lock() {
            sync.lock();
        }
    
        abstract static class Sync extends AbstractQueuedSynchronizer {
            abstract void lock();
        }
    
        static final class NonfairSync extends Sync {
            final void lock() {
                if (compareAndSetState(0, 1))
                    // 会进入这里,初始状态变量默认为 0
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
        }
    
    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }

    这时重点看第二个线程获取锁(第一个线程没有释放锁,第二个线程获取不到锁),这里比较繁琐,分步来说

    public class ReentrantLock implements Lock, java.io.Serializable {
        public void lock() {
            sync.lock();
        }
    
        abstract static class Sync extends AbstractQueuedSynchronizer {
            abstract void lock();
        }
    
        static final class NonfairSync extends Sync {
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    // 会进入这里,状态变量已被第一个线程修改
                    acquire(1);
            }
        }

    继续看 acquire(1),其中会再次尝试获取锁

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        public final void acquire(int arg) {
            // 先看 tryAcquire,尝试获取锁
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    public class ReentrantLock implements Lock, java.io.Serializable { static final class NonfairSync extends Sync { protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 第一个线程已经释放锁 if (compareAndSetState(0, acquires)) { // 第二个线程获取到锁 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 当前线程是否等于已获取到锁的线程 // 已获取到锁的线程又获取到锁,可重入锁 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); // 再次修改状态变量 return true; } return false; // 再次尝试获取锁失败 } }

    继续看尝试获取锁失败的情况,这时就会形成队列

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        public final void acquire(int arg) {
            // 尝试获取锁失败,tryAcquire 返回 false,取反为 true,会继续执行 addWaiter
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
        private Node addWaiter(Node mode) {
            // 将当前线程包装为 Node 节点,以 EXCLUSIVE(独占) 模式
            Node node = new Node(Thread.currentThread(), mode);
            Node pred = tail;
            if (pred != null) { // 看队列尾节点是否不为 null,这时队列都还没又,自然为 null
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node); // 建立队列
            return node; // 返回当前线程包装而成的节点
        }
    
        private Node enq(final Node node) {
            for (; ; ) { // 自旋
                Node t = tail;
                if (t == null) { // 尾节点是否为 null,当然是
                    if (compareAndSetHead(new Node())) // 注意这里 new 了一个空 Node 为队列的头节点(哨兵节点)
                        tail = head; // 让尾节点等于头节点,这时已形成了队列(只有一个节点,还是空 Node)
                } else { // 自旋第二次进入这里,把当前节点插入队列
                    node.prev = t; // 当前节点上一个指向尾节点
                    if (compareAndSetTail(t, node)) { // 尾节点没有被改变,就把当前节点设置为尾节点
                        t.next = node; // 尾节点的写一个指向当前节点,这时就已经成功将当前节点插入到队列末尾
                        return t; // 返回之前的尾节点,结束自旋
                    }
                }
            }
        }

    这时的队列情况

    继续看加入队列之后的操作,可以猜想到肯定会阻塞线程

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        public final void acquire(int arg) {
            // addWaiter(Node.EXCLUSIVE) 返回当前线程包装而成的节点,继续执行 acquireQueued
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true; // 标记是否成功拿到锁
            try {
                boolean interrupted = false; // 标记等待过程中是否被中断过
                for (; ; ) {
                    // 获取当前 Node 的前驱 Node,为 null 会抛异常(这时队列里有两个 Node,这里获取到的显然是头节点)
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 不等于头节点或获取锁失败,这里是头节点,所以是获取锁失败
                    // 如果自己可以休息了,就通过 park() 进入 waiting 状态,直到被 unpark()。
                    // 如果在不可中断的情况下被中断了,那么会从 park() 中醒过来,发现拿不到资源,从而继续进入 park() 等待
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true; // 如果等待过程中被中断过,哪怕只有一次,就将 interrupted 标记为 true
                }
            } finally {
                if (failed)
                    // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待
                    cancelAcquire(node);
            }
        }

    //
    static final int CANCELLED = 1; // 线程被取消 // static final int SIGNAL = -1; // 后继线程需要唤醒 // static final int CONDITION = -2; // 等待 Condition 唤醒 // static final int PROPAGATE = -3; // 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驱节点状态,头节点创建时默认为 0 if (ws == Node.SIGNAL) return true; // 第二此会走这里,表示后继节点可以被阻塞了 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 第一次会进入这里,将头节点的值设为 -1 } return false; } private final boolean parkAndCheckInterrupt() { // shouldParkAfterFailedAcquire 把前驱节点状态设置好后,开始阻塞当前节点 LockSupport.park(this); return Thread.interrupted(); // 返回在被阻塞的这段时间内线程是否被中断过 }

    https://www.cnblogs.com/jhxxb/p/10864125.html

    https://www.cnblogs.com/jhxxb/p/11527735.html

    这时的队列状态,头节点已标记后继结点需要唤醒,后继节点已被阻塞

    继续再回到第一个线程执行完毕,这时需要唤醒后继节点,并进行出队操作

    public class ReentrantLock implements Lock, java.io.Serializable {
        public void unlock() {
            sync.release(1);
        }
    
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases; // 当前状态变量减一
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { // 若同一线程多次获取到锁,这里就不等于 0
                free = true;
                setExclusiveOwnerThread(null); // 置空
            }
            setState(c); // 设置状态变量
            return free;
        }
    
    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        public final boolean release(int arg) {
            if (tryRelease(arg)) { // 当前状态变量已为 0
                Node h = head;
                if (h != null && h.waitStatus != 0) // 在后继节点阻塞时已经设置头节点状态了
                    unparkSuccessor(h); // 唤醒 h 的后继节点
                return true;
            }
            return false;
        }
    
        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0); // 设置头节点状态为 0,因为要唤醒后续节点了
    
            Node s = node.next;
            if (s == null || s.waitStatus > 0) { // s 若有后继节点,s.waitStatus 会为 -1
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread); // 唤醒后继节点
        }

    已执行完的头节点进行出队操作

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
        public final void acquire(int arg) {
            // addWaiter(Node.EXCLUSIVE) 返回当前线程包装而成的节点,继续执行 acquireQueued
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true; // 标记是否成功拿到锁
            try {
                boolean interrupted = false; // 标记等待过程中是否被中断过
                for (; ; ) {
                    // 获取当前 Node 的前驱 Node,为 null 会抛异常(这时队列里有两个 Node,这里获取到的显然是头节点)
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) { // 等于头节点,再次尝试获取锁
                        // 获取锁成功
                        setHead(node); // 头节点显然已执行完毕,需要移除,然后将当前节点设置为头节点
                        p.next = null; // 让已执行完的头节点不可达,help GC
                        failed = false; // 成功获取资源
                        return interrupted; // 返回等待过程中是否被中断过
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待
                    cancelAcquire(node);
            }
        }
    
        private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }

    队列状态

    到这里,正常情况下的获取锁与释放锁的流程就进行完了


    https://mp.weixin.qq.com/s/PAn5oTlvVmjMepmCRdBnkQ

    https://mp.weixin.qq.com/s/uOIuvC6ZjpdQyEtrHK6Ocw

    https://www.cnblogs.com/waterystone/p/4920797.html

    https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html

    https://blog.csdn.net/pange1991/article/details/80930394

    https://coderbee.net/index.php/concurrent/20131115/577/comment-page-1

    https://segmentfault.com/a/1190000015562787

    https://www.javadoop.com/post/AbstractQueuedSynchronizer

  • 相关阅读:
    AES块加密与解密
    流加密的密文解密
    Linux命令——压缩和解压缩
    Linux之Vim学习
    Linux命令——磁盘管理
    Linux命令——用户和用户组管理
    Linux命令——文件和目录管理
    C++的技术探究
    Debian系统下实现通过wpa_config连接WPA-PSK加密的Wifi连接
    如何在openWRT系统上实现双版本
  • 原文地址:https://www.cnblogs.com/jhxxb/p/11340509.html
Copyright © 2011-2022 走看看