zoukankan      html  css  js  c++  java
  • 线程基础知识04- 队列同步器(AbstractQueuedSynchronizer)

    参考书:《java并发编程艺术》

    参考博客:https://www.cnblogs.com/micrari/p/6937995.html

    源码分析:活在夢裡 大佬写的很详细,也是我看过的AQS源码分析最好的博客了。

    我就自己学习和理解的过程做一下记录。

    1 数据存储结构

        AQS的数据存储,是通过内部的Node对象存储Thread,并标识线程的状态;多个Node组成的双向链表结构;

    static final class Node {
    
        // 用于标记一个节点在共享模式下等待
        static final Node SHARED = new Node();
    
        // 用于标记一个节点在独占模式下等待
        static final Node EXCLUSIVE = null;
    
        // 等待状态:取消。由于再同步队列中等待的线程等待超时或被中断,需要从同步队列中取消等待,节点进入该状态将不会变化。
        static final int CANCELLED = 1;
    
        // 等待状态:通知。后继节点的线程处于等待状态,当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点。
        static final int SIGNAL = -1;
    
        // 等待状态:条件等待。存放在条件队列,调用signal()方法或signalAll()方法,会将该节点转移至同步队列中。
        static final int CONDITION = -2;
    
        // 等待状态:传播。表示下一次同步状态的获取将会无条件的被传播下去
        static final int PROPAGATE = -3;
    
        // 等待状态
        volatile int waitStatus;
    
        // 前驱节点
        volatile Node prev;
    
        // 后继节点
        volatile Node next;
    
        // 节点对应的线程
        volatile Thread thread;
    
        // 等待队列中的后继节点
        Node nextWaiter;
    
        // 当前节点是否处于共享模式等待
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
        // 获取前驱节点,如果为空的话抛出空指针异常
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null) {
                throw new NullPointerException();
            } else {
                return p;
            }
        }
    
        Node() {
        }
    
        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    

    2.独占锁方法

    大致的过程:

    • 成功获取锁,执行线程任务,释放锁,唤醒后继node;

    • 没有获取到锁,放入到queue中,进行自旋。判断前驱为头节点时,如果是,设置当前节点为head节点,移除queue中。

    • 当node自旋中出现iterruptException时,节点移除queue.

    2.1 独占锁获取

    aquire方法

    • 1.tryAcquire(arg)方法获取独占锁,由AQS继承类实现

    • 2.如果获取独占锁成功则返回,否则加入queue中

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();//中断当前线程
        }
    
    
     private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);//创建独占节点
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;//获取最后的节点
            if (pred != null) {//末尾节点不为空的时候
                node.prev = pred;//设置节点的前置节点是tail节点
                if (compareAndSetTail(pred, node)) {//通过CAS将新加入的节点,设置成末尾节点
                    pred.next = node;
                    return node;
                }
            }
           //队列初始化,node设置为tail节点
            enq(node);
            return node;
        }
     
     /**
      * 作用:将当前节点加入到queue
      * 1.死循环,只有当节点添加到末尾时才退出
      * 2.通过CAS进行节点操作,类似“串行”执行 
      **/
     private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // 如果queue为空
                    if (compareAndSetHead(new Node()))//通过CAS,增加空的头节点
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {//通过CAS,结尾节点进行关联
                        t.next = node;
                        return t;
                    }
                }
            }
        }  
       
     /**
      * 作用:循环等待,当前节点获取到锁,或者当前节点中断;
      *  1.如果前驱是头节点并且获取到了锁,则设置当前节点为头节点
      *  2.如果没有获取到锁,则变更pre节点为SINGLE,当前线程进行阻塞
      **/
      final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {//一致等待队列中的任务,直到当前任务为头节点
                    final Node p = node.predecessor();//当前节点的前一个节点
                    if (p == head && tryAcquire(arg)) {//前一个节点是头节点,并且获取到锁的时候
                        setHead(node);//将当前节点设置为头节点
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                      /**
                       * 检查前一个节点的状态,如果为Single
                       * parkAndCheckInterrupt方法把当前线程阻塞
                       **/   
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {// 当出现interruptedException的时候,不对异常处理,但是对当前线程所对应的当前节点的状态变更为Cancelled
                if (failed)
                    cancelAcquire(node);//变更当前节点状态
            }
        }
    
    
      /**
       * 作用:将node节点的前置节点,状态变成SIGNAL,如果是CANCELLED则移除
       **/ 
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)//表示这个节点锁已经释放,可以安全释放
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {//是cancelled状态只能跳过,找到node节点前面的非Cancelled状态的节点
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {//设置前驱状态为SIGNAL,可以获取锁的状态
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    /**
     * 1.阻塞当前线程
     * 2.判断当前线程是否中断
     **/
     private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    /**
     * 作用:链表中取消节点,并更改节点状态
     **/
    private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            if (node == null)
                return;
    
            node.thread = null;
    
            // Skip cancelled predecessors
            Node pred = node.prev;
            while (pred.waitStatus > 0)//遍历node前节点为非cacelled状态
                node.prev = pred = pred.prev;
    
            // predNext is the apparent node to unsplice. CASes below will
            // fail if not, in which case, we lost race vs another cancel
            // or signal, so no further action is necessary.
            Node predNext = pred.next;
    
            // Can use unconditional write instead of CAS here.
            // After this atomic step, other Nodes can skip past us.
            // Before, we are free of interference from other threads.
            node.waitStatus = Node.CANCELLED;//变更当前节点的状态
    
            // If we are the tail, remove ourselves.
            if (node == tail && compareAndSetTail(node, pred)) {//如果是node是为节点,替换尾节点为node的pre
                compareAndSetNext(pred, predNext, null);//更改pre节点的后继节点为空
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {//pred节点不是头节点,不是Callced状态,
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                      //pred==head,唤醒后继锁操作
                    unparkSuccessor(node);
                }
    
                node.next = node; // help GC
            }
        }
    
    
    
    

    执行的逻辑大致如下图:

    acquireInterruptibly方法

    • 这个方法和acquire方法最大的区别在于,会抛出InterruptException
     //如果中断了,抛出异常
      public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
     private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
           //省略部分代码。。。。。。
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
          //省略部分代码。。。。。。
        }
    

    2.2 独占锁释放

    执行的过程:

    • 释放锁操作,成功则唤醒后继节点

    • 释放锁失败,则返回false

    /**
     * 1.tryRelease方法子类进行重写;
     * 2.释放锁成功后,唤醒后继node
     **/
    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    /**
     * 作用:唤醒后继节点
     **/ 
    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;//节点的状态变成0,后续等待的线程可以对状态进行调整
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             *   
             * 从tail查找非null和非cancelled状态的node的后继节点
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    
    

    3.共享锁方法

    3.1 共享锁获取

    acquireShared方法

    • 如果节点是取消状态,则直接跳过;如果不是取消状态,增加到链表中。

    • 当前节点的前驱是头节点,获取共享锁>=0,设置当前节点为头节点。

    • 当前节点是否传播(propagation>0)和下个节点是否共享来判断是否唤醒下个节点。

     /**
      * 返回-1表示没有获取到共享锁
      * 不会捕获InterruptException
      **/
     public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    
       /**
        * 和独占式锁代码类似
        */   
       private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);//在链表末尾增加一个共享节点
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        /**
                         * 当前节点的前置节点获取到锁;
                         * 把当前节点设置为头节点;
                         **/  
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    /**
                     * 判断p前置节点状态,阻塞当前线程并判断当前线程中断装填  
                     **/ 
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {//非正常结束进行移除
                if (failed)
                    cancelAcquire(node);
            }
        }
    
     //设置共享锁头节点,唤醒下一个节点
     private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            //
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
    /**
     * 作用:释放头节点和唤醒下一个节点
     **/
     private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {//存在后继节点
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//头节点为SIGNAL,则变成0状态
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//如果为0状态,则变成PROPAGATE状态
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // 如果头节点发生变化了,就推出循环
                    break;
            }
        }
    
    // 如果存在后继节点,则唤醒后继节点
      private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    
    

    3.2 共享锁释放

    //判断有无获取到锁,获取到锁则唤醒后继节点
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                //这一块代码和上面增加共享节点的一样。
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    

    Condition子类

    条件队列插入

    await方法

    • 当前线程中断则报错退出

    • 创建节点增加到条件队列

    • 释放互斥锁

    • 没有在同步队列就进行阻塞

    • 检测是否发生中断,是更新完状态加入到同步队列时发生或是调用signal方法更改状态

    • 如果有中断状态,则抛异常或者中断当前线程

    /**模式意味着在退出等待时重新中断  */
    private static final int REINTERRUPT =  1;
    /** 退出等待时抛出Interrupt异常 */
    private static final int THROW_IE    = -1;
    
     public final void await() throws InterruptedException {
                if (Thread.interrupted())//当前线程异常,抛出异常
                    throw new InterruptedException();
                /**
                 * 1.最后节点lasterWaiter关联nextWaiter关联当前节点
                 * 2.将当前节点设置为lastWaiter 
                 **/         
                Node node = addConditionWaiter();
                /**
                 * 1.释放当前状态
                 * 2.如果释放成功,则调整当前节点状态为释放
                 **/     
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                /**
                 * 判断是否在同步队列,如果在同步队列则跳过,不在同步队列,就阻塞
                 **/     
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                /**
                 * 获取互斥锁中,如果中断,但是中断异常不抛出,设置成REINTERRUPT
                 **/ 
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                //当前节点的nextWaiter不为空,则清除cancelled状态的节点
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
               //THROW_IE抛出中断异常,REINTERRUPT中断当前线程
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
    /**
     * 作用:增加当前线程到条件队列
     */
    private Node addConditionWaiter() {
                Node t = lastWaiter;//
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {//如果最后一个节点的waitStatus不是CONDITION,删除CANCELLED的节点,重新定位最后一个条件节点
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;//将当前节点设置为尾节点
                return node;
            }
    
    
     /**
      * 作用:判断是否在同步队列中
      **/
     final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null) 
                return false;
            if (node.next != null) // 有后继节点,一定在同步队列中
                return true;
    
            return findNodeFromTail(node);//在同步队列中查找,是否有当前节点
        }
    
    /**
     * 判断中断状态:
     * THROW_IE表示加入到同步队列中;
     * REINTERRUPT表示调用了SINGLE()方法,状态已经更新过了;
     * 0表示线程中断了
     **/ 
    private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
    
    /**
     * 作用:变更节点状态,节点加入同步队列进行判断
     **/
    final boolean transferAfterCancelledWait(Node node) {
            /**
             * 将CONDITION状态变成准备状态,
             * 将节点加到同步队列尾端
             */
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                enq(node);
                return true;
            }
            /**
             * 调用了single()方法,状态已经被变更了,所以要等待加入到同步队列中  
             */
            while (!isOnSyncQueue(node))
                Thread.yield();//当前线程让步执行
            return false;
        } 
    
    //THROW_IE抛出中断异常,REINTERRUPT中断当前线程
     private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }
    

    signal方法

    • 判断当前线程独占锁的持有

    • 如果持有独占锁,则将头节点加入到同步队列中

    • 头节点的同步队列中前驱节点如果是取消状态,则放行进行锁竞争

     /**
      * isHeldExclusively方法由子类实现。用于检查持有互斥锁的线程是不是当前线程
      * 将第一个节点增加到同步队列 
      **/
     public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
     /**
      * 头节点移动到同步队列
      */
      private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)//如果只有一个节点,则lastWaiter变成空
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);//头节点已经状态已经取消则结束循环
            }
     /**
      * 作用:是将头节点增加到同步队列,如果头队列的前驱节点已经是取消状态,或无法调整为singal状态,则进行锁竞争
      */
        final boolean transferForSignal(Node node) {
            /*
             * 如果waitStatus不能改变,说明这个节点已经是Cancelled
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * 将节点增加到同步队列,判断前驱节点的waitStatus,如果是Cancelled或者CAS自旋无法把状态设置为SINGAL,
             * 放行节点线程进行锁竞争 
             */
            Node p = enq(node);//返回node节点的前驱
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
    
    

    signalALL方法

    • 和signal方法相似,不同的是将所有条件队列转移到同步队列
      public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
    
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    
    
    
  • 相关阅读:
    最近有人说我欺骗消费者,今天来一波视频分享
    前端 Java Python等资源合集大放送
    dubbo源码学习(四):暴露服务的过程
    dubbo源码学习(二) : spring 自定义标签
    Dubbo多注册中心和Zookeeper服务的迁移
    线程各种状态转换分析
    java并发之同步辅助类CountDownLatch
    工作5年的Java程序员,才学会阅读源码,可悲吗?
    【阿里面试系列】Java线程的应用及挑战
    「阿里面试系列」搞懂并发编程,轻松应对80%的面试场景
  • 原文地址:https://www.cnblogs.com/perferect/p/13388045.html
Copyright © 2011-2022 走看看