zoukankan      html  css  js  c++  java
  • Java显式锁学习总结之三:AbstractQueuedSynchronizer的实现原理

    概述

    上一篇我们讲了AQS的使用,这一篇讲AQS的内部实现原理。

    我们前面介绍了,AQS使用一个int变量state表示同步状态,使用一个隐式的FIFO同步队列(隐式队列就是并没有声明这样一个队列,只是通过每个节点记录它的上个节点和下个节点来从逻辑上产生一个队列)来完成阻塞线程的排队。

    这里FIFO队列的节点在AQS中被定义为一个内部类Node,Node的主要字段有:

    1. waitStatus:等待状态,所有的状态见下面的表格。
    2. prev:前驱节点
    3. next:后继节点
    4. thread:当前节点代表的线程
    5. nextWaiter:Node既可以作为同步队列节点使用,也可以作为Condition的等待队列节点使用(将会在后面讲Condition时讲到)。在作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;在作为等待队列节点使用时,nextWaiter保存后继节点。
    状态 含义
    CANCELLED 1 当前节点因为超时或中断被取消同步状态获取,该节点进入该状态不会再变化
    SIGNAL -1 标识后继的节点处于阻塞状态,当前节点在释放同步状态或被取消时,需要通知后继节点继续运行。每个节点在阻塞前,需要标记其前驱节点的状态为SIGNAL。
    CONDITION -2 标识当前节点是作为等待队列节点使用的。
    PROPAGATE -3  
    0 0 初始状态

    队列拥有首节点和尾节点,这两个节点分别保存于AQS的两个字段:head、tail。

    同步队列的基本结构:

    当一个线程想要获得同步状态的时候,如果当前有其他线程持有同步状态,当前线程将无法获取,转而被构造为一个Node添加到同步队列的尾部,而这个加入的过程必须保证线程安全,因此同步器提供了一个基于CAS的设置队尾的方法:compareAndSetTail(Node expect, Node update),它需要传递当前线程"认为"的队尾。在一个Node被CAS设置为队列之前,这个Node的prev已经被设置为之前的尾节点,而在这个Node被设置为队尾之后,之前尾节点的next才会被指向这个Node。因此在任一时刻,从head向后遍历队列不一定能遍历到tail,因为最后的tail可能还没有被倒数第二个节点指为next,但是从tail向head遍历一定能遍历head。记住这个结论之后会用到。

    在队列中,首节点是当前获取同步状态成功的节点,首节点在释放同步状态时,会唤醒后继节点,而后继节点会在自己获取同步状态成功时,将自己设置为首节点。因为设置首节点是由持有同步状态的线程来完成的,因此不需要使用CAS来保证线程安全,只需要持有同步状态的线程将首节点设置为原首节点的后继节点并断开原首节点的next引用即可。

    独占式同步状态获取与释放

    获取

    获取独占同步状态使用方法acquire(int arg):

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    其中tryAcquire(arg)方法是我们继承AQS的子类重写的方法,可以看到,如果tryAcquire返回了true代表获取锁成功,则acquire方法立即返回。如果tryAcquire返回了false我们知道AQS应该完成的操作是阻塞当前线程直到当前线程获取到同步状态。可以看出会依次调用方法addWaiter、acquireQueued。

    先来看addWaiter:

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode); // 将当前线程构造为Node,mode传入值为Node.EXCLUSIVE,将保存在Node的nextWaiter字段,标识当前节点为独占模式
            // 如果当前尾节点不为空,尝试设置当前节点为尾节点,这块是完整设置尾节点的一个简单实现,如果这个能成功,不用调用完整设置尾节点的方法enq了,如果失败,则调用enq方法。
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
       // 完整的设置尾节点方法,如果当前节点不为空,则把当前节点设为尾节点,并将原尾节点next指向当前节点;如果当前尾节点为空,即当前同步队列为空,则新建一个傀儡节点作为首节点和尾节点,然后再将当前节点设为尾节点。
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    然后看acquireQueued:

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
       //如果当前节点的前驱是头节点,说明即将轮到自己获得同步状态,再次调用tryAcquire检查是否能获取到同步状态(这里之所以要再次检查,有两个原因:
    一是因为尽管当前节点排到首节点后面,而且已经被首节点唤醒,但是首节点在唤醒当前节点后,并不是马上释放同步状态;
    二是因为如果此时有新的线程第一次尝试获取同步状态正好赶在首节点释放同步状态,那么新的线程可能直接就不排队了直接获取到同步状态。)
    if (p == head && tryAcquire(arg)) { setHead(node); //获取到同步状态则将自己设为头节点 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;
    //如果前驱节点已经是SIGNAL状态则前驱节点执行完成后会唤醒当前节点
    if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true;
    //前驱节点状态为CANCELLED,则继续查找前驱节点的前驱节点,因为当首节点唤醒时,会跳过CANCELLED节点
    if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node;
    //如果是0或PROPAGATE状态,则用CAS设置为SIGNAL }
    else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //该方法在 shouldParkAfterFailedAcquire 方法返回true后执行,shouldParkAfterFailedAcquire 方法返回true代表前驱节点已经被设置为SIGNAL状态,
    因此当前节点可以阻塞等待唤醒了,使用LockSupport.park(this)方法来阻塞。这个方法会一直阻塞直到首节点唤醒当前节点或当前节点被中断,如果是被中断,中断标识将会被一直往上层方法传,最终acquire方法会执行selfInterrupt。
    private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

    释放

    释放同步状态通过方法release(int arg):

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    tryRelease方法是我们自己在AQS子类中重写的方法,可以看到release方法的逻辑比较简单,如果tryRelease方法返回false,那么release方法直接返回false;如果tryRelease方法返回true则通过unparkSuccessor方法唤醒后继节点:

        private void unparkSuccessor(Node node) {
    //如果头节点的状态是负值,将其归0.如果失败了也ok。
    int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //通常要唤醒的节点就是头节点的直接后继节点,但是如果直接后继节点是null或状态为CANCELLED,则从tail向前遍历取离head最近的一个非CANCELLED状态的节点。这里之所以要从tail向前遍历,前面说过原因:最后的tail节点在构造的时候在某时刻可能只有其向前一个节点的prev引用,而没有前一个节点向它的next引用。 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); // 唤醒下一个节点。这里要注意理论上可能头节点唤醒下一个节点的时候,下一个节点还没有通过park方法阻塞,而LockSupport方法在这种情况的表现是:如果先调用了unpark方法,那么之后调用park时将不会阻塞。因此在这种情况下也不会有什么问题。 }

    共享式同步状态获取与释放

    共享式同步状态获取/释放和独占式状态获取/释放大体一致,只是加了释放传播:

    我们举个例子,比如我们要定义一个类似于Semaphore的同步组件,支持n个线程可以同时获取同步状态,超过n时则阻塞,假如AQS没有给我们提供共享式的tryAcquireShared和tryReleaseShared方法,我们试图用独占式方法来实现这个组件,那么我们可能会这样重写tryAcquire和tryRelease(只贴出AQS的子类实现,其他代码略):

        private static class SemaphoreSynchronizer extends AbstractQueuedSynchronizer {
            public SemaphoreSynchronizer(int arg) {
                setState(arg);  //用state表示当前可用许可数
            }
    
            @Override
            protected boolean tryAcquire(int arg) {
                for (;;) {
                    int state = getState();
                    int newState = state - 1;   //许可数-1
    
                    //如果已经没有许可可用,则返回false
                    if (newState < 0) {
                        return false;
                    }
                 
                    //如果有许可可用而且CAS成功,则返回true,否则循环重新判断是否有许可可用
                    if (compareAndSetState(state, newState)) {
                        return true;
                    }
                }
    
            }
    
            @Override
            protected boolean tryRelease(int arg) {
                for (;;) {
                    int current = getState();
                    int newCount = current + 1;  //释放成功则许可数+1
                    //如果释放成功返回true,否则循环重新释放
                    if (compareAndSetState(current, newCount)) {
                        return true;
                    }
                }
            }
    
            protected Condition newCondition() {
                return new ConditionObject();
            }
    
        }

    这样实现的话仔细想就会发现有问题:

    比如许可数设为3,当前正有t1、t2、t3这三个线程在运行,然后来了两个线程t4、t5被阻塞了,因为t1、t2、t3是并发运行,因此假设t1和t2同时释放许可,独占式释放同步状态代码如下:

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    这两个线程在并发条件下,Node h=head这句可能h同时指向t4,然后t4被唤醒2次,最终的结果就是t3和t4在执行,而t5在被阻塞,尽管有效许可数是3。

    为了避免这个问题,需要在共享式同步状态的释放和获取处都做一些工作。

    释放

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
        private void doReleaseShared() {
            for (;;) {
                Node h = head; //获取首节点
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {  //如果首节点的状态是SIGNAL,则CAS修改SIGNAL为0,如果成功就唤醒后继节点,失败则重新获取首节点
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&   //如果首节点状态是0,则将状态改为PROPAGATE(传播状态)
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // 这个是外面一层for循环的终止条件,外面一层循环的意义在于如果首节点在以上操作中发生了变化,那么可能有其他节点已经唤醒了之前获取的首节点的后继节点,于是当前线程要获取新的首节点的后继节点。
                    break;
            }
        }

    获取

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    
        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
           
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }

    和独占式获取相比,主要区别在于setHeadAndPropagate方法:

    一个节点在获取了同步状态后,不仅把自己设置为头节点,而且如果当前同步状态>0||原head为null||原head的状态<0||当前head为null||当前状态<0,且下一个节点的类型为null(类型未知)||下一个节点类型为shared,则继续唤醒下一个节点。

    注:节点状态<0意味着是SIGNAL或PROPAGATE。

    总结

    AQS的实现中,独占式状态获取与释放比较简单容易理解,共享式状态获取与释放比较复杂。不过就实际应用而言,相信我们也不需要了解里面的所有细节,只需要理解原理即可。

  • 相关阅读:
    Azure DevOps Server 2020.1 新增功能 (TFS)
    Azure DevOps Server 2020.1 升级指南 (TFS)
    Azure DevOps Server:如何在Git历史记录中显示中文姓名
    Azure DevOps Server:集中显示所有团队的燃尽图
    MS中adjust hydrogen功能不能使用的问题
    bat对拍
    CSP 201812-4 数据中心(最小瓶颈生成树)
    CSP 202009
    CSP 202012
    牛客练习赛76
  • 原文地址:https://www.cnblogs.com/sheeva/p/6472949.html
Copyright © 2011-2022 走看看