zoukankan      html  css  js  c++  java
  • AQS简介

    1 基础

    AQS的类图结构如下所示:
    image

    AQS实现共享资源的访问控制基础:

    1. state字段,即同步器状态字段。用于共享资源的访问控制
    2. CLH队列,FIFO等待队列,存放竞争失败的线程。通常CLH队列是一个自旋队列,AQS以阻塞的方式实现

    CLH队列的使用:

    1.1 常用字段:

    // CLH队列中的头尾节点
    private transient volatile Node head;
    private transient volatile Node tail;
    // 同步状态
    private volatile int state;
    

    注意:多线程同步获取资源成功,则state字段会自增;若有线程释放资源,则state字段自减。

    1.2 CLH队列

    CLH队列有AQS的内部类Node节点构成,节点内容如下:

    static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        
        //节点watiStatus的值
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
    
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
    }
    

    因为其waitStatus的值是有序的,CANCELLED状态下值为正数,因此很多判断可以不使用等值比较。
    数据结构中waitStatus为节点的等待状态。节点有4种状态(值也可以为0):

    • CANCELLED :终态,该节点被取消由于超时或中断
    • SIGNAL:该节点的后继节点是blocked(via park),所以当前节点release或cancels时,必须unpark它的后继节点
    • CONDITION:该节点处于条件队列中,将不会被用于sync queue,直到节点状态被设置为0
    • PROPAGATE:releaseShared应该被传播到其他节点

    1.2 入队

    addWaiter()方法的作用将一个Node节点放入到CLH队列的队尾。代码如下:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 尝试快速入队,失败则使用enq()方式
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    注意上述代码,共分为3个步骤:

    • 第一步:首先将oldTail赋值给newNode.prev:node.prev = pred
    • 第二步:将tail赋值给newNode:compareAndSetTail(pred, node)
    • 第三步:将oldTail的next指针指向newNode(即tail):pred.next = node

    这3个步骤之间会存在时间差。因此可能存在这种情况:

    nodeA添加到CLH队列并执行完步骤2,尚未执行步骤3时,刚好有其他线程遍历CLH队列,此时若从CLH队列head向tail节点方向遍历,就会漏掉节点。

    为解决上述情况,假设我们称:从CLH的head向tail方向称为正向遍历;从tail向head方向称为逆向遍历。则:

    先正向遍历,一旦遍历的结果为空,则从tail节点逆向遍历,直到遍历到和正向遍历相同的节点,视为遍历结束。

    上述代码中如果快速入队失败,就会进行自旋入队方式的enq()方法,基本和addWaiter()方法一致:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    

    1.3 hasQueuedPredecessors()

    该方法用于查询CLH队列中是否有节点比当前线程等待的更久。

    • 因为由于中断导致的取消或超时随时可能发生,因此不能保证CLH队列中的那些比当前线程等待更久的线程能获取到资源。
    • 同样的也可能存在这种情况,由于队列为空,导致方法返回false
    public final boolean hasQueuedPredecessors() {
        Node t = tail;    // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    1.4 独占锁和共享锁

    AQS提供了2种获取资源的模式,独占和共享。任何实现了AQS的实现类都只能实现2种模式中的一种,而不能同时实现。

    独占模式

    AQS的独占模式,提供了如下对外方法:

    public final void acquire(int arg)  
    public final void acquireInterruptibly(int arg)  
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)  
    public final boolean release(int arg)
    

    AQS的实现类,需要实现如下方法:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    

    共享模式

    AQS的共享模式,提供了如下对外方法:

    public final void acquireShared(int arg)  
    public final void acquireSharedInterruptibly(int arg)  
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)  
    public final boolean releaseShared(int arg)
    

    AQS的实现类,需要实现如下方法:

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    

    2 获取独占资源

    使用AQS获取独占资源时,使用acquire()方法。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
            selfInterrupt();
        }
    }
    

    2.1 acquireQueued()

    上述方法可以知道,获取资源的核心实现在tryAcquire()方法中,即AQS的实现类中。在获取资源失败的情况下,会调用acquireQueued()方法进行入队操作(入队前会进行一次尝试获取资源)。如下代码:

    /* 若node节点的前继节点是head节点,则会再次调用tryAcquire()获取资源。 */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 判断当前节点是否可以进入park,若可以,让线程进入等待
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 如果获取资源失败,则取消
            if (failed)
                cancelAcquire(node);
        }
    }
    

    上述代码中,一共有3个注意点:

    • 判断当前节点的前继节点是否为head节点。若是,则表示该节点有资格尝试获取共享资源。此处的head节点的判断在一定程度上保证资源竞争的公平性
    • shouldParkAfterFailedAcquire():判断当前节点是否可以安全进入park()
    • parkAndCheckInterrupt():让线程进入等待
    /** 该方法的作用在于判断当前节点中的线程,是否可以安全的进入park()。返回true,表示进程可以进入park。若前驱节点的waitStatus为SIGNAL,则表示当前节点可以安全的park()。 */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 如果前驱节点的waitStatus为SIGNAL,则表示当前节点可以安全的park()
        if (ws == Node.SIGNAL) { return true; }   
        // waitStatus>0,即为CANCELLED状态,此时当前节点需要找到状态不为CANCELLED状态的节点,将其设置为自己的前驱节点,并将新的前驱节点的next指向自己。
        // 注意,这样做完之后,那些当前节点的waitStatus状态为CANCELLED的前驱节点链,将成为孤链。但这个孤链仍然有指向原等待队列的prev和next指针。只是原等待队列中已经没有指向孤链的节点指针
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 走到此处,表明前驱节点的状态为0或PROPAGATE。此时可以将前驱节点的waitStatus设置为SIGNAL状态
            // 注意:这里仍然要返回false,表明当前节点不能被park。我们需要在park之前,重试确认该节点不能获取到资源
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  // 代码A。  
        }
        return false;
    }
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    2.2 图解流程

    分析代码情况:按照前面enq()方法的分析,假若有t1,t2两个线程竞争资源,最后t1获取资源;t2进入到CLH队列,然后t2开始调用acquireQueued()方法。

    1. 进入循环前如下图
      image
    2. 第一次循环:当前节点的前继节点为head,tryAcquire()获取资源,t1线程获取资源,获取失败,调用shouldParkAfterFailedAcquire()之后情况如下图
      image
    3. 第二次循环:当前节点的前继节点为head,tryAcquire()获取资源,t1线程占用资源,获取失败,调用shouldParkAfterFailedAcquire()直接返回成功,当前节点进入WAIT状态。情况如下图
      image
    4. 假设此时t3线程前来竞争资源(t1还占着资源呢)。此时又进入到addWaiter()方法,执行之后,如下图
      image
    5. 然后接着调用acquireQueued()方法,执行完毕后,将t3线程进入WAIT状态,如下图
      image

    2.3 取消节点

    节点取消需要做一系列操作:

    1. 当前节点的前继节点不能是CANCELLED状态。因此,我们需要从当前节点逆向遍历CLH找到第一个不为CANCELLED的节点pred:正常的节点
    2. 将当前节点状态修改为CANCELLED
    3. 然后就是将pred作为正常节点,当前节点及其前继节点为CANCELLED状态的节点链,记为cancelledNodes,剔除CLH队列。该操作,需要针对特殊节点判断:
      1. 如果当前节点是tail,此时表明pred可以作为tail节点
      2. 如果当前节点不是tail

        且pred是head,尝试调用unparkSuccessor(node),尝试唤醒当前节点的后继节点
        且pred不是head,从CLH队列中剔除cancelledNodes

    如果当前节点的前继节点是head,那么当前节点被取消,就说明当前节点的后继节点就是head节点的后继节点了,此时作为head节点的后继节点,可以被unpark()

    private void cancelAcquire(Node node) {
        if (node == null) { 
            return; 
        }
    
        /* 找到适合的前继节点,当前节点的waitStatus赋值为CANCELLED */
        node.thread = null;
        Node pred = node.prev;
        /* 若前继节点是CANCELLED,则继续找前继节点,直至找到一个正常的前继节点赋值给node,作为node的新前继节点 */
        while (pred.waitStatus > 0) { 
            node.prev = pred = pred.prev; 
        }  
        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED;
        /* 特殊情况:node==tail节点,将pred作为tail节点,然后将cancelledNodes节点链从CLH队列剔除 */
        if (node == tail && compareAndSetTail(node, pred)) {  
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            /* 正常情况:则将cancelledNodes节点链从CLH队列剔除 */
            if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
                    && pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {  
                /*  特殊情况:pred==head节点:尝试调用unparkSuccessor(node),尝试唤醒当前节点的后继节点 */
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
    

    3 释放独占资源

    资源的释放使用的是release()方法:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    调用tryRelease()方法释放资源:state。释放成功后,唤醒head节点的后继节点,unparkSuccessor()

    /*注意:如果当前节点的后继节点为空,或者是被取消的节点。那就从tail节点逆向遍历CLH队列,直至找到一个距离当前节点node最近,且waitStatus<=0的节点,然后唤醒该节点*/
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0); 
        }
        /* 若后继节点不符合唤醒标准,则逆向遍历CLH,直至找到一个距离当前节点node最近,且waitStatus<=0的节点 */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
    1. 假设之前的t1线程执行完毕,调用release()释放资源,释放前效果图:
      image
    2. 调用unparkSuccessor()方法开始unpark()head节点的后继节点:

      1.将node节点waitStatus置为0
      image
      2.unpark()之后会唤醒t2线程,线程会到之前的acquireQueued()方法的循环之中,尝试获取锁,获取成功,执行完毕后图2:
      image

  • 相关阅读:
    安装最新版本的zabbix
    原 linux添加虚拟ip(手动vip和keepalived方式)
    zzuli OJ 1129: 第几天
    WIN内核线程池函数
    Java中利用Math.random()产生服从泊松分布的随机数
    机房重构所遇问题&quot;未能载入文件或程序集“DAL”或它的某一个依赖项。系统找不到指定的文件&quot;的解决的方法集锦
    二路插入排序
    iOS开发之地图与定位
    2015-07-30Java 错题
    bootstrap, boosting, bagging
  • 原文地址:https://www.cnblogs.com/wolfdriver/p/10478515.html
Copyright © 2011-2022 走看看