zoukankan      html  css  js  c++  java
  • 并发系列(4)之 AbstractQueuedSynchronizer 源码分析

    本文将主要讲述 AbstractQueuedSynchronizer 的内部结构和实现逻辑,在看本文之前最好先了解一下 CLH 队列锁,AbstractQueuedSynchronizer 就是根据 CLH 队列锁的变种实现的,因为本身 AQS 比较复杂不容易看清楚他本身的实现逻辑,所以查看 CLH 队列锁的实现,可以帮助我们理清楚他内部的关系;关于队列锁的内容可以参考 ,CLH、MCS 队列锁简介

    一、AQS 结构概述

    在 JDK 中除 synchronized 内置锁外,其他的锁和同步组件,基本可以分为:

    1. 面向用户的逻辑部分(对于锁而言就是 Lock interface);
    2. 面向底层的线程调度部分;

    AbstractQueuedSynchronizer 即同步队列则是 Doug Lea 大神为我们提供的底层线程调度的封装;AQS 本身是根据 CLH 队列锁实现的,这一点在注释中有详细的介绍,CLH、MCS 队列锁简介

    clh

    简单来讲,CLH 队列锁就是一个单项链表,想要获取锁的线程封装为节点添加到尾部,然后阻塞检查前任节点的状态 (一定要注意是前任节点,因为这样更容易实现取消、超时等功能,同时这也是选择 CLH 队列锁的原因),而头结点则是当前已经获得锁的线程,其主要作用是通知后继节点(也就是说在没有发生竞争的情况下,是不需要头结点的,这一点后面会详细分析);


    而对于 AQS 的结构大致可以表述为:

    clh
    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
      protected AbstractQueuedSynchronizer() { }
      
      private transient volatile Node head;  // 懒加载,只有在发生竞争的时候才会初始化;
      private transient volatile Node tail;  // 同样懒加载;
      private volatile int state;  // 自定义的锁状态,可以用来表示锁的个数,以实现互斥锁和共享锁;
    }
    

    这里的可以直观的看到链表结构的变化,其实next链表只是相当于遍历的优化,而node节点的变化才是主要的更新;

    1. Node 结构

    static final class Node {
      static final Node SHARED = new Node();  // 共享模式
      static final Node EXCLUSIVE = null;     // 互斥模式
    
      static final int CANCELLED =  1; // 表示线程取消获取锁
      static final int SIGNAL    = -1; // 表示后继节点需要被唤醒
      static final int CONDITION = -2; // 表示线程位于条件队列
      static final int PROPAGATE = -3; // 共享模式下节点的最终状态,确保在doReleaseShared的时候将共享状态继续传播下去
    
      /**
       * 节点状态(初始为0,使用CAS原则更新)
       * 互斥模式:0,SIGNAL,CANCELLED
       * 共享模式:0,SIGNAL,CANCELLED,PROPAGATE
       * 条件队列:CONDITION
       */
      volatile int waitStatus;
      
      volatile Node prev;     // 前继节点
      volatile Node next;     // 后继节点
      volatile Thread thread; // 取锁线程
      Node nextWaiter;        // 模式标识,取值:SHARED、EXCLUSIVE
    
      // Used by addWaiter,用于添加同队队列
      Node(Thread thread, Node mode) {   
        this.nextWaiter = mode;
        this.thread = thread;
      }
    
      // Used by Condition,同于添加条件队列
      Node(Thread thread, int waitStatus) { 
        this.waitStatus = waitStatus;
        this.thread = thread;
      }
    }
    

    根据上面的代码和注释已经可以看到 AQS 为我们提供了两种模式,独占模式和共享模式(彼此独立可以同时使用);其中:

    • AbstractQueuedSynchronizer.state : 表示锁的资源状态,是我们上面所说的面向用户逻辑的部分;
    • Node.waitStatus : 表示节点在队列中的状态,是面向底层线程调度的部分;

    这两个变量一定要分清楚,在后面的代码中也很容易弄混;


    2. AQS 运行逻辑

    AQS 的运行逻辑可以简单表述为:

    AQS2

    如果你熟悉 synchronized ,应该已经发现他们的运行逻辑其实是差不多的,都用同步队列和条件队列,值得注意的是这里的条件队列和 Condition 一一对应,可能有多个;根据上图可以将 AQS 提供的功能总结为:

    • 同步状态的原子性管理;
    • 线程的阻塞与解除阻塞;
    • 队列的管理;

    3. 入队

    因为独占模式和共享模式彼此独立可以同时使用,所以在入队的时候需要首先指定 Node 的类型,同时入队的时候有竞争的可能,所以需要 CAS 入队;

    private Node addWaiter(Node mode) {
      Node node = new Node(Thread.currentThread(), mode); // SHARED、EXCLUSIVE
      // Try the fast path of enq; backup to full enq on failure
      Node pred = tail;
      if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
          pred.next = node;
          return node;
        }
      }
      enq(node);
      return node;
    }
    

    代码中注释也说明了,此处快速尝试入队,是一种优化手段,因为就一般情况而言大多数时候是没有竞争的;失败后在循环入队;

    private Node enq(final Node node) {
      for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
          if (compareAndSetHead(new Node())) // 此时head和tail才初始化
            tail = head;
        } else {
          node.prev = t;
          if (compareAndSetTail(t, node)) {
            t.next = node;
            return t;
          }
        }
      }
    }
    

    而对于出队则稍微复杂一点,独占模式下直接出队,因为没有竞争;共享模式下,则需要 CAS 设置头结点,因为可能对有多个节点同时出队,同时还需要向后传播状态,保证后面的线程可以及时获得锁;此外还可能发生中断或者异常出队,此时则需要考虑头尾的情况,保证不会影响队列的结构;具体内容将会在源码中一次讲解;


    二、独占模式

    1. 应用

    public class Mutex implements Lock {
      private final Sync sync = new Sync();
      private static final int lock = 1;
      private static final int unlock = 0;
    
      @Override
      public void lock() {
        sync.acquire(lock);
      }
    
      @Override
      public boolean tryLock() {
        return sync.tryAcquire(lock);
      }
    
      @Override
      public void unlock() {
        sync.release(unlock);
      }
    
      private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean isHeldExclusively() {
          return getState() == lock;
        }
    
        @Override
        public boolean tryAcquire(int acquires) {
          if (compareAndSetState(unlock, lock)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
          }
          return false;
        }
    
        @Override
        protected boolean tryRelease(int releases) {
          if (getState() == unlock)
            throw new IllegalMonitorStateException();
          setExclusiveOwnerThread(null);
          setState(unlock);
          return true;
        }
      }
    }
    

    注意代码中特意将 AbstractQueuedSynchronizer.state 取值定为lockunlock ,主要是便于理解 state 的含义,在互斥锁中可以任意取值,当然也可以是负数,但是一般情况下令其表示为锁的资源数量(也就是0、1)和共享模式对比,比较容易理解;

    2. 获取锁

    对于独占模式取锁而言有一共有四中方式,

    • tryAcquire: 快速尝试取锁,成功时返回true;这是独占模式必须要重写的方法,其他方式获取锁时,也会先尝试快速获取锁;同时 tryAcquire 也就决定了,这个锁时公平锁/非公平锁,可重入锁/不重冲入锁等;(比如上面的实例就是不可重入非公平锁,具体分析以后还会详细讲解)
    • acquire: 不响应中断,阻塞获取锁;
    • acquireInterruptibly: 响应中断,阻塞获取锁;
    • tryAcquireNanos: 响应中断,超时阻塞获取锁;

    acquire 方法

    流程图:

    acquire

    源码分析:

    public final void acquire(int arg) {
      if (!tryAcquire(arg) &&                             // 首先尝试快速获取锁
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 失败后入队,然后阻塞获取
        selfInterrupt();                                  // 最后如果取锁的有中断,则重新设置中断
    }
    
    final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
        boolean interrupted = false;           // 只要取锁过程中有一次中断,返回时都要重新设置中断
        for (;;) {
          final Node p = node.predecessor();   // 一直阻塞到前继节点为头结点
          if (p == head && tryAcquire(arg)) {  // 获取同步状态
            setHead(node);                     // 设置头结点,此时头部不存在竞争,直接设置
            // next 主要起优化作用,并且在入队的时候next不是CAS设置
            // 也就是通过next不一定可以准确取到后继节点,所以在唤醒的时候不能依赖next,需要反向遍历
            p.next = null; // help GC          
            failed = false;
            return interrupted;
          }
          if (shouldParkAfterFailedAcquire(p, node) && // 判断并整理前继节点
            parkAndCheckInterrupt())                   // 当循环最多第二次的时候,必然阻塞
            interrupted = true;
        }
      } finally {
        if (failed)  // 异常时取消获取
          cancelAcquire(node);
      }
    }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      if (ws == Node.SIGNAL) return true;
      if (ws > 0) {  // 大于0说明,前继节点异常或者取消获取,直接跳过;
        do {
          node.prev = pred = pred.prev;  // 跳过pred并建立连接
        } while (pred.waitStatus > 0);
        pred.next = node;
      } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  // 标记后继节点需要唤醒
      }
      return false;
    }
    

    其中 node.prev = pred = pred.prev; 相关的内存分析可以查看 JAVA 连等赋值问题


    acquireInterruptibly 方法

    流程图:

    acquireInterruptibly

    源码分析:

    public final void acquireInterruptibly(int arg) throws InterruptedException {
      if (Thread.interrupted()) throw new InterruptedException();  // 中断退出
      if (!tryAcquire(arg))           // 获取同步状态
        doAcquireInterruptibly(arg);  // 中断获取
    }
    
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
      final Node node = addWaiter(Node.EXCLUSIVE);   // 加入队尾
      boolean failed = true;
      try {
        for (;;) {
          final Node p = node.predecessor();
          if (p == head && tryAcquire(arg)) {
            setHead(node);
            p.next = null; // help GC
            failed = false;
            return;
          }
          if (shouldParkAfterFailedAcquire(p, node) &&   // 判断并整理前继节点
            parkAndCheckInterrupt())                     // 等待
            throw new InterruptedException();
        }
      } finally {
        if (failed)
          cancelAcquire(node);
      }
    }
    

    tryAcquireNanos 方法

    流程图:

    tryAcquireNanos

    源码分析:

    public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
      if (Thread.interrupted()) throw new InterruptedException();
      return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
    }
    
    private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
      if (nanosTimeout <= 0L) return false;
      final long deadline = System.nanoTime() + nanosTimeout;
      final Node node = addWaiter(Node.EXCLUSIVE);
      boolean failed = true;
      try {
        for (;;) {
          final Node p = node.predecessor();
          if (p == head && tryAcquire(arg)) {
            setHead(node);
            p.next = null; // help GC
            failed = false;
            return true;
          }
          nanosTimeout = deadline - System.nanoTime();
          if (nanosTimeout <= 0L) return false;          // 超时退出
          if (shouldParkAfterFailedAcquire(p, node) &&
            nanosTimeout > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
          if (Thread.interrupted())
            throw new InterruptedException();
        }
      } finally {
        if (failed)
          cancelAcquire(node);
      }
    }
    

    3. 释放锁

    释放锁时,判断有后继节点需要唤醒,则唤醒后继节点,然后退出;有唤醒的后继节点重新设置头结点,并标记状态

    public final boolean release(int arg) {
      if (tryRelease(arg)) {   // 由用户重写,尝试释放
        Node h = head;
        if (h != null && h.waitStatus != 0)
          unparkSuccessor(h);  // 唤醒后继节点
        return true;
      }
      return false;
    }	
    

    三、共享模式

    1. 应用

    public class ShareLock implements Lock {
      private Syn sync;
    
      public ShareLock(int count) { this.sync = new Syn(count); }
    
      @Override
      public void lock() { sync.acquireShared(1); }
    
      @Override
      public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
      }
    
      @Override
      public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; }
    
      @Override
      public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
      }
    
      @Override
      public void unlock() { sync.releaseShared(1); }
    
      @Override
      public Condition newCondition() { throw new UnsupportedOperationException(); }
    
      private static final class Syn extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 5854536238831876527L;
        Syn(int count) {
          if (count <= 0) {
            throw new IllegalArgumentException("count must large than zero.");
          }
          setState(count);
        }
    
        @Override
        public int tryAcquireShared(int reduceCount) {
          for (; ; ) {
            int current = getState();
            int newCount = current - reduceCount;
            //如果新的状态小于0 则返回值,则表示没有锁资源,直接返回
            if (newCount < 0 || compareAndSetState(current, newCount)) {
              return newCount;
            }
          }
        }
    
        @Override
        public boolean tryReleaseShared(int retrunCount) {
          for (; ; ) {
            int current = getState();
            int newCount = current + retrunCount;
            if (compareAndSetState(current, newCount)) {
              return true;
            }
          }
        }
      }
    }
    

    上述代码中的 AbstractQueuedSynchronizer.state 表示锁的资源数,但是仍然是不可重入的;


    2. 获取锁

    同样对于共享模式取锁也有四中方式:

    • tryAcquireShared: 快速尝试取锁,由用户重写
    • acquireShared: 不响应中断,阻塞获取锁;
    • acquireSharedInterruptibly: 响应中断,阻塞获取锁;
    • tryAcquireSharedNanos: 响应中断,超时阻塞获取锁;

    tryAcquireShared 方法

    @Override
    public int tryAcquireShared(int reduceCount) {
      for (; ; ) {
        int current = getState();
        int newCount = current - reduceCount;
        //如果新的状态小于0 则返回值,则表示没有锁资源,直接返回
        if (newCount < 0 || compareAndSetState(current, newCount)) {
          return newCount;
        }
      }
    }
    

    需要注意的是 tryAcquireShared 方法是快速尝试获取锁,并更新锁状态,如果失败则必然锁资源不足,返回负值;

    acquireShared 方法

    public final void acquireShared(int arg) {
      if (tryAcquireShared(arg) < 0)  // 快速获取失败
        doAcquireShared(arg);         // 阻塞获取锁
    }
    
    private void doAcquireShared(int arg) {
      final Node node = addWaiter(Node.SHARED);
      boolean failed = true;
      try {
        boolean interrupted = false;
        for (;;) {
          final Node p = node.predecessor();
          if (p == head) {
            int r = tryAcquireShared(arg);
            if (r >= 0) {
              setHeadAndPropagate(node, r);     // 设置头结点,并是情况将信号传播下去
              p.next = null; // help GC
              if (interrupted) selfInterrupt(); // 重新设置中断状态
              failed = false;
              return;
            }
          }
          if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
            interrupted = true;
        }
      } finally {
        if (failed)
          cancelAcquire(node);
      }
    }
    
    // propagate 表示线程获取锁后,共享锁剩余的锁资源
    private void setHeadAndPropagate(Node node, int propagate) {
      Node h = head; // Record old head for check below
      setHead(node);
      
      // propagate > 0 :表示还有剩余的资源
      // h.waitStatus < 0 : 表示后继节点需要被唤醒
      // 其余还做了很多保守判断,确保后面的节点能及时那到锁
      if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
          doReleaseShared();  // 唤醒后继节点
      }
    }
    

    根据上面的代码可以看到,共享模式和独占模式获取锁的主要区别:

    • 共享模式可以有多个锁
    • 设置头结点的时候,同时还要将状态传播下去

    其余的思路和独占模式差不多,他家可以自己看源码;

    3. 释放锁

    同样 tryReleaseShared 是由用户自己重写的,这里需要注意的是如果不能确保释放成功(因为共享模式释放锁的时候可能有竞争,所以可能失败),则在外层 Lock 接口使用的时候,就需要额外处理;

    @Override
    public boolean tryReleaseShared(int retrunCount) {
      for (; ; ) {
        int current = getState();
        int newCount = current + retrunCount;
        if (compareAndSetState(current, newCount)) {
          return true;
        }
      }
    }
    

    releaseShared 方法

    public final boolean releaseShared(int arg) {
      if (tryReleaseShared(arg)) {  // 尝试取锁成功,此时锁资源已重新设置
        doReleaseShared();          // 唤醒后继节点
        return true;
      }
      return false;
    }
    

    doReleaseShared 方法必然执行两次,

    • 第一次头结点释放锁,然后唤醒后继节点
    • 第二次后继设置头结点

    最终使得头结点的状态必然是 PROPAGATE

    private void doReleaseShared() {
      for (;;) {
        Node h = head;
        if (h != null && h != tail) {
          int ws = h.waitStatus;
          if (ws == Node.SIGNAL) {
            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
              continue;      // loop to recheck cases
            unparkSuccessor(h);
          }
          else if (ws == 0 &&
               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
            continue;        // loop on failed CAS
        }
        if (h == head)       // loop if head changed
          break;
      }
    }
    

    四、条件队列

    1. ConditionObject 结构

    condition
    public class ConditionObject implements Condition, java.io.Serializable {
      private transient Node firstWaiter;
      private transient Node lastWaiter;
      ...
    }
    

    如代码所示条件队列是一个由 Node 组成的链表,注意这里的链表不同于同步队列,是通过 nextWaiter 连接的,在同步队列中 nextWaiter 用来表示独占和共享模式,所以区分条件队列的方法就有两个:

    • Node.waitStatus = Node.CONDITION;
    • Node.next = null & Node.prev= null;

    2. await

    public final void await() throws InterruptedException {
      if (Thread.interrupted()) throw new InterruptedException();
      Node node = addConditionWaiter();     // 添加节点到条件队列
      int savedState = fullyRelease(node);  // 确保释放锁,并唤醒后继节点
      int interruptMode = 0;
      while (!isOnSyncQueue(node)) {        // node 不在同步队列中
        LockSupport.park(this);             // 阻塞
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
          break;
      }
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
      if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
      if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }
    

    3. signal

    public final void signal() {
      if (!isHeldExclusively()) throw new IllegalMonitorStateException();
      Node first = firstWaiter;
      if (first != null)  
        doSignal(first);  // 从头结点一次唤醒
    }
    
    private void doSignal(Node first) {
      do {
        if ( (firstWaiter = first.nextWaiter) == null)
          lastWaiter = null;
        first.nextWaiter = null;
      } while (!transferForSignal(first) &&  // 将节点移动到同步节点中
           (first = firstWaiter) != null);
    }
    

    因为篇幅有点长了,所以条件队列讲的也就相对简单了一点,但是大体的思路还是讲了;

    总结

    • AbstractQueuedSynchronizer 通过私有变量继承方式使用
    • 观察 AbstractQueuedSynchronizer ,其实和 synchronized 的结构基本相同,但是 synchronized 还会自动根据使用情况进行锁升级
    • 此外本文的主要参考资料是《java 并发编程的艺术》,有兴趣的可以自行查看;
  • 相关阅读:
    图像处理基础2
    c++之morphologyEx(形态学操作)
    图像处理基础
    Mac 安装QT
    Qmake VS Cmake
    g++,qmake,cmake区别
    C++11中的匿名函数(lambda函数,lambda表达式)
    c++相关要点
    spritekit基础节点学习
    spriteKit简单学习
  • 原文地址:https://www.cnblogs.com/sanzao/p/10657020.html
Copyright © 2011-2022 走看看