zoukankan      html  css  js  c++  java
  • AQS源码解析

    在并发编程中,ReentrantLock作为一个非常重要同步组件,通过AQS同步器,可以构建锁或其他同步组件。
    本文将以ReentrantLock源码,分析AQS工作原理。

    一、简介

    AQS(AbstractQueuedSynchronizer)使用一个int成员变量state表示同步状态(state为0,表示当前资源没有被占用,state>0,表示当前资源已经被占用),结合内置的一个同步队列(FIFO)完成资源获取线程的等待排队工作,如果当前线程没有获取到锁,则将线程封装成一个Node节点,加入到同步队列,等待唤醒,通过自旋的方式尝试获得锁。

    Node节点组成

    • waitStatus:等待状态
    • Node prev:前驱节点
    • Node next:后继节点
    • Thread thread:获取同步状态的线程
    • Node nextWaiter:等待队列中的后继节点

    Node节点状态:waitStatus

    1. INITIAL=0:初始状态
    2. SIGNAL=-1:后继节点的线程处于等待状态,当前节点如果释放了同步状态,将通知后继节点
    3. CONDITION=-2:节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal方法后,该节点将从等待队列转移到同步队列中,加入到对同步状态的获取中
    4. PROPAGATE=-3:表示下一次共享式同步状态获取将会无条件的被传播下去
    5. CANNELED=1:在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消等待,节点进入该状态将不会变化

    二、源码分析

    2.1 加锁

    首先从ReentrantLock的lock方法开始

        public void lock() {
            sync.lock();
        }
    

    可以发现ReentrantLock实际调用的sync对象的lock方法,sync对象是ReentrantLock内部的一个静态内部类

    abstract static class Sync extends AbstractQueuedSynchronizer {
    	// ...
    }
    

    sync类有两个是实现:FairSync、NonfairSync,分别对应公平锁和非公平锁。
    先从非公平锁的角度分析源码,下文用sync代替非公平锁NonfairSync

            final void lock() {
            		// 尝试通过CAS获取锁
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    

    sync类的lock方法先尝试通过CAS修改state的状态位1,如果修改成功,表示当前线程获取到锁。否则调用acquire方法尝试加锁。

    		// AQS实现的方法,子类不可重写
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    acquire方法内部再次尝试通过tryAcquire获取锁

    				// tryAcquire需要子类实现,找到NonfairSync的实现
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                		// 当前没有锁,通过CAS尝试加锁
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                		// 获得到锁的线程是当前线程(可重入)
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    从这个方法可以看出,如果当前线程已经拥有锁,则将状态+acquires(获取锁的次数,需要释放相同的次数,可重入)直接返回加锁成功。如果没有获取到锁,则返回false

    回到acquire方法,假设tryAcquire没有加锁成功,继续跟踪addWaiter,通过方法名称可以猜出来,这个方法是将当前线程封装成一个Node

    	// 同步队列头指针,默认为null	
    	private transient volatile Node head;
    	// 同步队列尾指针,默认为null
    	private transient volatile Node tail;
    		
    	// mode=Node.EXCLUSIVE(独占式)
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    首先将当前线程封装成一个Node对象,如果当前队列是空的情况下,head和tail默认是都null,if (pred != null) 条件不成立,继续跟踪enq方法

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    第一次循环,t为null,if (t == null) 条件成立,通过CAS设置head节点为一个空的Node节点(哨兵节点),然后让tail和head同时指向哨兵节点
    AQS

    哨兵节点表示当前已经获得锁,正在执行业务逻辑的线程

    第二次循环,t不为null(t指向哨兵节点),node(当前线程封装的对象)的前驱指针指向t,即第一次循环创建的哨兵节点,通过CAS让tail指向node节点,然后将哨兵节点的后继指针指向node节点,结束循环
    AQS
    通过自旋的方式初始化了哨兵节点,并且将当前线程Node节点添加到同步队列中,假设此时有多个线程抢占锁,最后都会进入同步队列等待
    AQS
    继续跟踪acquireQueued方法

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    首先获取当前线程Node节点的前驱节点p,判断前驱节点p是否是头节点。通过上面的分析可知,p是头结点,然后通过通过tryAcquire方法尝试获取锁。

    • 如果加锁成功,则将当前节点设置为头结点,并将之前创建的哨兵节点的next指针指向null,即断开哨兵节点与当前线程Node节点之间的关联,此时当前线程已经获取到锁。
    • 如果加锁失败,则调用shouldParkAfterFailedAcquire方法
    	// pred=头结点(哨兵节点),node=头结点的后继节点
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                return true;
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    头结点当前等待状态是初始状态,所以进入else代码块,设置头结点状态为SIGNAL(-1),继续进行下一次循环。
    假设下一次循环依旧没有加锁成功,再次进入shouldParkAfterFailedAcquire方法,当前头结点状态已经变成SIGNAL,返回true,继续调用parkAndCheckInterrupt方法

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    可以看出,当前线程被阻塞,进入等待状态。

    至此已经完成了线程加锁失败,进入同步队列排队等待源码分析,下面继续看等待线程如何被唤醒。

    2.2 释放锁

    排队线程的唤醒必须要由其他线程释放锁触发,所以我们继续跟踪ReentrantLock的unlock方法

    		// ReentrantLock方法
        public void unlock() {
            sync.release(1);
        }
        // ReentrantLock.Sync方法
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    ReentrantLock通过Sync的release方法释放锁,release调用了tryRelease方法,跟踪tryRelease方法源码

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    判断当前线程是否是锁的独占线程,如果不是,抛出异常。所以我们使用ReentrantLock的时候,必须要加锁成功,才能执行unlock方法。
    如果state-releases的结果等于0,说明已经释放锁成功,清空当前锁的独占线程,重置状态为0。
    这里必须要注意,我们加锁几次,就要释放几次。

    继续回到Sync.release方法,释放成功之后,如果头节点h不等于空,并且头结点h的等待状态不是0,调用unparkSuccessor方法(通过上面的分析可知,头结点的状态是-1,表示头结点的后继节点处于等待状态)

    	// node=头结点h
        private void unparkSuccessor(Node node) {
        	// node=head
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    		// s=Node A
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
            	// 线程A等待超时或者被其他线程中断,waitStatus > 0,这时候需要唤醒线程B
                s = null;
                // tail=Node B
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    头结点h的状态等于SIGNAL(-1),CAS设置头结点状态为0,节点s(指向线程A对应的Node节点)不等于空,并且节点s的状态为初始状态(0),不会进入if条件,通过LockSupport.unpark唤醒线程A

    如果线程A等待超时或者被中断的情况

    如果节点s的状态大于0(即线程A等待超时或者被中断),执行for循环,此时tail指向线程B对应的Node节点。

    1. 第一次循环,t=tail,t指向Node B,并判断Node B的等待状态,Node B是初始状态(0),t赋值给s,s指向Node B
    2. 第二次循环,t=Node A(t = t.prev),Node A由于等待超时或者被中断,所以等待状态waitStatus等于1,if (t.waitStatus <= 0)不成立
    3. 第三次循环,t=head(t = t.prev),for循环条件不成立,结束循环

    至此,s指向Node B,继续往下执行,通过LockSupport.unpark唤醒线程B。

    2.3 公平锁与非公平锁

    ReentrantLock通过构造方法可以指定锁的类型,即非公平锁与公平锁,默认是非公平锁。

    什么是非公平锁与公平锁呢?

    • 非公平锁:线程获得锁的顺序与加锁顺序无关,性能更好,但是可能会导致线程“饥饿”,即线程一直无法获得锁
    • 公平锁:先尝试加锁的线程会优先获得到锁,可以避免线程“饥饿”的情况

    上面已经完成了非公平锁的源码分析,接下来继续跟踪源码看如何实现公平锁。

        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    在ReentrantLock类内部有三个内部类,Sync、NonfairSync、FairSync。Sync是一个抽象类,NonfairSync和FairSync均继承了Sync类,分别对应非公平锁与公平锁的实现。

    接下来看看非公平锁与公平锁的lock方法差异
    NonfairSync

            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    

    FairSync

            final void lock() {
                acquire(1);
            }
    

    可以看出,非公平锁调用lock方法的时候,会先尝试通过CAS修改同步状态,如果当前锁空闲,当前线程直接可以获得到锁。而公平锁直接调用acquire方法。如果当前锁已被占用,那么非公平锁和公平锁都会调用acquire方法。

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    tryAcquire需要AQS子类实现,继续跟踪NonfairSync和FairSync的实现
    NonfairSync

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
            
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    FairSync

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    可以看出,非公平锁再次尝试加锁,而对于公平锁来说,会先调用hasQueuedPredecessors检查队列中是否存在等待线程

        public final boolean hasQueuedPredecessors() {
            Node t = tail;
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    如果同步队列中已经存在等待线程,则不会尝试加锁,返回false,回到acquire方法

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    tryAcquire获得锁失败,则将当前线程封装成Node对象添加到同步队列尾部。
    由此可见,公平锁是按照进入同步队列的顺序获得锁,而非公平锁并不按照入队顺序,哨兵节点的下一个节点被唤醒时,可能同时存在其他线程与当前线程共同抢占锁,即同步队列中的排队线程被唤醒时,还是需要与其他未进入队列的线程竞争。

  • 相关阅读:
    cnblog项目--20190309
    django js引入失效问题
    Python老男孩 day16 函数(六) 匿名函数
    Python老男孩 day16 函数(五) 函数的作用域
    Python老男孩 day15 函数(四) 递归
    Python老男孩 day15 函数(三) 前向引用之'函数即变量'
    Python老男孩 day15 函数(二) 局部变量与全局变量
    Python老男孩 day14 函数(一)
    Python老男孩 day14 字符串格式化
    Python老男孩 day14 集合
  • 原文地址:https://www.cnblogs.com/thisismartin/p/14117380.html
Copyright © 2011-2022 走看看