zoukankan      html  css  js  c++  java
  • AQS独占式同步队列入队与出队

    入队

    Node

      AQS同步队列和等待队列共用同一种节点结构Node,与同步队列相关的属性如下。

    • prev 前驱结点
    • next 后继节点
    • thread 入队的线程
    • 入队节点的状态 
    1. INITIAl 0 初始状态。当一个节点刚刚被加入同步队列时的默认值
    2. SIGNAL -1 状态为-1的Node意味着该节点有一个后继节点在等待,也就是说如果Status=-1的节点释放锁后需要他的后继节点(next)。一个节点的waitStatuc被设置成-1只能有其后继节点设置,自己不能设置。
    3. CONDITION -2 应用于等待队列
    4. CANCELLED 1 waitStatus中唯一大于零,只有在响应中断情况下,一个线程被中断后,其waitSatus被设置为1,代表该节点不再等待锁,应该从队列中被剔除。

    acquire 

      模板方法,失败后构造节点、入队、自旋。需要关注的是如果if条件满足会执行selfInterrupt,这个后面分析。

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    构造节点并入队

    • 建立节点的构造方法并没有设置waitSatus,由于waitStatus是int类型的,在没有初始化的情况下就是0,所以默认新建的Node其waitStatus就是0。
    • 如果当前队列的tail不为Null,代表队列已经初始化,那么就自旋CAS加入队列。这里使用CAS来保证并发安全下的安全性,每次CAS之前都会再次取出当前的tail。
    • 如果当前队列的tail为Null,代表队列没有初始化,调用CAS创建Head。这里也用了CAS,同样为了保证并发安全性,且创建完成后队列的head=tail,而且会继续下一次循环,说明队列里有一个冗余节点(dummy head)
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
     private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    acquireQueued

      如果当前节点的头结点是head,且获得锁成功,把当前节点设置为head并返回。  

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

       如果前继不是头结点,或者前继是头结点但是获得锁失败

      首先判断是否需要park自己 (shouldParkAfterFailedAcquire)

    • 如果前继节点的waitStatus是-1,返回true
    • 如果前继节点大于零,说明前继节点已经被Cancelled,跳过该前继节点一直往前找知道找到一个waitStatus<=0的节点,直到找到一个<=0的,然后设置成-1,返回false
    • 返回true的会被park进入WAITING状态,返回false的将会再次尝试获得一次锁
     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }

     不响应中断情况下对中断的处理

      上面分析的是acquire而非acquireInterruptibly,即一个线程在获取锁时如果被中断是不响应的。从底层往上追溯一下AQS对中断的处理。

    • 首先一个线程被prak的线程被中断后会返回且不抛出异常,所以该线程会从parkAndCheckInterrupt方法返回,且返回为true
    • 接下来来到了acquireQueued的if判断,在if判断成立后interrupt被设置成true,默认情况下interrupt是false
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
    •  接着他会一如既往的尝试获取锁,如果失败会继续park自己。是否park自己只与前继节点的waitStatus有关,与自己的Interrupt没有任何关系。
    • 当该线程获取锁会从acquireQueued中返回,返回的值就是在acquireQueued中的第二个if里被修改的interrupt,也就是true
    • 然后回到了最初的起点,获得锁的线程会从acquireQueued中返回。因为acquireQueued为true所有会执行selfInterrupt。但是由于此时线程的状态已经是RUNNING状态,所以该interrupt并不会对该线程造成任何影响,产生的结果就像对一个处于RUNNING的线程执行interrupt一样,就是没有影响,除了改变了中断状态标志位以外。
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

       测试代码如下:主线程先获取锁,然后再开启子线程并尝试获取锁,此时打印出子线程中断标志位,然后主线程释放锁后,观察子线程是否继续执行。

        public static void main(String[] args) throws InterruptedException {
            final ReentrantLock lock = new ReentrantLock();
    
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    System.out.println("我来了");
                }
            });
    
            lock.lock();
            Thread.sleep(1000);
            thread.start();
            thread.interrupt();
            System.out.println(thread.isInterrupted());
            lock.unlock();
            
        }

       最终的结果和我的分析一样,在主线程将获得锁的情况下中断子线程,子线程没有任何反应,但子线程的中断标志位是true。并且在主线程释放锁后,子线程获得锁就可继续执行。

     响应中断情况下对中断的处理

      大体逻辑是相同的,不同的是如果一个线程在队列里被中断的话,会抛出异常并退出尝试获得锁。  

        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
       private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

     支持超时、中断

    • 方法声明里抛出了一个异常,以为这个方法可以相应中断
    • 在每次获得锁失败都会检查是否还有剩余时间,如果有才会park自己,否则返回  
    private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            final long deadline = System.nanoTime() + nanosTimeout;
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        return false;
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    出队

    • 调用被重写的tryRelease方法释放锁。之所以这里加上if判断,是为了解决锁重入的情况下,必须把所持有的所有重入的锁都释放才可以唤醒后继节点
    • 判断是否需要唤醒后继节点,h.waitStatus!=0其实就是<0,即后面是否有等待的节点
    • 唤醒后继节点  
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

      

    • 清除等待标志位。只用了一个cas操作而非循环cas,也就意味着清除等待标志位是允许失败的
    • 找到下一个需要被唤醒的节点
    • 唤醒节点
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }

       在唤醒节点的时候,如果当前节点的直接后继不存在或者已经退出等待,那么会从队列里从tail节点开始再次寻找一个需要被唤醒的节点。当唤醒一个pre不是head的节点时,doAcquireInterruptibly中的if(p==head)判断不通过,进入shouldParkAfterFailedAcquire方法,在该方法中会清楚掉s节点之前的失效节点并把s的pre设置为head,由于shouldParkAfterFailedAcquire返回false会再次尝试获取锁。由于s的pre已经是head,所以此时s节点可以获取锁。

  • 相关阅读:
    李航统计学习方法(第二版)(十五):非线性支持向量机与核函数
    Git 工作区、暂存区和版本库
    HTML DOM tabIndex 属性
    HTML DOM Style whiteSpace 属性
    gitcli (Miscellaneous) – Git 中文开发手册
    generator.next (Generator) – JavaScript 中文开发手册
    平均宽度 | min-width (Flexible Box Layout) – CSS 中文开发手册
    chrome console的使用 :评估表达式
    optparse (Operating System) – Python 中文开发手册
    typedArray.findIndex (TypedArray) – JavaScript 中文开发手册
  • 原文地址:https://www.cnblogs.com/AshOfTime/p/10877666.html
Copyright © 2011-2022 走看看