zoukankan      html  css  js  c++  java
  • AQS-等待队列

      AQS的原理在于,每当有新的线程请求资源时,该线程会进入一个等待队列(Waiter Queue),只有当持有锁的线程释放资源后,该线程才能持有资源。该等待队列的实现方式是双向链表,线程会被包裹在链表节点Node中。Node即队列的节点对象,它封装了各种等待状态(典型的状态机模式),前驱和后继节点信息,以及它对应的线程。

      AQS定义两种资源共享方式:Exclusive(独占,在特定时间内,只有一个线程能够执行,如ReentrantLock)和share(共享,多个线程可以同时执行,如ReadLock、Semaphore、CountDownLatch),可见不同的实现方式征用共享资源的方式不同,由此,自定义同步器在实现时要根据需求来实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经实现好了。

    Node节点

     1 //标记节点为共享模式
     2 static final Node SHARED = new Node();
     3 //标记节点为独占模式
     4 static final Node EXCLUSIVE = null;
     5 //等待状态
     6 volatile int waitStatus;
     7 //前驱结点
     8 volatile Node prev;
     9 //后继节点
    10 volatile Node next;
    11 //线程
    12 volatile Thread thread;

    自定义同步器时主要需要实现以下几种方法: 

    • isHeldExclusively():该线程是否正在独占资源,只有用到condition时才需要去使用它。
    • tryAcquire(int):独占方式,尝试获取资源,返回boolean。
    • tryRelease(int):独占方式,尝试释放资源,返回boolean。
    • tryAcquireShared(int):共享方式,尝试获取资源,返回int。
    • tryReleaseShared(int):共享方式,尝试释放资源,返回boolean。

    等待队列节点对象Node有四种不同的状态:

    • CANCELLED(1)已取消
    • SIGNAL(-1)竞争获胜需要唤醒
    • CONDITION(-2)在condition队列中等待
    • PROPAGATE(-3)后续节点传播唤醒操作,共享模式下使用

    acquire方法执行流程

    1 public final void acquire(int arg) {
    2     if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
    3         selfInterrupt();
    4     }
    5 }

    1)      尝试获取锁tryAcquire,返回值表示当前线程是否获取锁。 

    2)      如果获取成功,那么说明当前对象已经持有锁,执行中断操作,中断操作会解除线程阻塞。

    3)      如果获取失败,那么把当前线程封装为Waiter节点,等待队列没有节点时初始化队列,有则使用compareAndSetTail()添加进Waiter队列尾端。

    4)      acquiredQueue自旋获取资源,并且返回Waiter节点持有的线程的应当具备的中断状态。

    5)      根据返回结果来确定是否需要执行线程中断操作。

     1 private Node addWaiter(Node mode) {
     2     //封装当前节点为node节点
     3     Node node = new Node(Thread.currentThread(), mode);
     4     Node pred = tail;
     5     if(pred != null) {
     6         //将node节点的前驱节点设置为tail
     7         node.prev = pred;
     8         //多线程环境下,tail可能已经被其它线程修改了,这里校验pred是否依然是为节点
     9         //如果是,那么将node设置为尾结点,原尾结点的后继节点设置为node,返回node
    10         if(compareAndSelfTail(pred, node)) {
    11             pred.next = node;
    12             return node;
    13         }
    14     }
    15     //执行到这里,说明tail为null,或者tail已经发生了变动
    16     enq(node);
    17     return node;
    18 }    
     1 private Node enq(final Node node) {
     2     //下面这个死循环用于把node节点插入到队尾,由于多线程环境下,tail节点可能
     3     //随时变动,必须不停的尝试,让下面两个操作不会被其它线程干涉。
     4     //1,node.prev必须为当前尾结点
     5     //2,node设置为新的尾结点
     6     for(;;) {
     7         Node t = tail;
     8         //tail为空,也说明head为空,此时初始化队列
     9         if(t == null) {
    10             //CAS方式初始化队头
    11             if(compareAndSetHead(new Node()))
    12                 tail = head;
    13         } else {
    14             //设置node.prev为当前尾结点
    15             node.prev = t;
    16            //多线程环境下,此时尾结点可能已经被其它访问修改了,需要CAS来进行比较
    17             //如果t依然是尾结点,那么node设置为尾结点、
    18             if(compareAndSetTail(t, node)) {
    19                 t.next = node;
    20                 return t;
    21             }
    22         }
    23     }
    24 }

      acquiredQueued(Node)方法会接收addWaiter封装好的Node对象,该方法的本质在于以自旋的方式获取资源,即自旋锁。它做了两件事,如果指定节点的前驱节点时头结点,那么再次尝试获取锁,反之,尝试阻塞当前线程。自旋不能构成死循环,否则会浪费大量CPU资源,在AQS中如果p==head&&tryAcquire(arg)条件不足时不会一直循环下去。通常,在p==head之前,必然会有一个线程得到锁,此时tryAcquire()通过,循环结束。如果发生了极端情况,那么node.predecessor()也会在node==head的情况下抛出空指针异常,循环结束。shouldParkAfterFailedAcquire(p,node)检测前驱节点的等待状态,需要阻塞则调用partAndCheckInterrupt()方法会阻塞当前线程,该循环也不会无限制的消耗资源。

     1 final boolean acquireQueued(final Node, int arg) {
     2     boolean failed = true;
     3     try {
     4         boolean interrupted = false;
     5         for(;;) {
     6             //找到node的前驱节点,如果node已经为head,那么会抛出空指针异常
     7             //空指针异常说明整个等待队列都没有能够获取锁的线程。
     8             final Node p = node.predecessor();
     9             //前驱节点为头结点时,当前线程尝试获取锁
    10             //如果获取成功,那么node会成为新的头结点,这个过程会清空node的线程信息。
    11             if(p == head && tryAcquire(arg)) {
    12                 setHead(node);
    13                p.next = null;
    14                 failed = false;
    15                 return interrupted;
    16             }
    17             //当前线程不能获取锁,则说明该节点需要阻塞
    18             //shouldParkAfterFailedAcquire()用于检查和设置节点阻塞状态
    19             //如果为通过检查,那么说明没有阻塞,parkAndCheckInterrupt()用于阻塞当前线程。
    20             if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    21                interrupted = true;
    22             }
    23             finally {
    24                 if(failed) cancelAcquire(node);
    25             }
    26     }
    27 }                    
     1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     2     //前驱节点的等待状态
     3     int ws = pred.waitStatus;
     4     if(ws == Node.SIGNAL) {
     5         //SIGNAL表示前驱节点需要被唤醒,此时node是一定可以安全阻塞的,所以返回true
     6         return true;
     7     }
     8     if(ws > 0) {
     9         //大于0的等待状态只有CANCELLED,从队列里移除所有前置的CANCELLED节点。
    10         do {
    11             node.prev = pred = pred.prev;
    12         } while (pred.waitStatus > 0);
    13         pred.next = node;
    14     } else {
    15         //运行到这里,说明前驱节点处于0、CONDITION或者PROPAGATE状态下
    16         //此时该节点需要被置为SIGNAL状态,等待被唤醒。
    17         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    18     }
    19 }
    1 private final boolean parkAndCheckInterrupt() {
    2     //LockSupport.park()用于阻塞当前线程
    3     LockSupport.park(this);
    4     return Thread.interruupted();
    5 }

      由此可以得出结论,当一个新的线程节点入队之后,会检查它的前驱节点,只要有一个节点的状态是SIGNAL,就表示当前节点之前的节点正在被等待唤醒,那么当前线程就需要被阻塞,以等待RentrantLock.unlock()唤醒之前的线程。 

     

      在过程2中,node1刚刚入队,没有争抢到锁,此时head状态为初始化的0状态,于是调用了compareAndSetWaitStatus(pred,ws,Node.SIGNAL),这个方法会把head的状态改为SIGNAL。

      在过程3中,acquired()方法里的for循环会在执行一次,此时,node1的前驱节点依然是head,如果它依然没有竞争锁,那么由于head的waitStatus属性的值为SIGNAL,这会导致shouldParkAfterFailedAcquire()方法返回true,当前线程(node1持有的线程)被阻塞,代码不在继续往下执行。这样就达到了让等待队列里的线程阻塞的目的,由此可以类推更多线程入队的过程。由此可以类推更多线程入队的过程:

     

           SIGNAL状态由release()方法进行修改,这个方法首先调用tryRelease()方法尝试释放锁,它返回的是锁是否处于可用状态,如果锁可用,那么该方法也不负责中断等待线程的阻塞,它仅仅把锁的线程持有者设为null;然后,如果成功的释放锁,那么判断队头状态,队头为空则说明队列没有等待线程,不再做其它操作,反之再判断队头的状态waitStatus,只要它不为0,就说明等待队列中有被阻塞的节点。

     1 public final boolean release(int arg) {
     2     if(tryRelease(arg)) {
     3         Node h = head;
     4         if(h != null && h.waitStatus != 0) {
     5             unparkSuccessor(h);
     6         }
     7         return true;
     8     }
     9     return false;
    10 }
     1 private void unparkSuccessor(Node node) {
     2     int ws = node.waitStatus;
     3     //小于0的状态waitStatus只有SIGNAL和CONDITION
     4     if(ws < 0) {
     5         compareAndSetWaitStatus(node, ws, 0);
     6     }
     7     Node s = node.next;
     8     //前驱查找需要唤醒的节点
     9     if(s == null || s.waitStatus > 0) {
    10        s = null;
    11         for(Node t = tail; t != null && t != node; t = t.prev) {
    12             if(t.waitStatus <= 0) s = t;
    13         }
    14     }
    15     if(s != null) {
    16         LockSupport.unpark(s.thread);
    17     }
    18 }

           unparkSuccessor()负责确保中断正确的线程阻塞。在ReentrantLock.unlock()的调用过程中,unparkSuccessor(Node node)的形参node始终为head节点,这个方法执行的主要操作为:

    • 首先把head节点的waitStatus设置为0,表示队列里没有需要中断阻塞的线程。
    • 然后确定需要被唤醒的节点,该节点是队列中第一个waitStatus小于等于0的节点。
    • 最后,调用LockSupport.unlock()方法中断指定线程的阻塞状态。

     

           需要注意的是,node1对应的线程此时已经中断了阻塞,它会开始继续执行AQS的AacquireQueued()方法中for循环的代码final Node p = node.predecessor();显然node1的前驱节点head由于锁已经被释放,队列变化为

     

           这部分代码比较巧妙,可以注意到,在释放的过程中,代码里并没有改变head的waitStatus为SIGNAL,而是直接使用node1替代了原先的head。换言之,原本需要修改head/node2的前驱和后置,并且把head的waitStatus修改为SIGNAL,使用当前的代码,只需要释放node1的持有线程,然后移除head节点,这样可以更快的到达队列规整的目的。

    AQS如何阻塞线程和中断阻塞

           在acquired()方法中,当前线程尝试获取锁,如果没有获得,那么会把线程加入等待队列中,加入到队列的线程会被阻塞。

           线程阻塞有三种常见的实现方式:Object.wait()、Thread.join()、或者Thread.sleep()。

           中断阻塞则通过Thread.interrupt()方法来实现,这个方法会发出一个中断信号量从而导致线程抛出中断异常InterruptedException,已达到结束阻塞的目的。需要注意的是Interrupt不会中断用户循环体造成阻塞,它仅仅是抛出信号量,具体处理方式还是由用户处理。Thread.isInterrupted可以得到中断状态。

           对于wait、sleep、join等会造成线程阻塞的方法,由于它们都会抛出Interrupted Exception,处理方式如下

    1 try {
    2     Thread.currentThread().sleep(500);
    3 } catch (InterruptedException e) {
    4     //中断后抛出异常,在异常捕获里可以对中断定制处理
    5 }

            对循环体处理方式如下表示:

    1 //使用Thread.isInterrupted方法获取中断信号量
    2 while(!Thread.currentThread().isInterrupted && 用户自定义条件) {
    3 }
  • 相关阅读:
    Linux系统下的安装jdk和tomcat教程
    CentOS环境下安装jdk和tomcat
    Java的一个高性能快速深拷贝方法。Cloneable?
    AOP面向切面
    struts中实现ajax的配置信息
    上传下载固定配置
    mysql常用命令
    阿里云部署前后台项目
    PMP相关文件梳理
    面试思路总结
  • 原文地址:https://www.cnblogs.com/guanghe/p/13462079.html
Copyright © 2011-2022 走看看