zoukankan      html  css  js  c++  java
  • AQS详解

    AQS详解

    AQS:提供原子式管理同步状态,阻塞和唤醒线程功能以及队列模型。

    ReentrantLock

    特性

    • 为可重入锁,一个线程能够对一个临界资源重复加锁。
    • 通过AQS实现锁机制。
    • 支持响应中断,超时和尝试获取锁。
    • 必须使用unlock()释放锁。
    • 有公平锁和非公平锁。
    • 可以关联多个条件队列。

    加锁

    非公平锁:

    1. 若通过AQS设置变量state(同步状态)成功,即获取锁成功,则将当前下线程设置为独占线程。
    2. 若获取失败,则进入acquire()方法进行后续处理。

    公平锁:

    1. 进入acquire()方法进行后续处理。

    AQS

    核心思想

    • 若请求的共享资源空闲,则将当前请求的线程设置为有效的工作线程,并将共享资源设置为锁定状态。
    • 若共享资源被占用,则需要阻塞等待唤醒机制保证锁的分配。

    实现

    • 通过CLH队列的变体:FIFO双向队列实现的。
    • 每个请求资源的线程被包装成一个节点来实现锁的分配。
    • 通过volatileint类型的成员变量state表示同步状态。
    • 通过FIFO队列完成资源获取的排队工作。
    • 通过CAS完成对state的修改。

    节点Node

    方法和属性

    方法或属性 含义
    waitStatus 当前节点在队列中的状态
    thread 节点对应的线程
    prev 前驱指针
    next 后继指针
    predecessor 返回前驱节点
    nextWaiter 指向下一个CONDITION状态节点

    waitStatus状态:

    • 0:Node初始化后的默认值。
    • CANCELLED:为1,线程获取锁的请求已被取消。
    • CONDITION:为-2,节点在等待队列中,等待唤醒。
    • PROPAGATE:为-3,线程处于SHARED状态下使用。
    • SIGNAL:为-1,线程已准备,等待资源释放。

    线程的锁模式:

    • SHARED:共享模式等待锁。
    • EXCLUSIVE:独占模式等待锁。

    state与锁模式

    独占模式:

    • 初始化state=0
    • 试图获取同步状态:若state为0,则设置为1,获取锁,进行后续操作。
    • state非0,则当前线程阻塞。

    共享模式:

    • 初始化state=n(表示最多n个线程并发)
    • 试图获取同步状态:若state大于0,则使用CAS对state进行自减操作,进行后续操作。
    • 若不大于0,则当前线程阻塞。

    重写AQS实现的方法

    方法 描述
    boolean isHeldExclusively() 该线程是否正在独占资源
    boolean tryAcquire(int arg) 独占试图获取锁,arg为获取锁的次数,获取成功则返回true
    boolean tryRelease(int arg) 独占方式试图释放锁,arg为释放的锁的次数,成功则返回true
    int tryAcquireShared(int arg) 共享方式获取锁,负数则失败,0表示成功,但没有剩余可用资源,正数表示成功,有剩余资源
    boolean tryReleaseShared(int arg) 共享方式释放锁,允许唤醒后续等待节点并返回true

    注:
    一般为独占或共享方式,也可同时实现独占和共享(ReentrantReadWriteLock)

    ReentrantLock非公平锁lock()方法执行流程:

    lock

    加锁流程:

    1. 通过ReentrantLock的加锁方法lock()进行加锁。
    2. 调用内部类Synclock()方法,由于是抽象方法,则由ReentrantLock初始化选择公平锁和非公平锁,执行相关内部类的lock()方法,从而执行AQS的acquire()方法。
    3. AQS的acquire()方法会执行tryAcquire()方法,由于tryAcquire()需要自定义,则会执行ReentrantLock中的tryAcquire()方法,根据是公平锁还是非公平锁,执行不同的tryAcquire()
    4. tryAcquire()为获取锁,若获取失败,则执行AQS后续策略。

    解锁流程:

    1. 通过ReentrantLock的解锁方法unlock()进行解锁。
    2. unlock()调用内部类Syncrelease()方法。
    3. release()会调用tryRelease()方法,其由ReentrantLock中的Sync实现。
    4. 释放成功,其余由AQS进行处理。

    从ReentrantLock到AQS

    ReentrantLock中的lock()

    // ReentrantLock
    final void lock() {
        if (compareAndSetState(0, 1)) // CAS设置state
            setExclusiveOwnerThread(Thread.currentThread()); // 设置成功则当前线程独占资源
        else
            acquire(1); // 设置失败则后续处理
    }
    

    ReentrantLock中,lock()的实现逻辑为:

    • 试图CAS设置状态为1。
      • 若设置成功,则设置当前线程独占资源。
      • 若设置失败,则通过acquire()方法进一步处理。

    acquire()方法实现:

    • 试图获取访问tryAcquire(),若成功,则获取锁成功。
    • 若失败,则加入等待队列。
    // AbstractQueuedSynchronizer
    public final void acquire(int arg) {
        // 先尝试获取,获取成功则独占资源,否则进入等待队列
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    线程加入等待队列

    通过addWaiter(Node.EXCLUSIVE)将当前线程加入等待队列。

    加入流程:

    1. 由当前线程构造一个节点。
    2. 若等待队列不为空时,则设置当前节点为队列尾节点。
    3. 若队列为空或者失败时,则重复尝试将该节点加入到队列成为尾节点。
    
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode); // 当前线程构造一个节点
        if (pred != null) { // 设置当前节点尾队列的尾节点
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node); // 节点加入队列失败,则循环尝试加入队列,直到成功
        return node;
    }
    
    // 队列为空或加入队列失败,则循环尝试加入队列,直到成功
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 若队列为空,则当前节点设为头节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t; // 若队列非空,则当前节点加入队列为尾节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    注:

    • 若队列为空,则构造一个空节点作为头节点,然后将当前线程构造的节点加入作为尾节点。
    • 判断等待队列是否有有效节点:
      • 若头节点等于尾节点,则返回false(没有有效节点,当前节点可以争夺共享资源).
      • 若不等于,则判断头节点的下一个节点是否不为null并且是否等于当前节点,若两个条件均满足,则返回false.
      • 否则返回true(队列中有有效节点,当前线程进入队列等待)

    线程出队列

    1. 获取当前的前一个节点.
    2. 若前一个节点为头节点head并且当前节点获取锁成功,则将当前节点设为head结点,当前线程执行后续操作.
    3. 若前一个节点非头节点head或者当前结点获取锁失败,则阻塞当前线程,等待被唤醒.
    4. 被唤醒后,循环重复上面步骤,直到成功获取锁.
    5. 若线程被中断,则跳出循环,检查是否获取成功.成功则执行后续代码,否则取消当前线程的获取请求.
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); // 获取前一个节点
                if (p == head && tryAcquire(arg)) { // 若前一个节点为head,则试图获取,若获取成功,则当前线程设为head
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 若前一个节点非head或获取失败,则阻塞当前线程,直到其被唤醒
                    interrupted = true;
            }
        } finally {
            if (failed) // 若获取失败,取消当前线程的请求
                cancelAcquire(node);
        }
    }
    

    获取失败后进行阻塞检查:

    • 若前一个节点处于唤醒状态,则当前线程被阻塞.
    • 若前一个节点不处于阻塞状态,则向前查找节点.
      • 若找到一个处于唤醒状态的节点,则当前线程阻塞.
      • 若没有唤醒状态的结点,则设置当前结点唤醒,当前线程将循环试图获取锁.
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 当前线程的前一个线程处于唤醒状态,当前线程阻塞
            return true;
        if (ws > 0) { // 前一个线程的请求被取消,则删除向前查找,将所有取消的线程从队列删除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else { // 当前线程设置为唤醒状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    // 阻塞当前线程,返回当前线程的中断状态
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    线程移出队列的流程:

    移出步骤1
    移出步骤2

    取消线程请求

    流程:

    1. 获取当前节点的前驱节点.
    2. 若前驱节点的状态为CANCELLED,则一直向前遍历,找到第一个非CANCELED节点,设置当前节点为CANCELLED.
      • 若当前节点为尾节点:则设置前驱结点指向的下一个节点指针为null.
      • 若当前节点为head节点的后继结点,则设置当前节点的下一个节点指针为null.
      • 若当前节点非尾节点并且非head节点的下一个节点,则设置当前结点的下一个结点指向当前结点的下一个节点(从而从前向后遍历时跳过被CANCELLED的结点)

    为什么只对next指针操作,而不对prev指针操作?
    修改prev指针,可能导致prev指向一个已经被移出队列的节点,存在安全问题.
    shouldParkAfterFailedAcquire()方法内,会处理prev指针,使得CANCELLED的节点从队列中删除.

    ReentrantLock的unlock()

    解锁流程:

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread()) // 若解锁的线程非占有资源的线程,则抛出异常
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { // 若持有的线程全部释放,则占有资源的线程设为null,更新状态state
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    
    // 当前线程释放锁
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0) // 若队列非空或者当前线程不处于唤醒状态,唤醒当前线程后面的一个线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    private void unparkSuccessor(Node node) {
        // node为当前线程的后一个等待线程
        int ws = node.waitStatus;
        if (ws < 0) // 若后一个等待线程未被取消,则设置其状态为可获取锁
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) { // 若当前线程下一个线程为null或者被取消
            s = null;
            // 从后向前查找第一个没有被取消的等待线程,将其唤醒
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    为什么从后往前查找可唤醒的线程?
    新节点入队列时,是先将其prev指针指向队尾节点,在将尾节点的next指向新节点,若从前向后查找可唤醒的线程,在这两个步骤之间发生的查找线程操作会忽略新节点.
    同时,在产生CANCELLED节点时,也是先断开next指针,而prev指针.只有从后向前遍历才能遍历完全部的节点.

    获取锁之后还要进行中断响应:

    • 线程在等待资源后被唤醒,唤醒后不断尝试获得锁,直到抢到锁为止.整个流程不会响应中断,直到抢到锁后检查是否被中断,若被中断,则补充一次中断.

    小结

    1. 线程获取锁失败后怎么样?
      在等待队列中等待,并继续尝试获得锁.
    2. 排队队列的数据类型?
      CLH变体的FIFO双向队列.
    3. 排队的线程什么时候有机会获得锁?
      当前面的线程释放锁时,会唤醒后面等待的线程.
    4. 若等待的线程一直无法获得锁,需要一直等待吗?
      线程对应的节点被设为CANCELLED状态,并被清除出队列.
    5. lock()方法通过acquire()方法进行加锁,如何进行加锁?
      acquire()调用tryAcquire()进行加锁,具体由自定义同步器实现.

    AQS应用

    核心:

    1. state初始化为0,表示没有任何线程持有锁.
    2. 当有线程持有锁时,state在原来值上加1,同一个线程多个获得锁,则多次加1.
    3. 如线程释放锁,则state减1,直到为0,表示线程释放锁.
    同步工具 特点
    ReentantLock 使用state保存锁重复持有的次数,多次获得锁时其值递增
    Semaphore 使用AQS同步状态保存信号量的当前计数,tryRelease增加计数,acquireShared减少计数
    CountDownLatch 通过AQS同步状态计数,计数为0,所有的acquire操作才可以通过
    ReentrantReadWriteLock AQS同步状态的16位保存写锁持有次数,剩下16位用于保存读锁持有次数
    ThreadPoolExecutor 利用AQS同步状态实现独占线程变量设置

    参考:

  • 相关阅读:
    使用keepalived实现双机热备
    MYSQL ERROR CODE 错误编号的意义
    Mysql slow query log
    eclipse svn 分支合并到主干
    Timer的schedule和scheduleAtFixedRate方法的区别解析
    Java内部类引用外部类中的局部变量为何必须是final问题解析
    nginx中有关命令和日志切割,配置文件加载的详细阐述
    流媒体中ffmpeg 命令的使用
    windows下搭建nginx服务器及实现nginx支持https配置流程
    mysql 中sql语句的执行顺序
  • 原文地址:https://www.cnblogs.com/truestoriesavici01/p/13213978.html
Copyright © 2011-2022 走看看