zoukankan      html  css  js  c++  java
  • AQS源码分析

    AQS全程为AbstractQueuedSynchronizer,其定义了一套多线程访问共享资源的同步框架,大部分的同步类的实现都依赖于他,比如ReentrantLock,ReentrantReadWriteLock, Semaphore, CountDownLatch等等;

             AQS的内容包括了以下几个方面:

    1)         AQS实现线程阻塞和唤醒的基础:LockSupport;

    2)         AQS子类(自定义同步器);

    3)         队列;

    4)         独占模式资源请求;

    5)         共享模式资源请求;

    6)         其他细节;

    一、LockSupport:

    封装了unsafe的park/unpark接口,提供了阻塞线程和唤醒线程的功能,park/unpark的声明原型为:

      public native void unpark(Thread jthread);  

      publicnativevoid park(boolean isAbsolute, long time);  

    isAbsolute参数表示是否是绝对时间,time参数表示时间值,在LockSupport中的实现代码如下:

     

    二、自定义同步器:

    AQS提供了两种资源共享方式:独占模式和共享模式;

    AQS提供了同步器的框架代码,需要派生子类来实现自定义同步器,自定义同步器在实现时只需要重载资源请求的接口函数即可,在这些接口函数中实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了;

    自定义同步器实现时主要实现以下几种方法:

    序号

    接口函数

    说明

    1

    isHeldExclusively()

    线程是否正在独占资源。只有用到condition才需要去实现它。

    2

    tryAcquire(int)

    独占方式。尝试获取资源,成功则返回true,失败则返回false。

    3

    tryRelease(int)

    独占方式。尝试释放资源,成功则返回true,失败则返回false。

    4

    tryAcquireShared(int)

    共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

    5

    tryReleaseShared(int)

    共享方式。尝试释放资源,成功则返回true,失败则返回false。

    一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

    三、队列:

    当一个线程调用tryAcquire请求资源时,如果成功则直接返回,如果其他线程已经占用着并且还未释放,则tryAcquire会返回失败,然后将这个线程和模式(共享模式还是独占模式)添加到队列尾部,并且调用unsafe.park对这个线程进行阻塞,直到当前执行的线程执行完成后调用unsafe.unpark唤醒下一个阻塞的线程,也就是返回下一个阻塞线程自旋代码中的unsafe.park调用处,然后下一个阻塞线程得以继续执行并请求tryAcquire(这时一般返回成功);

    队列的结构如下:

    该队列被定义成一个FIFO队列,每一个节点被定义为Node,Node的结构定义如下:

    static final class Node {

             volatile Thread thread;

             volatile Node prev;

             volatile Node next;

             volatile int waitStatus;

             Node nextWaiter; 

    }

    节点属性的说明信息:

     

    从同步队列节点Node的WaitStatus状态可以看出,当waitStatus > 0 时表示节点所在线程被cancel掉了,如果waitStatus < 0 表示前一个节点的线程执行完成后需要通知唤醒下一个节点的线程,如果waitStatus = 0 表示初始状态;

    同步器AQS的主要使用方式是继承,子类通过继承同步器并实现他的抽象方法来管理同步状态,实现过程中免不了要对同步状态(上图中的资源State)进行修改,AQS对同步状态State提供了三个接口函数:getState, setState(int newState), compareAndSetState(int except, int update);

    四、独占模式资源请求:

    (一) 请求资源:

     

    此方法是独占模式下线程获取共享资源的顶层入口。

    流程如下:

    1)   tryAcquire()尝试直接去获取资源,如果成功则直接返回;

    2)   addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

    3)   acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

    4)   如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

    1. 1.  tryAcquire:

    本接口在AQS中没有实现,留到自定义同步器中去实现;

    本接口没有定义成abstract的原因是:因为独占模式下只用实现tryAcquire/tryRelease,而共享模式下只用实现tryAcquireShared/tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口;

    1. 2.  addWaiter:

    用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。

     

    先将当前线程和独占模式一起构造一个Node节点,如果尾节点tail不为空,则将Node添加到tail的后面,如果tail节点为空,说明链表是空的,则将head和tail节点都设置为节点Node;

    1. 3.  acquireQueued:

     

    首先这个函数是一个自旋过程,在不停的自旋中当前线程去请求获取资源(tryAcruire),如果请求成功,则将node设置为head节点并返回,如果请求失败,则判断node的前一个有效节点是不是head,并且head.waitStatus是不是SIGNAL,如果是则阻塞当前线程,直到head节点所在线程执行完成触发unsafe.unpark或者node所在的线程被中断时才返回继续执行,继续执行时再次的自旋,重复上面的过程,直到资源请求(tryAcquire)成功;

     

    shouldParkAfterFailedAcquire函数判断node节点的前一个节点的waitStatus如果是SIGNAL(SIGNAL状态表示所在线程执行完成后自动通知下一个节点)则直接返回,如果不是则设置成SIGNAL,如果node的前一个节点的线程已经cancel了(或者再往前还有好多节点都是已经cancel的),则把这些已经cancel的无引用链节点全部去掉(去掉后就可以由java的自动内存回收机制进行自动回收);

     

    parkAndCheckInterrupt会调用LockSupport.park阻塞Node节点所在的线程,直到head节点所在的线程执行完成调用unsafe.unpark或者中断了Node节点所在的线程时才返回继续执行;

    注意:返回Thread.interrupted后同时清除了线程的interrupted标记;

    1. 4.  selfInterrupt:

     

    中断当前线程(如果在parkAndCheckInterrupt中返回了线程被中断,则在自旋中不做中断处理,而是在这里中断当前线程);

    总结下来,请求资源函数(Acquire())的处理流程图为:

     

    (二) 释放资源:

     

    此方法是独占模式下线程释放共享资源的顶层入口。

    他的功能是释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

    流程:

    1、先调用tryRelease释放资源(release函数根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!!);

    2、如果释放资源成功,则调用unparkSuccessor来唤醒等待队列里的下一个线程;

    1、 tryRelease:

    在AQS中无实现,仅仅抛出一个异常,留待自定义同步器自己去实现;

    2、 unparkSuccessor:

     

    一句话概括:unpark()唤醒等待队列中最前边的那个未放弃线程;

    首先如果当前线程所在节点的waitStatus不为0,则设置为0;然后找到当前线程所在节点(也就是head节点)的下一个有效的节点,调用LockSupport.unpark唤醒这个有效的节点的线程;

    五、共享模式资源请求:

    (一)  请求资源:

    共享模式下线程获取共享资源的顶层入口函数是acquireShared;这个函数获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。

     

    1)   tryAcquireShared()尝试获取资源,成功则直接返回(AQS中的tryAcquireShared依然只是抛出一个异常,留待自定义同步器去实现);

    AQS中定义好了tryAcquireShared返回值的意义:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取

    2)   失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

    1doAcquireShared

     

    与独占模式下的请求资源函数doAcquire基本相同,所不同的是只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会调用setHeadAndPropagate函数去唤醒之后的队友(独占模式下因为只允许一个线程,因此唤醒当前线程的下一个有效线程即可,而共享模式下允许多个线程,只要资源还有剩余就要去继续唤醒剩下的线程)。

    首先构造一个SHARED节点node添加到队列尾端,如果他的前置节点是head的话就尝试去获取资源(tryAcquireShared()),如果成功则将node设置为head,然后如果刚刚获取的资源还有多余的话就调用setHeadAndPropagate函数去继续唤醒其他线程;如果获取失败则阻塞node节点所在线程,直到当前线程结束后调用了unsafe.unpark或者终止了node节点所在的线程时才返回继续尝试自旋获取资源;

    2setHeadAndPropagate

     

    首先将node设置为head节点,然后判断共享资源如果还有多余,并且head节点的下一个节点是共享模式的,则调用doReleaseShared来继续唤醒其他节点;

    3、 doReleaseShared:

    如果head节点的waitStatus是SIGNAL,则设置为0,并且继续唤醒后续节点,否则如果waitStatus为0时(表示head节点所在线程还未执行完成),设置head节点的waitStatus为Propagate(Propagate表示共享模式下可以继续唤醒后续节点)

    unparkSuccessor函数调用LockSupport.unpark唤醒其他节点的线程;

    (二) 释放资源:

    releaseShared是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

     

    释放掉资源后,唤醒后继。

    独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于可重入的考量;而共享模式下的releaseShared()则没有这种要求,一是共享的实质--多线程可并发执行;二是共享模式基本也不会重入吧(至少我还没见过),所以自定义同步器可以根据需要决定返回值。

     

    doReleaseShared唤醒后续的节点线程;

    六、其他细节:

    hasQueuedThreads

    查询队列里是否有线程在等待,由于中断和超时引起的取消操作随时都可能发生,这方法不一定实时准确。

     

    hasContended

    查询队列是否被多个acquire请求竞争过(导致某个线程阻塞过)。为什么head 不为null就能证明?有竞争就会入队列此时head不为null,但是任务执行完了呢?通过上面的代码知道,head是由队列里刚获得到锁的线程设置的 (把自己设置成head),即使任务执行完也不会修改head,只能由下个入队的线程设置,这样head就永远不会为空了。

     

    getFirstQueuedThread

    返回队列里第一个没有获取到锁的线程,如果head等于tail说明队列里没有线程在等待,直接返回null;否则,调用fullGetFirstQueuedThread。

     

    isQueued

    判断线程是否在队列里(包含头节点了),跟getFirstQueuedThread不一样,没有先从head开始找,直接从tail开始反向搜索。

     

    apparentlyFirstQueuedIsExclusive

    队列里第一个等待的结点是否是独占模式。

     

    isFirst

    当前线程是否是队列里的第一个元素。

     

    getQueueLength

    队列长度,不包括已经取消的和头节点,因为它俩的thread域都为null。

     

  • 相关阅读:
    学习九-python 异常处理
    验证基于逻辑回归的隐马尔可夫模型的心音信号切分算法(literature study)
    字典的内置方法比较
    学习六
    Ubuntu 16.04 LTS 搜狗输入法安装
    集合经验模态分解(EEMD)在语音中的应用举例
    SwfUpload及imgareaselect使用方法
    Uploadify插件使用方法
    Ueditor使用方法
    PartialView 加载Js
  • 原文地址:https://www.cnblogs.com/laoxia/p/8046731.html
Copyright © 2011-2022 走看看