zoukankan      html  css  js  c++  java
  • 并发编程学习笔记(十二、AQS同步器源码解析1,AQS独占锁)

    目录:

    • 学习目的
    • AbstractOwnableSynchronizer源码解析
    • AQS为什么如此重要
      • 重要性说明
      • AQS内部类Node源码解析
      • AQS实例属性源码解析
    • AQS独占锁加锁
    • AQS独占锁解锁
    • 总结
      • 知识点总结
      • 其它收获

    学习目的

    上一节学习到的ThreadPoolExecutor在并发编程中更为适用,故知己知彼方能百战不殆,这次的学习目的只要有如下几个:

    1、了解源码,能更加灵活使用线程池

    2、看看大神是如何设计一个线程池的

    • 如何合理的协调利用cpu 、内存、网络、i/o等系统资源
    • 利用线程池管理并复用线程控制最大并发数等。
    • 实现任务线程队列缓存策略拒绝机制
    • 实现某些与时间相关的功能,如定时执行、周期执行等。
    • 隔离线程环境。比如,交易服务和搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要大;因此,通过配置独立的线程池,将较慢的交易服务与搜索服务隔开,避免个服务线程互相影响。

    AbstractOwnableSynchronizer源码解析

    话说了解ThreadPoolExecutor的话,为啥又要说道AbstractOwnableSynchronizer呢。

    因为线程池中的线程有点特殊,它们都是AQS的子类(AbstractQueuedSynchronizer),而AQS的父类又是AbstractOwnableSynchronizer

    我们可以从ThreadPoolExecutor中的Worker中看出。

    所以要了解ThreadPoolExecutor的工作原理,首先就需要知道他们的父类AbstractOwnableSynchronizer

    ——————————————————————————————————————————————————————————————————————

    源码非常简单,你可以轻松的看懂:

     1 public abstract class AbstractOwnableSynchronizer
     2     implements java.io.Serializable {
     3 
     4     /** Use serial ID even though all fields transient. */
     5     private static final long serialVersionUID = 3737899427754241961L;
     6 
     7     /**
     8      * Empty constructor for use by subclasses.
     9      */
    10     protected AbstractOwnableSynchronizer() { }
    11 
    12     /**
    13      * 独占模式下,同步器的当前拥有者(一个线程对象)
    14      */
    15     private transient Thread exclusiveOwnerThread;
    16 
    17     /**
    18      * exclusiveOwnerThread setter方法
    19      */
    20     protected final void setExclusiveOwnerThread(Thread thread) {
    21         exclusiveOwnerThread = thread;
    22     }
    23 
    24     /**
    25      * exclusiveOwnerThread getter方法
    26      */
    27     protected final Thread getExclusiveOwnerThread() {
    28         return exclusiveOwnerThread;
    29     }
    30 }

    AQS为何如此重要

    我们可以借助IDEA看出其结构:

    AQS有很多子类,如下

    • java.util.concurrent.ThreadPoolExecutor.Worker:线程池。
    • java.util.concurrent.CountDownLatch.Sync:计数器。
    • java.util.concurrent.locks.ReentrantLock.Sync:重入锁。
    • java.util.concurrent.locks.ReentrantReadWriteLock.Sync:重入读写锁。
    • java.util.concurrent.Semaphore.Sync:信号量。

    而且ReentrantLock、ReentrantReadWriteLock、Semaphore还有自己的公平锁,非公平锁(FairSync,NonFairSync)。

    综上,如此多重要的类都是基于AQS实现,你说重要不。

    ——————————————————————————————————————————————————————————————————————

    AQS内部类Node源码解析:

     1 /**
     2  * 等待队列的节点类
     3  */
     4 static final class Node {
     5     /** 标识节点当前在共享模式下 */
     6     static final Node SHARED = new Node();
     7     /** 标识节点当前在独占模式下 */
     8     static final Node EXCLUSIVE = null;
     9 
    10     /** 下面的常量是waitStatus的枚举值 */
    11     /** 标识此线程已取消 */
    12     static final int CANCELLED =  1;
    13     /** 标识当前node后继节点所对应的节点线程“需要被唤醒” */
    14     static final int SIGNAL    = -1;
    15     /** 线程在等待condition条件 */
    16     static final int CONDITION = -2;
    17     /** 共享模式下node可能处于此状态,表示锁的下一次获取可以“无条件传播” */
    18     static final int PROPAGATE = -3;
    19 
    20     /**
    21      * 线程等待状态
    22      * 范围只可能是上面四种,CANCELLED、SIGNAL、CONDITION、PROPAGATE
    23      * 以及0,0是正常的同步节点,此字段初始值也就是0
    24      */
    25     volatile int waitStatus;
    26 
    27     /**
    28      * 前驱节点,用于检查waitStatus
    29      * 若当前节点取消,就需要前驱结点和后继节点来完成连接
    30      */
    31     volatile Node prev;
    32 
    33     /**
    34      * 后继节点,指向当前节点在释放时唤醒的后继节点
    35      */
    36     volatile Node next;
    37 
    38     /**
    39      * 入队是的当前线程
    40      */
    41     volatile Thread thread;
    42 
    43     /**
    44      * 存储condition队列中的后继节点
    45      */
    46     Node nextWaiter;
    47 
    48     /**
    49      * 若是共享模式下等待,则返回true
    50      */
    51     final boolean isShared() {
    52         return nextWaiter == SHARED;
    53     }
    54 
    55     /**
    56      * 返回当前节点的前驱结点
    57      */
    58     final Node predecessor() throws NullPointerException {
    59         Node p = prev;
    60         if (p == null)
    61             throw new NullPointerException();
    62         else
    63             return p;
    64     }
    65 
    66     Node() {    // Used to establish initial head or SHARED marker
    67     }
    68 
    69     Node(Thread thread, Node mode) {     // Used by addWaiter
    70         this.nextWaiter = mode;
    71         this.thread = thread;
    72     }
    73 
    74     Node(Thread thread, int waitStatus) { // Used by Condition
    75         this.waitStatus = waitStatus;
    76         this.thread = thread;
    77     }
    78 }

    此Node便是ThreadPoolExecutor入参的workQueue,通过一个双向链表实现的一个等待队列;定义了共享、独占模式,以及五种等待状态。

    ——————————————————————————————————————————————————————————————————————

    AQS实例属性源码解析:

     1 /**
     2  * 等待队列头结点
     3  */
     4 private transient volatile Node head;
     5 
     6 /**
     7  * 等待队列尾结点
     8  */
     9 private transient volatile Node tail;
    10 
    11 /**
    12  * 同步状态
    13  */
    14 private volatile int state;
    15 
    16 /**
    17  * state getter方法
    18  */
    19 protected final int getState() {
    20     return state;
    21 }
    22 
    23 /**
    24  * state setter方法
    25  */
    26 protected final void setState(int newState) {
    27     state = newState;
    28 }

    AQS独占锁加锁

    在了解了AQS的Node以及基本的属性后就可以开始学习独占锁的加锁过程了。

    首先既然是锁的话,那肯定就分为加锁解锁两个功能:

    • 加锁:获取锁,当多个线程同时获取锁的时候,那么只能有一个线程能够获取成功,其它线程都需要在当前位置阻塞等待。
    • 解锁:释放锁,获取锁的线程释放锁,而且还必须能唤醒一个正在等待锁资源的线程。

    ——————————————————————————————————————————————————————————————————————

    那么这里先看下AQS独占锁的加锁逻辑。

    1 public final void acquire(int arg) {
    2     if (!tryAcquire(arg) &&
    3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    4         selfInterrupt();
    5 }

    方法很简单,只有几行代码,我们来一步步的分析。

    首先从方法及其示意上来看,AQS独占锁的加锁逻辑大致流程如下:

    • tryAcquire():尝试获取锁;如果获取成功则结束,获取失败则继续执行。
    • addWaiter():未获取锁的线程进入等待队列;将等待线程加入队列尾部,并标记为独占模式。
    • acquireQueued():线程阻塞,持续获取锁;让线程在队列中获取资源,直到获取到资源才返回;如果等待过程中被中断,则返回true,否则返回false。
    • selfInterrupt():acquireQueued中说道,线程被中断才会返回true,也就是说如果未获取到锁,且线程被中断,就会执行此函数,设置中断标识。
      • 但我们要清楚一点,线程中断其实也就是设置一个中断标识而已,至于真的是否中断,还是要看CPU。
      • 而AQS这里也只是把中断标识塞回去了而已,所以步骤3在等待过程中被中断,它是不响应的,只是获取资源后才进行自我中断,设置中断标识。

    ——————————————————————————————————————————————————————————————————————

    上面将大致流程梳理了下,现在来说说具体步骤是如何实现的。

    1、tryAcquire():尝试获取锁。

    1 protected boolean tryAcquire(int arg) {
    2     throw new UnsupportedOperationException();
    3 }

    它的实现是抛出异常,所以我们可以推断出,其真正的实现是委托给子类的,为了开发者误用所以才抛出异常。

    那为啥需要子类实现,而又不定义为abstract方法呢,这也是Doug Lea大佬站在开发者角度考虑的。

    • 若定义为abstract的话,独占锁需要重写tryAcquire、tryRelease
    • 共享锁需要重写tryAcquireShared、tryReleaseShared
    • 这样可以减少不必要的开发(嗯~~~学习学习)。

    参数arg,则是一个预定义,你可以自行维护,干啥都可以;比较常用的就是定义为状态。

    2、addWaiter():将未获取到锁的线程加入队尾。

     1 private Node addWaiter(Node mode) {
     2     // 创建一个当前线程的节点;thread = currentThread,nextWaiter = Node.EXCLUSIVE(null)
     3     Node node = new Node(Thread.currentThread(), mode);
     4     // 先尝试快速加入队列,若失败则采用自旋的方式加入节点
     5     Node pred = tail;
     6     if (pred != null) {
     7         node.prev = pred;
     8         if (compareAndSetTail(pred, node)) {
     9             pred.next = node;
    10             return node;
    11         }
    12     }
    13     // 若队尾为null,或CAS进入队尾失败(存在竞争),则通过enq方法自旋
    14     enq(node);
    15     return node;
    16 }

    关于上面这段代码你需要了解如下几个点:

    • 自旋:是一种没有获取到锁的线程,它会一直循环等待,并判断该资源是否已经释放锁。
    • CAS:CAS的全称是Compare-And-Swap,比较并交换,是一条CPU并发原语。它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。

    自旋:

     1 private Node enq(final Node node) {
     2     // 死循环,直到node成功进入队尾(因为存在竞争,所以并不会一次成功,故才死循环)
     3     for (;;) {
     4         Node t = tail;
     5         // t的指针已经指向尾结点,t == null说明队列是空的
     6         if (t == null) { // Must initialize
     7             // 空队列,则创建一个新的节点,并将尾结点指向头结点
     8             if (compareAndSetHead(new Node()))
     9                 tail = head;
    10         } else {
    11             // t != null,队列非空;则将node的前驱结点指向为t,t的后继节点指向为自己
    12             // 也就是将自己放入队尾,并改变自己的前驱与原来队尾的后继节点
    13             node.prev = t;        
    14             if (compareAndSetTail(t, node)) {
    15                 t.next = node;
    16                 return t;
    17             }
    18         }
    19     }
    20 }

    3、acquireQueued():线程阻塞,持续获取锁。

     1 /**
     2  * 节点加入队列后,尝试在等待队列中自旋的获取资源
     3  */
     4 final boolean acquireQueued(final Node node, int arg) {
     5     // 标记表示是否成功拿到资源
     6     boolean failed = true;
     7     try {
     8         // 标记是否被中断
     9         boolean interrupted = false;
    10         for (;;) {
    11             // 获取前驱节点,前驱节点为null则抛出异常
    12             final Node p = node.predecessor();
    13             // 若node的驱节点是头结点,且获取锁成功则自旋结束(也就是等head节点释放资源后,node节点最为头节点的后继节点就要去竞争资源)
    14             // p == head:仅执行队列的队头线程,保证自旋效率,不做耗时的等待操作(如尝试获取锁)
    15             if (p == head && tryAcquire(arg)) {
    16                 setHead(node);
    17                 p.next = null; // help GC
    18                 failed = false;
    19                 return interrupted;
    20             }
    21             // 若node前驱节点不是头节点head,或node节点尝试获取资源失败,则:
    22             // 1、检查并更新无法获取资源的节点状态,若当前线程阻塞则返回true
    23             // 2、阻塞线程,并检查中断状态
    24             if (shouldParkAfterFailedAcquire(p, node) &&
    25                 parkAndCheckInterrupt())
    26                 interrupted = true;
    27         }
    28     } finally {
    29         // 如果node前驱节点为null时,则抛出空指针,此时便会进入此分支
    30         if (failed)
    31             // 取消对资源的获取
    32             cancelAcquire(node);
    33     }
    34 }

    4、selfInterrupt():中断线程,但不对中断做出响应(内部acquireQueued()函数维护中断标识,而不会对外部的中断做出响应)。

    1 static void selfInterrupt() {
    2     Thread.currentThread().interrupt();
    3 }

    AQS独占锁解锁

    解锁和加锁的方式比较像,我这里就简单介绍下。

    直接上源码:

     1 public final boolean release(int arg) {
     2     // 首先尝试解锁,和加锁的方式很像,交给子类实现
     3     if (tryRelease(arg)) {
     4         Node h = head;
     5         // h != null,说明队列中存在线程;有线程才能去解锁咯,没有直接返回false,解锁失败了
     6         // h.waitStatus != 0,说明线程不是初始化状态;若不是初始化状态,说明啥都没操作,解锁干啥,哈哈
     7         if (h != null && h.waitStatus != 0)
     8             // 满足解锁条件后,去解锁,并让其头结点的后继节点竞争资源
     9             unparkSuccessor(h);
    10         return true;
    11     }
    12     return false;
    13 }

    总结

    到此,AQS独占锁的加解锁大致流程都介绍完了,我们来总结下。

    知识点总结

    • 加锁:java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire。
      • 获取锁的具体逻辑是子类实现。
      • 竞争资源时采用自旋方式,在等待中会一直尝试获取锁,直到拿到后结束。
      • 自旋原因是多个线程会同时竞争资源,所以会有无法完成的情况(如,addWaiter的添加到队尾,acquireQueued的获取资源)。
    • 解锁:java.util.concurrent.locks.AbstractQueuedSynchronizer#release
      • 与加锁逻辑类似,释放锁的具体逻辑也是子类实现。
      • 释放当前资源时会通知其后继节点来竞争资源。

    ——————————————————————————————————————————————————————————————————————

    其它收获

    • 需要子类实现的方法并不一定要定义成abstract,可以是空实现,若怕使用者无法正确使用,可抛出异常
    • 当某些东西同时竞争资源时,可让当前对象原子操作,若未竞争成功则再次竞争(while(true)或for(;;));次点来自于自旋。
    • 链表挺好用,可以串连逻辑;比如产品前端流程就可以使用,把前端页面的流转抽象成一个链表,用链表串流产品流程,若有增减页面直接维护链表就可以了。
  • 相关阅读:
    React+AntdUi实现《好客租房系统》首页01
    javaScript学习day04——关于函数
    javaScript学习day03
    javascript学习day01
    第五章回溯法
    第四章作业
    第四章上机实践报告
    算法第三章动态规划
    PTA 7-3 编辑距离问题 (30 分)
    7-3 两个有序序列的中位数 (20 分) log n的解法
  • 原文地址:https://www.cnblogs.com/bzfsdr/p/13122011.html
Copyright © 2011-2022 走看看