zoukankan      html  css  js  c++  java
  • 盘一盘 AQS和ReentrantLock

    AQS是个啥?

    AQS(AbstractQueuedSynchronizer)是Java并发用来构建锁和其他同步组件的基础框架。许多同步类实现都依赖于它,如常用的ReentrantLock/ReentrantReadWriterLock/CountDownLatch等
     
    AQS提供了独占(Exclusive)以及共享(Share)两种资源共享方式:
    acquire(acquireShare)/release(releaseShare)。 
    acquire:获取资源,如果当前资源满足条件,则直接返回,否则挂起当前线程,将该线程加入到队列排队。
    release:释放资源,唤醒挂起线程
     
     

    AQS队列

    AQS队列示意图

    AQS队列中的主要属性

    //  等待队列头部
    private transient volatile Node head;
    
    // 等待队列尾部
    private transient volatile Node tail;
    
    // 锁的状态(加锁成功则为1,解锁为0,重入再+1)
    private volatile int state;
    
    //  当前持有锁的线程,注意这个属性是从AbstractOwnableSynchronizer继承而来
    private transient Thread exclusiveOwnerThread;

    Node类中的主要属性

    static final class Node {
        // 标记表示节点正在共享模式中等待
        static final Node SHARED = new Node();
        // 标记表示节点正在独占模式下等待
        static final Node EXCLUSIVE = null;
    
        // 节点的等待状态 还有一个初始化状态0 不属于以下四种状态
        // 表示Node所代表的当前线程已经取消了排队,即放弃获取锁
        static final int CANCELLED =  1;
        // 当一个节点的waitStatus被置为SIGNAL,就说明它的下一个节点(即它的后继节点)已经被挂起了(或者马上就要被挂起了),
        // 只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行
        static final int SIGNAL    = -1;
        // 节点在等待队列中
        // 当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
        static final int CONDITION = -2;
        // 表示下一次共享式同步状态获取,将会无条件地传播下去
        static final int PROPAGATE = -3;
    
        // 节点等待状态,该字段初始化为0,
        volatile int waitStatus;
    
            // 当前节点的前置节点
        volatile Node prev;
    
        // 当前节点的后置节点
        volatile Node next;
    
            // 在此节点上排队的线程信息
        volatile Thread thread;
    }

    ReentrantLock实现

    在引入ReentrantLock实现前,我先来科普一下 util.concurrent包的作者Doug Lea,相比较其他而言,并发包的源码阅读难度较大。脸上永远挂着谦逊腼腆笑容的Doug Lea先生使用了大量相对复杂的逻辑判断,比如一个判断条件中执行多个或且方法,让你很难跟上他的节奏,很难揣摩他的设计思想。小声逼逼,还不是我太菜了,留下来没有技术的泪水。

    继承关系图

    ReentrantLock是Lock接口的一个实现类,是一种可重入的独占锁。
    ReentrantLock内部通过内部类实现了AQS框架(AbstractQueuedSynchronizer)的API来实现独占锁的功能。

    主要属性

    private final Sync sync;
    
    // 公平锁内部是FairSync,非公平锁内部是NonfairSync。
    // 两者都通过继承 Sync间接继承自AbstractQueuedSynchronizer这个抽象类
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        
        // 加锁
        abstract void lock();
    
        // 尝试获取锁
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
        // 尝试释放锁
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }

    构造方法

    //默认创建一个非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    //传入true创建公平锁,false非公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    ReentrantLock公平锁

    我们以公平锁为例对其中重要方法源码分析

    // 继承了 Sync,从而间接继承了 AbstractQueuedSynchronizer这个抽象类
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
    
           // 上锁
        final void lock() {
            //调用 AQS 中 acquire方法
            acquire(1);
        }
    
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    // CAS操作设置 state
                    // 设置当前线程为拥有锁的线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
    
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    acquire方法源码分析

    public final void acquire(int arg) {
        // tryAcquire(arg)尝试加锁,如果加锁失败则会调用acquireQueued方法加入队列去排队,如果加锁成功则不会调用
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    acquire方法干了这么几件事情
    1、tryAcquire() 尝试获取资源,如果成功则直接返回;
    2、addWaiter() 将该线程加入等待队列, 更新AQS队列链信息
    3、acquireQueued() 使线程在等待队列中获取资源,直到获取资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
    4、selfInterrupt() 自我中断,如果线程在等待过程中被中断过,它是不响应的。只是获取资源后再将中断补上。
     

    tryAcquire方法

    protected final boolean tryAcquire(int acquires) {
        // 获取当前线程
        final Thread current = Thread.currentThread();
        // 获取lock对象的上锁状态,如果锁是自由状态则=0,如果被上锁则为1,大于1表示重入
        int c = getState();
    
        // c=0 代表没人占用锁,当前线程可以直接获取锁资源执行
        if (c == 0) {
            // 下面介绍hasQueuedPredecessors()方法,判断自己是否需要排队
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // CAS操作设置 state
                // 设置当前线程为拥有锁的线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
    
        // 非重入锁直接返回false,加锁失败
        else if (current == getExclusiveOwnerThread()) {
            // 若为重入锁, state 加1 (acquires)
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    hasQueuedPredecessors方法

    public final boolean hasQueuedPredecessors() {
        // 获取队列头、尾节点信息
        Node t = tail; 
        Node h = head;
        Node s;
        // h != t 有几种情况
        // 1、队列尚未初始化完成,第一个线程获取锁资源,
        //    此时h和t都是null, h != t返回fasle初始化队列
        // 2、队列已经被初始化了,其他的线程尝试获取资源,
        //    此时头尾节点不相同,h!=t返回true,
        //    继续判断s.thread != Thread.currentThread() 当前来参与竞争锁的线程和第一个排队的线程是同一个线程,则需要排队。
        // 3、队列已经被初始化了,但是由于锁释放的原因导致队列里面只有一个数据
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

    addWaiter方法

    private Node addWaiter(Node mode) {
        // AQS队列中的元素类型为Node,需要把当前线程封装成为一个Node对象
        Node node = new Node(Thread.currentThread(), mode);
    
        // tail为队尾,赋值给pred
        Node pred = tai
        // 判断pred是否为空,其实就是判断队尾是否有节点,其实只要队列被初始化了队尾肯定不为空,
        if (pred != null) {
            // 拼装node队列链的过程
            // 直接把当前线程封装的node的上一个节点设置成为pred即原来的队尾
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // pred的下一个节点设置为当node
                pred.next = node;
                return node;
            }
        }
    
        // 拼接aqs队列链
        enq(node);
        return node;
    }
    
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    acquireQueued方法

    final boolean acquireQueued(final Node node, int arg) {
        // 标记是否成功拿到资源
        boolean failed = true;
        try {
            // 标记等待过程中是否被中断过
            boolean interrupted = false;
            // 自旋
            for (;;) {
                final Node p = node.predecessor();
                // 判断自己是否为队列中的第二个节点
                // 成为队列中第二个节点才有资格获取资源
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    // 返回等待过程中是否被中断过
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    公平锁和非公平锁的主要区别

    为了方便对比,在这里列举了两种锁的上锁过程源码,注意红色标识片段

    // 公平锁上锁过程
    final void lock() {
        //调用 AQS 中 acquire方法
        acquire(1);
    }  
    // 非公平锁上锁过程
    final void lock() {
        // 尝试获取锁,加锁不成功则排队。排队之前仅有的一次插队机会。
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    总结

    1、如果第一个线程尝试获取资源时,此时和AQS队列无关,线程直接持有锁。并且不会初始化队列,如果接下来的线程都是交替执行,那么和AQS队列永远无关,均为线程直接持有锁。
    2、在线程发生资源竞争的情况下,才会初始化AQS队列,AQS队列的头部永远是一个虚拟的Thread为NULL的node。
    3、未能获取到资源的线程将会处于park状态,此时只有队列中第二个node等待被唤醒,尝试去获取资源。其他node并不去竞争资源,这也是AQS队列的精髓所在,减少了CPU的占用。
    4、公平锁的上锁是必须判断自己是不是需要排队;而非公平锁是直接进行CAS修改计数器看能不能加锁成功;如果加锁不成功则乖乖排队(调用acquire);所以不管公平还是不公平;只要进到了AQS队列当中那么他就会排队;一朝排队;永远排队!
  • 相关阅读:
    mongo 查询某个字段的值不为空列表!
    pdftohtml的使用
    Activiti数据库表结构(表详细版)
    ElasticSearch在linux上安装部署
    构建Spring Web应用程序—关于spring中的validate注解后台校验的解析
    构建Spring Web应用程序—SpringMVC详解
    高级装配—运行时注入
    高级装配—bean的作用域
    高级装配—条件化的Bean
    高级装配—配置profile bean
  • 原文地址:https://www.cnblogs.com/LemonFive/p/11377439.html
Copyright © 2011-2022 走看看