zoukankan      html  css  js  c++  java
  • Java并发编程之AQS

    一、什么是AQS

      AQS(AbstractQueuedSynchronize:队列同步器)是用来构建锁或者其他同步组件的基础框架,很多同步类都是在它的基础上实现的,比如常用的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore。

    二、实现原理

      在AQS内部,定义了一个 volatile int state 变量来标识同步状态,通过改变state的状态来控制对共享资源的访问,根据不同的实现,state可以表示不同的状态,例如:在 ReentrantLock 中它表示拥有该锁的线程请求了多少次该锁;在 Semaphore 中表示剩余的许可数,在 FutureTask 中表示任务的状态(尚未开始、运行、完成和取消)。同时定义了一个 FIFO 队列维护争用资源时被阻塞的线程,当线程尝试获取锁时,如果锁已经被占用,那么该线程就会被构造成一个Node节点放到同步队列的尾部;队列的头节点是成功获取锁的节点,当头节点线程释放锁时,会唤醒后面的节点并释放当前头节点的引用。

      AQS主要通过继承的方式来使用,子类通过继承AQS并实现它的抽象方法来定义state变量的具体的访问规则,从而可以实现不同类型的同步组件。AQS定义了两种资源共享的方式:独占式和共享式。

    1. 独占式:同时只有一个线程能访问该共享资源。
    2. 共享式:多个线程可以同时访问该共享资源。

      AQS中可重写的方法如下:

    1. protected boolean tryAcquire(int arg):独占式获取同步状态,成功则返回true,失败则返回false。先查询同步状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态。
    2. protected boolean tryRelease(int arg):独占式释放同步状态,成功则返回true,失败则返回false。等待获取同步状态的线程将有机会获取同步状态。
    3. protected int tryAcquireShared(int arg):共享式获取同步状态,返回大于等于0的值,表示成功,该值表示剩余可用资源数,小于0则表示获取失败。
    4. protected boolean tryReleaseShared(int arg):共享式释放同步状态,如果释放后允许唤醒后续等待结点返回true,否则返回false。
    5. protected boolean isHeldExclusively():当前同步器是否在独占模式下被线程占用,只在 AbstractQueuedSynchronizer.ConditionObject 的方法内进行内部调用,不使用Condition可以不重写。

      AQS自身没有实现任何同步接口,为了保证对state的访问修改操作是安全的,重写AQS指定的方法时,需要使用它提供的如下3个方法来访问或修改同步状态:

    1. getState():获取当前同步状态。
    2. setState(int newState):设置当前同步状态。
    3. compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。

      AQS是实现锁(同步组件)的关键,它在底层对同步状态管理、线程的排队、等待与唤醒做了实现,简化锁的实现。AQS是基于模板方法模式设计的,使用时需要继承AQS并重写对应的方法,再将其组合在同步组件中,当使用同步组件访问共享资源时,调用AQS提供的模板方法,然后模板方法会调用重写的方法。AQS提供的模板方法大体上分为三类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。

    • 独占式获取与释放同步状态
      • public final void acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则返回,否则进入同步队列等待,该方法会调用tryAcquire(int arg)方法。
      • public final void acquireInterruptibly(int arg):与 acquire(int arg) 相同,但是该方法响应中断,如果当前线程没有获取到同步状态,就进入到同步队列中,如果当前线程被中断(Thread().interrupt()),该方法会抛出InterruptedException并返回。
      • public final boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly(int arg)的基础上增加了超时限制,如果在超时时间范围内没有获取到同步状态,就返回false,获取到了返回true。
      • public final boolean release(int arg):独占式的释放同步状态,释放成功后,会释放同步队列第一个节点中的线程。

        以上四个方法,获取同步状态的三个方法会调用重写的tryAcquire(int arg),release(int arg)会调用tryRelease(int arg)。

    • 共享式获取与释放同步状态
      • public final void acquireShared(int arg):共享式的获取同步状态,如果当前线程未获取到同步状态,进入同步队列等待,与独占式获取的主要区别是可以有多个线程同时获取到同步状态。

      • public final void acquireSharedInterruptibly(int arg):与acquireShared(int arg)相同,可响应中断。

      • public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly(int arg)的基础上增加了超时限制。

      • public final boolean releaseShared(int arg):共享式的释放同步状态。

        以上四个方法,获取同步状态的三个方法会调用重写的tryAcquireShared(int arg),release(int arg)会调用tryReleaseShared(int arg)。

    • 查询同步队列中的等待线程情况
      • public final Collection<Thread> getQueuedThreads():获取等待在同步队列上的线程集合。

      • public final boolean hasQueuedThreads():查询是否有正在等待获取的任何线程,如果可能有其他线程正在等待获取锁定,则返回 true。注意,随时可能因为中断和超时而导致取消操作,返回 true 并不能保证其他任何线程在等待获取对象。

     三、源码实现

      主要通过查看独占式同步状态的获取和释放(acquire(int arg)、release(int arg))、共享式同步状态的获取和释放(acquireShared(int arg)、releaseShared(int arg))这几个方法的源码分析AQS的实现。

    1. acquire(int arg):此方法是独占模式下线程获取共享资源的顶层入口。
      public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }

      先来看下acquire方法中大致的工作流程:

      1. tryAcquire(arg):自定义同步器重写的获取同步状态的方法,如果成功,就直接返回,否则继续执行。
      2. addWaiter(Node.EXCLUSIVE):将当前线程封装成Node,并设置Node为独占模式,然后添加到同步队列的尾部。
        private Node addWaiter(Node mode) {
                Node node = new Node(Thread.currentThread(), mode);
                // Try the fast path of enq; backup to full enq on failure
                Node pred = tail;
                if (pred != null) {
                    node.prev = pred;
                    if (compareAndSetTail(pred, node)) {
                        pred.next = node;
                        return node;
                    }
                }
                enq(node);
                return node;
        }
        addWaiter(Node mode)
      3. acquireQueued(Node,arg):使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
        final boolean acquireQueued(final Node node, int arg) {
                boolean failed = true;
                try {
                    boolean interrupted = false;
                    for (;;) {
                        final Node p = node.predecessor();
                        if (p == head && tryAcquire(arg)) {
                            setHead(node);
                            p.next = null; // help GC
                            failed = false;
                            return interrupted;
                        }
                        if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                            interrupted = true;
                    }
                } finally {
                    if (failed)
                        cancelAcquire(node);
                }
        }
        acquireQueued(final Node node, int arg)
      4. selfInterrupt():根据acquireQueued()的结果,如果线程被中断过,则会调用selfInterrupt()做中断操作,否则不调用该方法。
        static void selfInterrupt() {
                Thread.currentThread().interrupt();
        }
        selfInterrupt()
    2. release(int arg):此方法是独占模式下线程释放共享资源的顶层入口。
      public final boolean release(int arg) {
              if (tryRelease(arg)) {
                  Node h = head;
                  if (h != null && h.waitStatus != 0)
                      unparkSuccessor(h);
                  return true;
              }
              return false;
      }

      释放共享资源的操作相对会简单一点,首先调用重写的tryRelease(arg),释放成功(state = 0),会获取同步队列的头结点,如果头结点不为空并且waitStatus != 0(0代表初始化状态),则调用unparkSuccessor(h)唤醒该节点中的线程并返回true。释放失败直接返回false。

      private void unparkSuccessor(Node node) {
              /*
               * If status is negative (i.e., possibly needing signal) try
               * to clear in anticipation of signalling.  It is OK if this
               * fails or if status is changed by waiting thread.
               */
              int ws = node.waitStatus;
              if (ws < 0)
                  compareAndSetWaitStatus(node, ws, 0);
      
              /*
               * Thread to unpark is held in successor, which is normally
               * just the next node.  But if cancelled or apparently null,
               * traverse backwards from tail to find the actual
               * non-cancelled successor.
               */
              Node s = node.next;
              if (s == null || s.waitStatus > 0) {
                  s = null;
                  for (Node t = tail; t != null && t != node; t = t.prev)
                      if (t.waitStatus <= 0)
                          s = t;
              }
              if (s != null)
                  LockSupport.unpark(s.thread);
      }
      unparkSuccessor(Node node)

        注:waitStatus 表示当前Node结点中线程的等待状态,共有5种状态INITIAL、CANCELLED、SIGNAL、CONDITION、PROPAGATE。初始化时为0,大于0表示取消状态,小于0表示有效状态。

      • INITIAL:值为0,表示初始状态。
      • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。

      • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。

      • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

      • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。

      3. acquireShared(int arg):此方法是共享模式下线程获取共享资源的顶层入口。

    public final void acquireShared(int arg) {
         if (tryAcquireShared(arg) < 0)
              doAcquireShared(arg);
    }
    1. tryAcquireShared(arg):先调用重写的获取同步状态的方法,如果获取成功,直接返回,获取失败则继续执行。返回值大于等于0表示获取成功,返回值表示剩余可用资源的个数;反之表示获取失败。
      • 这里有一点需要注意:假设当前线程获取资源时发现可用资源不够,当前线程会继续阻塞等待其他线程释放资源,而不会唤醒后面的线程。
    2. doAcquireShared(arg):将线程加入同步队列并设置nextWait为SHARED常量,表示当前节点是共享的,然后在队列中获取资源,直到获取到资源后才返回。
      private void doAcquireShared(int arg) {
              final Node node = addWaiter(Node.SHARED);//添加到同步队列尾部
              boolean failed = true;//失败标志
              try {
                  boolean interrupted = false;//中断标志
                  for (;;) {
                      final Node p = node.predecessor();//获取上一个节点
                      if (p == head) {//上一个节点是头节点才会尝试获取资源
                          int r = tryAcquireShared(arg);//尝试获取资源
                          if (r >= 0) {//获取成功
                              setHeadAndPropagate(node, r);//设置为头节点,如果还有剩余资源则继续唤醒后面节点中的线程
                              p.next = null; // help GC
                              if (interrupted)//如果过程中有发生过中断,则进行中断操作
                                  selfInterrupt();
                              failed = false;
                              return;
                          }
                      }
                      if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())//shouldParkAfterFailedAcquire()判断是否需要阻塞当前线程,如果该方法返回true,则调用parkAndCheckInterrupt()方法来阻塞线程,阻塞后返回当前线程的中断标志,如果为true,则将interrupted 改为true。
                          interrupted = true;
                  }
              } finally {
                  if (failed)//当前线程是否已经中断,如果中断,failed为true
                      cancelAcquire(node);//将线程从同步队列中移除,并唤醒下一个节点。
              }
      }
      doAcquireShared(int arg)

    然后看一下setHeadAndPropagate(Node node, int propagate)、shouldParkAfterFailedAcquire(Node pred, Node node)和parkAndCheckInterrupt()这几个方法:

    private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);//设置为头节点
            //如果还有剩余资源则唤醒下一个线程
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
    } 
    setHeadAndPropagate(Node node, int propagate)
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)//如果前驱节点为SIGNAL,后继节点为等待状态 - 阻塞
                return true;
            if (ws > 0) {//CANCELLED,如果前驱节点已经被中断或取消,则跳过所有状态为Node.CANCELLED的节点
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//CAS设置状态为Node.SINGAL
            }
            return false;
    }
    shouldParkAfterFailedAcquire(Node pred, Node node)
    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);//阻塞当前线程
            return Thread.interrupted();//返回中断状态并设置为false
    }
    parkAndCheckInterrupt()

      4. releaseShared(int arg):此方法是共享模式下线程释放共享资源的顶层入口。这个方法和独占式同步状态的释放方法(release(int arg))差不多,区别就在独占方式下要完全释放资源后(即state=0,因为独占下可重入,所以state的值可能会大于1)才会唤醒后面的线程,而releaseShared(int arg)在释放了资源后(可以释放部分资源)就可以唤醒后面的线程。

    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
    }
      1. tryReleaseShared(int arg):调用重写的释放共享资源的方法尝试释放资源,释放成功唤醒下一个节点中并返回true,失败则返回false。
      2. doReleaseShared():唤醒下一个节点中的线程。
        private void doReleaseShared() {
                for (;;) {
                    Node h = head;
                    if (h != null && h != tail) {//同步队列不为空并且有阻塞的节点
                        int ws = h.waitStatus;
                        if (ws == Node.SIGNAL) {//如果都节点状态为SINGAL
                            //设置头节点状态为初始状态,成功则唤醒下一个节点
                            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                                continue;            // loop to recheck cases
                            unparkSuccessor(h);
                        }
                        else if (ws == 0 &&
                                 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//如果头节点状态等于0,表示已经获取共享状态成功,通过CAS将状态设置为PROPAGATE,如果CAS操作失败,就一直循环
                            continue;                // loop on failed CAS
                    }
                    if (h == head)                   // loop if head changed
                        break;
                }
            }    
        doReleaseShared()

      到这里,独占式同步状态的获取和释放(acquire(int arg)、release(int arg))、共享式同步状态的获取和释放(acquireShared(int arg)、releaseShared(int arg))这几个方法的源码大致过了一遍,对AQS内部的实现算是有了一个基本的了解。其他几个响应中断的方法和前面看的几个类似,后面再看看具体的使用。

    参考资料:

      书籍:《Java并发编程的艺术》

      博客:http://www.cnblogs.com/waterystone/p/4920797.html

      博客:https://blog.csdn.net/u014674862/article/details/83021022

  • 相关阅读:
    docker-compose 使用
    mysql UNIX时间戳与日期的相互转换 查询表信息
    mysql查看表结构命令
    PostgreSQL新手入门
    ibdata1是?
    ubuntu 12.04 安装 nginx+php+mysql web服务器
    读懂IL代码就这么简单(二)
    读懂IL代码就这么简单(一)
    在Ubuntu Linux下怎样安装QQ
    jQuery 选择器
  • 原文地址:https://www.cnblogs.com/Mr-XiaoLiu/p/9985408.html
Copyright © 2011-2022 走看看