zoukankan      html  css  js  c++  java
  • AbstractQueuedSynchronizer(AQS)源码解析

          关于AQS的源码解析,本来是没有打算特意写一篇文章来介绍的。不过在写本学期课程作业中,有一门写了关于AQS的,而且也画了一些相关的图,所以直接拿过来分享一下,如有错误欢迎指正。
          然后基本简介也都不介绍了,网上一大堆,这里就直接进行源码的分析了。

    AQS基本属性

          
          AQS属性简介:

    属性 类型 详解
    Head Node类型 持有锁的线程结点,也是队列中的头结点
    Tail Node类型 阻塞队列中的尾结点,同时每一个新的结点进来,都插入到阻塞队列的最后。
    State int类型 大于等于0。代表当前锁的状态。0代表没有线程占用当前锁,大于0代表有线程持有锁。
    exclusiveOwnerThread(继承自AOS) Thread类型 代表独占锁的线程。

          AQS的具体结构如下图所示:

          在AQS链表中,将每一个线程包装成Node实例,并通过链表的形式链接保存,在链式结构中,节点通过next和prev分别与前驱节点和后置节点相连接。其中head节点表示为当前持有锁的线程,不在阻塞队列中。tail节点为链表中最后一个节点,当有新的节点被添加到链表中后,AQS会将tail引用指向最后一个被添加进链表的节点。

    AQS中Node内部类

          
          Node属性简介:

    字段 简介 字段 简介
    SHARE 标识节点当前在共享模式下 EXCLUSIVE 标识节点当前在独占模式下
    CANCELLED 标识当前节点所表示的线程已取消抢锁 SIGNAL 标识当前节点需要在释放锁后唤醒后继节点
    CONDITION 与ConditionObject内部类有关 waitStatue 取值为以上几种状态
    prev 代表当前节点的前驱节点 next 代表当前节点的后继节点
    thread 代表当前节点所表示的线程

    1 加锁

          这里以一个锁的具体使用方法对AQS类进行详细的分析:

          首先,线程先对锁对象进行获取操作,如果当前需要获取的锁对象并没有其他线程所持有,成功获取到了锁,将执行相关的业务代码,执行完毕后,对锁资源进行释放,以便其他线程所使用。如果当前线程获取锁资源失败,说明锁资源有其他线程在使用,当前线程将进行阻塞状态,等待再次获取锁资源。

    1.1 AQS中如何获取锁

    java.util.concurrent.locks.ReentrantLock.java文件中的公平锁为例:

    abstract static class Sync extends AbstractQueuedSynchronizer
    #java.util.concurrent.locks.ReentrantLock中第220行
    static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);  #调用了AQS中的方法
            }
    ...
    }
    ================AQS====================
    #java.util.concurrent.locks.AbstractQueuedSynchronizer中第1197行
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

          在lock()方法中,线程首先尝试抢锁tryAcquire(1),如果抢锁成功则直接返回true,代表当前线程已持有锁资源,否则返回false,进行下一次抢锁动作。
          当线程抢锁失败后,AQS将将当前线程封装成Node节点,并添加到阻塞队列。之后将从阻塞队列中依次取出等待锁的Node节点,并再次尝试获取锁.如果再次获取锁失败,则使当前线程自己中断自己。

    1.2 尝试获取锁资源

          首先获取锁的状态,判断当前是否有线程持有锁,这里分为两种情况:

    • 如果当前并没有线程持有锁资源,则判断阻塞队列中是否有节点排在当前节点的前面等待获取锁资源。这里分为两种情况:

      • 如果有其他线程在等待获取锁资源,则进行等待
      • 如果没有其他线程在等待获取锁资源,表明当前线程是第一个等待获取锁的线程,随后尝试对锁资源进行获取,如果成功获取到锁资源则将当前线程设置为独占锁的线程,同时返回true.
    • 如果当前有线程持有锁,则进行判断是否是当前线程所持有锁资源,这是分为两种情况:

      • 锁资源被当前线程所持有,则表明是重入锁,随后将获取锁的次数加一,返回true.
      • 持有锁资源并不是当前线程,返回false.

    流程图如下:

    源码:

    #java.util.concurrent.locks.ReentrantLock中第231行
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
            }
            return false;
    }
    

    1.3 判断阻塞队列中是否有其他节点

          在线程获取锁之前,首先判断阻塞队列中是否有其他节点,如果有其他节点则放弃抢锁。
    首先获取AQS链表中的头节点与尾节点,分别进行判断:

    • 头节点是否等于尾节点
      • 如果头节点等于尾节点说明阻塞队列为空,没有其他节点返回false
    • 如果头节点不等于尾节点,则判断头节点的后置节点是否为空
      • 如果头节点的后置节点不为空,则说明阻塞队列不为空,则判断阻塞队列中第一个节点线程是否为当前线程
        • 如果是当前线程说明阻塞队列中没有其他节点返回false。
        • 如果不是当前线程说明阻塞队列中有其他节点,返回true.

    流程图如下:

    源码:

    #java.util.concurrent.locks.AbstractQueuedSynchronizer中第1512行
    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    1.4 将当前线程添加到阻塞队列

          如果当前线程抢锁失败则通过AQS将当前线程包装成Node节点添加进阻塞队列。

    1. 将当前线程以独占锁的模式包装成Node节点。

    2. 将当前节点添加进阻塞队列。分两种情况:

      • 阻塞队列中尾节点不为空。
        • 将尾节点置为当前节点的前驱节点,通过CAS操作将当前节点置为尾节点。
          • 如果成功,则将之前尾结点的后置引用指向当前节点,将当前节点返回。
          • 如果存在另一节点提前完成上一步操作,则进行入队操作。
      • 阻塞队列中尾节点为空,则进行入队操作。
    3. 入队操作结束将当前节点返回。

    流程图如下:

    源码:

    #java.util.concurrent.locks.AbstractQueuedSynchronizer中第605行
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    1.5 入队操作

          这一步将当前节点添加到阻塞队列中。
          首先获取阻塞队列中的尾节点,判断是否为空,有两种情况:

    • 阻塞队列中尾节点为空,则初始化阻塞队列,将头节点设置为尾节点,
      再次获取尾节点,判断是否为空。
    • 阻塞队列中尾节点不为空,则将尾节点设置为当前节点的前驱节点。
      通过CAS将当前节点设置为尾节点。这里有两种情况:
      • 如果成功,则将之前尾结点的后置引用指向当前节点,将当前节点的前驱节点返回。
      • 存在另一节点提前完成上一步操作,则再次获取阻塞队列中的尾节点,判断是否为空。

    流程图如下:

    源码:

    #java.util.concurrent.locks.AbstractQueuedSynchronizer中第583行
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    1.6 抢锁或将线程挂起

          到达这一步说明节点已进入阻塞队列,节点尝试获取锁或者进行挂起操作。

    1. 获取当前节点的前驱节点
    2. 判断前驱节点是否为头节点,这里有两种情况:
      • 前驱节点为头节点,说明当前节点前面没有节点在等待获取锁资源,只需要等待前驱节点释放锁资源。所以可以尝试抢锁,这里有两种情况:
        • 抢锁成功,则将当前节点设置为头节点,将当前节点前驱节点的后置引用设置为空,返回false
        • 抢锁失败,说明头节点还没有释放锁资源,此时将当前节点挂起。这里有两种情况:
          • 如果挂起成功,则线程等待被唤醒,唤醒之后再次判断前驱节点是否为头节点。
          • 如果挂起失败,再次判断前驱节点是否为头节点。
      • 前驱节点不是头节点,说明当前节点前面有其他节点在等待获取锁资源,此时将当前节点挂起。
    3. 如果在挂起阶段发生异常,则取消抢锁。
    4. 这里为无限循环,直到线程获取到锁资源或者取消抢锁才会退出循环。

    流程图如下:

    源码:

    #java.util.concurrent.locks.AbstractQueuedSynchronizer中第857行
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
                }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    1.7 判断是否应该挂起当前线程

          当线程暂时获取不到锁资源时,判断是否应该挂起当前线程。
          首先获取当前节点的前驱节点的状态,这里有三种情况:
    * 前驱节点的状态为SIGNAL。其中,SIGNAL表明该节点在释放锁资源后应该将后置节点唤醒。返回true。
    * 前驱节点的状态为CANCELLED。CANCELLED表明该节点已取消抢锁,此时将从当前节点开始向前寻找,直到找到一个节点的状态不为CANCELLED,然后将他设置为当前节点的前驱节点。之后返回false.
    * 如果前驱节点的状态不是以上两种情况,则通过CAS将前驱节点的状态设置为SIGNAL,之后返回false。

    流程图如下:

    源码:

    #java.util.concurrent.locks.AbstractQueuedSynchronizer中第795行
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    1.8 挂起当前线程

          将当前线程挂起,当线程被唤醒后将线程的中断状态返回.
    源码:

    #java.util.concurrent.locks.AbstractQueuedSynchronizer中第835行
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    2 解锁

    2.1 解锁操作

          尝试释放锁资源,这里有两种情况:

    • 成功释放锁资源,获取到AQS链表中头节点,判断头节点是否为空,这里有两种情况:
      • 如果头节点为空,说明没有节点持有锁资源,返回true.
      • 如果头节点不为空,判断头节点状态是否为0:
        • 如果头节点状态为0,说明阻塞队列中没有线程在等待获取锁,返回true.
        • 如果头节点状态不为0,则将阻塞队列中第一个等待获取锁资源的线程唤醒。随后返回ture.

    流程图如下:

    源码:

    #java.util.concurrent.locks.ReentrantLock中第456行
    public void unlock() {
        sync.release(1);
    }
    ==============================
    #java.util.concurrent.locks.AbstractQueuedSynchronizer中第1260行
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    

    2.2 唤醒后置节点

          当持有锁的节点执行相关代码完成后,需要释放锁资源并唤醒后置节点。

    1. 首先获取头节点的状态,如果小于0则通过CAS将状态设置为0.
    2. 获取头节点的后置节点,这里有两种情况:
      • 如果头节点的后置节点为空或者头节点的后置节点的状态大于0,则将头节点的后置节点置为空,同时从AQS链表的尾节点向前搜索,直到找到最后一个节点状态小于等于0的节点,将该节点唤醒。
      • 如果头节点的后置节点不为空,则直接将该节点唤醒。

    流程图如下:

    源码:

    #java.util.concurrent.locks.AbstractQueuedSynchronizer中第638行
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    2.3 取消抢锁

          当线程由于异常或某些特殊情况的发生,需要取消对锁资源的获取,将执行取消抢锁操作。

    1. 如果需要取消抢锁的节点为空,则直接返回。
    2. 否则将节点所包装的线程置为空。
    3. 获取节点的前驱节点,判断前驱节点的状态是否大于0,如果大于0则一直向前找,直到找到一个节点的状态小于等于0,将该节点设置为当前节点的前驱节点。
    4. 获取当前节点的后置节点。
    5. 将当前节点的状态设置为CANCELLED。
    6. 判断当前节点是否为尾节点,这里有两种情况:
      • 如果当前节点是尾节点,则将当前节点的前驱节点设置为尾节点,
        同时将后置引用设置为空。
      • 如果当前节点不是尾节点,判断当前节点的前驱节点是否为头节
        点。这里有两种情况:
        • 如果当前节点的前驱节点是头节点,则将当前节点唤醒。
        • 如果当前节点的前驱节点不是头节点,则判断该节点状态是否为SIGNAL,如果为SIGNAL,则将该节点的后置引用指向当前节点的后置节点。
      • 断开当前节点与链表的连接。

    流程图如下:

    源码:

    #java.util.concurrent.locks.AbstractQueuedSynchronizer中第742行
    private void cancelAcquire(Node node) {
        if (node == null)
            return;
        
        node.thread = null;
    
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
    
        Node predNext = pred.next;
    
        node.waitStatus = Node.CANCELLED;
    
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
    
            node.next = node; // help GC
        }
    }
    

          其实到这里还有一些内容并没有分析完,以后再补上好了。

  • 相关阅读:
    【Git】Git 学习笔记(一)
    【工程 Shell】Shell 学习(一)
    Vue 使用 Antd 简单实现左侧菜单栏和面包屑功能
    GoF的23种设计模式的功能
    ASP 对数据库的操作
    注册表修改USB状态(开与关)
    EXE文件关联修复
    CentOS8安装Docker
    GoogleEarth无法连接服务器解决方法
    【转】Qt 实现的拷贝 文件/文件夹 的函数
  • 原文地址:https://www.cnblogs.com/cbkj-xd/p/11125890.html
Copyright © 2011-2022 走看看