zoukankan      html  css  js  c++  java
  • 温故知新-多线程-深入剖析AQS



    摘要

    本文通过ReentrantLock来窥探AbstractQueuedSynchronizer(AQS)的实现原理,在看此文之前。你需要了解一下park、unpark的功能,请移步至上一篇《深入剖析park、unpark》;

    AbstractQueuedSynchronizer实现一把锁

    根据AbstractQueuedSynchronizer的官方文档,如果想实现一把锁的,需要继承AbstractQueuedSynchronizer,并需要重写tryAcquire、tryRelease、可选择重写isHeldExclusively提供locked state、因为支持序列化,所以需要重写readObject以便反序列化时恢复原始值、newCondition提供条件;官方提供的java代码如下(官方文档见参考连接);

    public class MyLock implements Lock, java.io.Serializable {
        private static class Sync extends AbstractQueuedSynchronizer {
          
            // Acquires the lock if state is zero
            @Override
            public boolean tryAcquire(int acquires) {
                assert acquires == 1; // Otherwise unused
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            // Releases the lock by setting state to zero
            @Override
            protected boolean tryRelease(int releases) {
                assert releases == 1; // Otherwise unused
                if (getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            // Provides a Condition
            Condition newCondition() {
                return new ConditionObject();
            }
    
            // Deserializes properly
            private void readObject(ObjectInputStream s)
                    throws IOException, ClassNotFoundException {
                s.defaultReadObject();
                setState(0); // reset to unlocked state
            }
          
           // Reports whether in locked state
            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
        }
    
        /**
         * The sync object does all the hard work. We just forward to it.
         */
        private final Sync sync = new Sync();
    
        @Override
        public void lock() {
            sync.acquire(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        @Override
        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    
        @Override
        public void unlock() {
            sync.release(1);
        }
    
        @Override
        public Condition newCondition() {
            return sync.newCondition();
        }
    
    
        private static volatile Integer value = 0;
    
        public static void main(String[] args) {
    
            MyLock myLock = new MyLock();
            for (int i = 0; i < 1000; i++) {
                new Thread(()->{
                    myLock.lock();
                    value ++;
                    myLock.unlock();
                }).start();
            }
            System.out.println(value);
        }
    }
    

    上面是一个不可重入的锁,它实现了一个锁基础功能,目的是为了跟ReentrantLock的实现做对比;

    ReentrantLock

    ReentrantLock的特点

    ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。ReentrantLock跟常用的Synchronized进行比较;

    image-20200603140814563

    Synchronized的基础用法

    Synchronized的分析可以参考《深入剖析synchronized关键词》,ReentrantLock可以创建公平锁、也可以创建非公平锁,接下来看一下ReentrantLock的简单用法,非公平锁实现比较简单,今天重点是公平锁;

    public class ReentrantLockTest {
    
        public static void main(String[] args) {
            ReentrantLock reentrantLock = new ReentrantLock(true);
            reentrantLock.lock();
            try {
                log.info("lock");
            } catch (Exception e) {
                log.error(e);
            } finally {
                reentrantLock.unlock();
                log.info("unlock");
            }
        }
    }
    

    ReentrantLock与AQS的关联

    先看一下加锁方法lock

    • 非公平锁lock方法

      compareAndSetState很好理解,通过CAS加锁,如果加锁失败调用acquire;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    • 公平锁lock方法
    final void lock() {
        acquire(1);
    }
    
    • AQS框架的处理流程

    ​ 线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续,分析实现原理

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    总结:公平锁的上锁是必须判断自己是不是需要排队;而非公平锁是直接进行CAS修改计数器看能不能加锁成功;如果加锁不成功则乖乖排队(调用acquire);所以不管公平还是不公平;只要进到了AQS队列当中那么他就会排队;

    AQS架构图

    美团画的AQS的架构图,很详细,当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

    82077ccf14127a87b77cefd1ccf562d3253591

    AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

    CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

    公平和非公平的体现:!hasQueuedPredecessors()条件,意思是说当前同步队列没有前驱节点(也就是没有线程在等待)时才会去compareAndSetState(0, acquires)使用CAS修改同步状态变量。所以就实现了公平锁,根据线程发出请求的顺序获取锁。

    • 非公平锁的加锁流程
    img
    • 公平锁的加锁流程
    image-20200604172945029
    • 解锁公平锁和非公平锁逻辑一致
    image-20200604173023011

    加锁:

    • 通过ReentrantLock的加锁方法Lock进行加锁操作。
    • 会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
    • AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
    • tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。
    • 流程:Lock -> acquire -> tryAcquire( or nonfairTryAcquire)

    解锁:

    • 通过ReentrantLock的解锁方法Unlock进行解锁。
    • Unlock会调用内部类Sync的Release方法,该方法继承于AQS。
    • Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
    • 释放成功后,所有处理由AQS框架完成,与自定义同步器无关。
    • 流程:unlock -> release -> tryRelease

    acquire获取锁

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
            selfInterrupt();
        }
    }
    

    tryAcquire

    acquire方法首先会调tryAcquire方法,需要注意的是tryAcquire的结果做取反;根据前面分析,tryAcquire会调用子类的实现,ReentrantLock有两个内部类,FairSync,NonfairSync,都继承自Sync,Sync继承AbstractQueuedSynchronizer;

    实现方式差别在是否有hasQueuedPredecessors() 的判断条件

    • 公平锁实现
    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 获取lock对象的上锁状态,如果锁是自由状态则=0,如果被上锁则为1,大于1表示重入  
        int c = getState();
        if (c == 0) {
          	// hasQueuedPredecessors,判断自己是否需要排队
            // 下面我会单独介绍,如果不需要排队则进行cas尝试加锁
            // 如果加锁成功则把当前线程设置为拥有锁的线程
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
      	// 如果C不等于0,但是当前线程等于拥有锁的线程则表示这是一次重入,那么直接把状态+1表示重入次数+1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    非公平锁

    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    hasQueuedPredecessors

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    
    • Node

    先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。

    static final class Node {
        static final Node SHARED = new Node(); // 表示线程以共享的模式等待锁
        static final Node EXCLUSIVE = null; // 表示线程正在以独占的方式等待锁
        static final int CANCELLED =  1; // 表示线程获取锁的请求已经取消了
        static final int SIGNAL    = -1; // 表示线程已经准备好了,就等资源释放了
        static final int CONDITION = -2; // 表示节点在等待队列中,节点线程等待唤醒
        static final int PROPAGATE = -3; // 当前线程处在SHARED情况下,该字段才会使用
        volatile int waitStatus; // 当前节点在队列中的状态
        volatile Node prev; // 前驱指针
        volatile Node next; // 后继指针
        volatile Thread thread; // 表示处于该节点的线程
        Node nextWaiter; // 指向下一个处于CONDITION状态的节点
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        // 返回前驱节点,没有的话抛出npe
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        Node() {    // Used to establish initial head or SHARED marker
        }
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    

    再看hasQueuedPredecessors,整个方法如果最后返回false,则去加锁,如果返回true则不加锁,因为这个方法被取反操作;hasQueuedPredecessors是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回False,说明当前线程可以争取共享资源;如果返回True,说明队列中存在有效节点,当前线程必须加入到等待队列中。

    • h != t && ((s = h.next) == null || s.thread != Thread.currentThread());

    双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。

    • 当h != t时: 如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,需要返回True(这块具体见下边代码分析)。
    • 如果(s = h.next) != null,说明此时队列中至少有一个有效节点。
    • 如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;
    • 如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。

    如果这上面没有看懂,没有关系,先来分析一下构建整个队列的过程;

    • addWaiter(Node.EXCLUSIVE)
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // tail为对尾,赋值给pred
        Node pred = tail;
        // 判断pred是否为空,其实就是判断对尾是否有节点,其实只要队列被初始化了对尾肯定不为空
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    • enq
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    用一张图来分析一下,整个队列构建过程;

    image-20200604203002973

    • (1)通过Node(Thread thread, Node mode) 方法构建一个node节点(node2),此时的nextWaiter为空,线程不为空,是当前线程;

    • (2)如果队尾为空,则说明队列未建立,调用enq构建第一个虚拟节点(node1),通过compareAndSetHead方法构建一个头节点,需要注意的是该头节点thread是null,后续很多都是用线程是否为null来判读是否为第一个虚拟节点;

    • (3)将node1 cas设置为head

    • (4)将头节点赋值为tail = head

    • (5)进入下一次for循环时,会走到else分支,会将传入的node的指向头部节点的next,此时node2的prev指向node1(tail)

    • (6)将node2 cas设置为tail;

    • (7)将node2指向node1的next;

      经过上面的步骤,就构建了一个长度为2的队列;

    添加第二个队列时,走的是这段代码,流程就简单多了,代码如下

    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    

    image-20200604202859302

    再看一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());因为整个构建过程并不是原子操作,所以这个条件判断,现在再是不是就看明白了?

    • 当h != t时(3)步骤已经完成: 如果(s = h.next) == null 此时步骤(4)未完成,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,需要返回True
    • 如果(s = h.next) != null,说明此时队列中至少有一个有效节点。
    • 如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;
    • 如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。

    acquireQueued

    addWaiter方法其实就是把对应的线程以Node的数据结构形式加入到双端队列里,返回的是一个包含该线程的Node。而这个Node会作为参数,进入到acquireQueued方法中。acquireQueued方法可以对排队中的线程进行“获锁”操作。总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

    下面通过代码从“何时出队列?”和“如何出队列?”两个方向来分析一下acquireQueued源码:

    final boolean acquireQueued(final Node node, int arg) {
        // 标记是否成功拿到资源
        boolean failed = true;
        try {
            // 标记等待过程中是否中断过
            boolean interrupted = false;
            for (;;) {
                // 获取当前节点的前驱节点,有两种情况;1、上一个节点为头部;2上一个节点不为头部
                final Node p = node.predecessor();
                // 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(头结点是虚节点)
                // 因为第一次tryAcquire判断是否需要排队,如果需要排队,那么我就入队,此处再重试一次
                if (p == head && tryAcquire(arg)) {
                    // 获取锁成功,头指针移动到当前node
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
           // 成功拿到资源,准备释放
            if (failed)
                cancelAcquire(node);
        }
    }
    

    setHead

    设置当前节点为头节点,并且将node.thread为空(刚才提到判断是否为头部虚拟节点的条件就是node.thread == null。waitStatus状态并为修改,等下我们再分析;

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    

    shouldParkAfterFailedAcquire

    接下来看shouldParkAfterFailedAcquire代码,需要注意的是,每一个新创建Node的节点是被下一个排队的node设置为等待状态为SIGNAL, 这里比较难以理解为什么需要去改变上一个节点的park状态?

    每个node都有一个状态,默认为0,表示无状态,-1表示在park;当时不能自己把自己改成-1状态?因为你得确定你自己park了才是能改为-1;所以只能先park;在改状态;但是问题你自己都park了;完全释放CPU资源了,故而没有办法执行任何代码了,所以只能别人来改;故而可以看到每次都是自己的后一个节点把自己改成-1状态;

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱节点的状态 
        int ws = pred.waitStatus;
        // 说明头结点处于唤醒状态
        if (ws == Node.SIGNAL)
            return true;
        // static final int CANCELLED =  1; // 表示线程获取锁的请求已经取消了
        // static final int SIGNAL    = -1; // 表示线程已经准备好了,就等资源释放了
        // static final int CONDITION = -2; // 表示节点在等待队列中,节点线程等待唤醒
        // static final int PROPAGATE = -3; // 当前线程处在SHARED情况下,该字段才会使用
        if (ws > 0) {
            do {
                // 把取消节点从队列中剔除
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 设置前任节点等待状态为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    parkAndCheckInterrupt

    调用LockSupport.park挂起当前线程,自己已经park,无法再修改状态了!

    private final boolean parkAndCheckInterrupt() {
        // 调⽤用park()使线程进⼊入waiting状态
        LockSupport.park(this);
        // 如果被唤醒,查看⾃自⼰己是不不是被中断的,这⾥里里先清除⼀下标记位
        return Thread.interrupted(); 
    }
    

    shouldParkAfterFailedAcquire的整个流程还是比较清晰的,如果不清楚,可以参考美团画的流程图;

    cancelAcquire

    通过上面的分析,当failed为true时,也就意味着park结束,线程被唤醒了,for循环已经跳出,开始执行cancelAcquire,通过cancelAcquire方法,将Node的状态标记为CANCELLED;代码如下:

    private void cancelAcquire(Node node) {
        // 将无效节点过滤
        if (node == null)
            return;
        // 设置该节点不关联任何线程,也就是虚节点(上面已经提到,node.thread = null是判读是否是头节点的条件)
        node.thread = null;
        Node pred = node.prev;
        // 通过前驱节点,处理waitStatus > 0的node
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        // 把当前node的状态设置为CANCELLED,当下一个node排队结束时,自己就会被上一行代码处理掉;
        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED;
        // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点,更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
        if (node == tail && compareAndSetTail(node, pred)) {
            // 把自己设置为null
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            // 如果当前节点不是head的后继节点
            // 1:判断当前节点前驱节点的是否为SIGNAL
            // 2:如果不是,则把前驱节点设置为SINGAL看是否成功
            // 如果1和2中有一个为true,再判断当前节点的线程是否为null
            // 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点 
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
    

    当前的流程:

    • 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。

    • 根据当前节点的位置,考虑以下三种情况:

      (1) 当前节点是尾节点。

      (2) 当前节点是Head的后继节点。

      (3) 当前节点不是Head的后继节点,也不是尾节点。

    (1)当前节点时尾节点

    image-20200607180254816

    (2)当前节点是Head的后继节点。

    这张图描述的是这段代码:unparkSuccessor

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    
    image-20200607180140169

    (3)当前节点不是Head的后继节点,也不是尾节点。

    这张图描述的是这段代码跟(2)一样;

    image-20200607180034112

    通过上面的图,你会发现所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作,原因是执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),也就是下图中代码1和代码2直接的间隙就会出现这种情况,此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。

    image-20200607180655529

    unlock解锁

    解锁时并不区分公平和不公平,因为ReentrantLock实现了锁的可重入,可以进一步的看一下时如何处理的,上代码:

    public void unlock() {
        sync.release(1);
    }
    

    release

    public final boolean release(int arg) {
        // 自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
        if (tryRelease(arg)) {
            // 获取头结点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    这里的判断条件为什么是h != null && h.waitStatus != 0

    1. h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。如果还没来得及入队,就会出现head == null 的情况。
    2. h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
    3. h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒,(还记得一个node是在shouldParkAfterFailedAcquire方法中被设置为SIGNAL = -1的吧?不记得翻看一下上面吧)

    tryRelease

    protected final boolean tryRelease(int releases) {
        // 减少可重入次数,setState(c);
        int c = getState() - releases;
        // 当前线程不是持有锁的线程,抛出异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    unparkSuccessor

    这个方法在cancelAcquire其实也用到了,简单分析一下

    // 如果当前节点是head的后继节点,或者上述条件不满足,就唤醒当前节点的后继节点unparkSuccessor(node);

    private void unparkSuccessor(Node node) {
        // 获取结点waitStatus,CAS设置状态state=0
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 获取当前节点的下一个节点
        Node s = node.next;
        // 如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 就从尾部节点开始找,到队首,找到队列第一个waitStatus<0的节点。
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 如果当前节点的下个节点不为空,而且状态<=0,就把当前节点unpark
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    为什么要从后往前找第一个非Cancelled的节点呢?

    原因1:addWaiter方法并非原子,构建链表结构时如下图中 1、2间隙执行unparkSuccessor,此时链表是不完整的,没办法从前往后找了;

    image-20200607184934087

    原因2:还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node;

    中断恢复

    唤醒后,会执行return Thread.interrupted();,这个函数返回的是当前执行线程的中断状态,并清除。

    private final boolean parkAndCheckInterrupt() {
    	LockSupport.park(this);
    	return Thread.interrupted();
    }
    

    acquireQueued代码,当parkAndCheckInterrupt返回True或者False的时候,interrupted的值不同,但都会执行下次循环。如果这个时候获取锁成功,就会把当前interrupted返回。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    final boolean acquireQueued(final Node node, int arg) {
    	boolean failed = true;
    	try {
    		boolean interrupted = false;
    		for (;;) {
    			final Node p = node.predecessor();
    			if (p == head && tryAcquire(arg)) {
    				setHead(node);
    				p.next = null; // help GC
    				failed = false;
    				return interrupted;
    			}
    			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    				interrupted = true;
    			}
    	} finally {
    		if (failed)
    			cancelAcquire(node);
    	}
    }
    

    如果acquireQueued为True,就会执行selfInterrupt方法。

    该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,感兴趣同学可以查阅一下。这里简单介绍一下:

    1. 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次。
    2. 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。

    这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理,可以看下ThreadPoolExecutor源码的判断条件;

    image-20200607210022068

    其它

    AQS在JUC中有⽐比较⼴广泛的使⽤用,以下是主要使⽤用的地⽅方:

    • ReentrantLock:使⽤用AQS保存锁重复持有的次数。当⼀一个线程获取锁时, ReentrantLock记录当
      前获得锁的线程标识,⽤用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理理。
    • Semaphore:使⽤用AQS同步状态来保存信号量量的当前计数。 tryRelease会增加计数,
      acquireShared会减少计数。
    • CountDownLatch:使⽤用AQS同步状态来表示计数。计数为0时,所有的Acquire操作
      (CountDownLatch的await⽅方法)才可以通过。
    • ReentrantReadWriteLock:使⽤用AQS同步状态中的16位保存写锁持有的次数,剩下的16位⽤用于保
      存读锁的持有次数。
    • ThreadPoolExecutor: Worker利利⽤用AQS同步状态实现对独占线程变量量的设置(tryAcquire和
      tryRelease)。

    至此,通过ReentrantLock分析AQS的实现原理一家完毕,需要说明的是,此文深度参考了美团分析的ReentrantLock,是参考链接的第三个,有兴趣可以对比差异,感谢!

    参考

    JDK API 文档

    Java的LockSupport.park()实现分析

    [从ReentrantLock的实现看AQS的原理及应用

    [Thread的中断机制(interrupt)


    你的鼓励也是我创作的动力

    打赏地址

  • 相关阅读:
    MS SQL SERVER导出表结构到Excel
    Ajax.ActionLink用法
    Layer弹出层关闭后刷新父页面
    Ajax.BeginForm提示不支持live属性或方法的错误
    BootStrap带样式打印
    利用JQuery jsonp实现Ajax跨域请求 .Net 的*.handler 和 WebService,返回json数据
    Bootstrap打印问题
    EF的表左连接方法Include和Join
    vs code的local history插件
    idea debug的drop frame,set watch和设置过滤条件
  • 原文地址:https://www.cnblogs.com/yangsanchao/p/13063042.html
Copyright © 2011-2022 走看看