zoukankan      html  css  js  c++  java
  • AQS源码解析

    在并发编程中,ReentrantLock作为一个非常重要同步组件,通过AQS同步器,可以构建锁或其他同步组件。
    本文将以ReentrantLock源码,分析AQS工作原理。

    一、简介

    AQS(AbstractQueuedSynchronizer)使用一个int成员变量state表示同步状态(state为0,表示当前资源没有被占用,state>0,表示当前资源已经被占用),结合内置的一个同步队列(FIFO)完成资源获取线程的等待排队工作,如果当前线程没有获取到锁,则将线程封装成一个Node节点,加入到同步队列,等待唤醒,通过自旋的方式尝试获得锁。

    Node节点组成

    • waitStatus:等待状态
    • Node prev:前驱节点
    • Node next:后继节点
    • Thread thread:获取同步状态的线程
    • Node nextWaiter:等待队列中的后继节点

    Node节点状态:waitStatus

    1. INITIAL=0:初始状态
    2. SIGNAL=-1:后继节点的线程处于等待状态,当前节点如果释放了同步状态,将通知后继节点
    3. CONDITION=-2:节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal方法后,该节点将从等待队列转移到同步队列中,加入到对同步状态的获取中
    4. PROPAGATE=-3:表示下一次共享式同步状态获取将会无条件的被传播下去
    5. CANNELED=1:在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消等待,节点进入该状态将不会变化

    二、源码分析

    2.1 加锁

    首先从ReentrantLock的lock方法开始

        public void lock() {
            sync.lock();
        }
    

    可以发现ReentrantLock实际调用的sync对象的lock方法,sync对象是ReentrantLock内部的一个静态内部类

    abstract static class Sync extends AbstractQueuedSynchronizer {
    	// ...
    }
    

    sync类有两个是实现:FairSync、NonfairSync,分别对应公平锁和非公平锁。
    先从非公平锁的角度分析源码,下文用sync代替非公平锁NonfairSync

            final void lock() {
            		// 尝试通过CAS获取锁
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    

    sync类的lock方法先尝试通过CAS修改state的状态位1,如果修改成功,表示当前线程获取到锁。否则调用acquire方法尝试加锁。

    		// AQS实现的方法,子类不可重写
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    acquire方法内部再次尝试通过tryAcquire获取锁

    				// tryAcquire需要子类实现,找到NonfairSync的实现
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                		// 当前没有锁,通过CAS尝试加锁
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                		// 获得到锁的线程是当前线程(可重入)
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    从这个方法可以看出,如果当前线程已经拥有锁,则将状态+acquires(获取锁的次数,需要释放相同的次数,可重入)直接返回加锁成功。如果没有获取到锁,则返回false

    回到acquire方法,假设tryAcquire没有加锁成功,继续跟踪addWaiter,通过方法名称可以猜出来,这个方法是将当前线程封装成一个Node

    	// 同步队列头指针,默认为null	
    	private transient volatile Node head;
    	// 同步队列尾指针,默认为null
    	private transient volatile Node tail;
    		
    	// mode=Node.EXCLUSIVE(独占式)
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    首先将当前线程封装成一个Node对象,如果当前队列是空的情况下,head和tail默认是都null,if (pred != null) 条件不成立,继续跟踪enq方法

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    第一次循环,t为null,if (t == null) 条件成立,通过CAS设置head节点为一个空的Node节点(哨兵节点),然后让tail和head同时指向哨兵节点
    AQS

    哨兵节点表示当前已经获得锁,正在执行业务逻辑的线程

    第二次循环,t不为null(t指向哨兵节点),node(当前线程封装的对象)的前驱指针指向t,即第一次循环创建的哨兵节点,通过CAS让tail指向node节点,然后将哨兵节点的后继指针指向node节点,结束循环
    AQS
    通过自旋的方式初始化了哨兵节点,并且将当前线程Node节点添加到同步队列中,假设此时有多个线程抢占锁,最后都会进入同步队列等待
    AQS
    继续跟踪acquireQueued方法

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    首先获取当前线程Node节点的前驱节点p,判断前驱节点p是否是头节点。通过上面的分析可知,p是头结点,然后通过通过tryAcquire方法尝试获取锁。

    • 如果加锁成功,则将当前节点设置为头结点,并将之前创建的哨兵节点的next指针指向null,即断开哨兵节点与当前线程Node节点之间的关联,此时当前线程已经获取到锁。
    • 如果加锁失败,则调用shouldParkAfterFailedAcquire方法
    	// pred=头结点(哨兵节点),node=头结点的后继节点
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                return true;
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    头结点当前等待状态是初始状态,所以进入else代码块,设置头结点状态为SIGNAL(-1),继续进行下一次循环。
    假设下一次循环依旧没有加锁成功,再次进入shouldParkAfterFailedAcquire方法,当前头结点状态已经变成SIGNAL,返回true,继续调用parkAndCheckInterrupt方法

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    可以看出,当前线程被阻塞,进入等待状态。

    至此已经完成了线程加锁失败,进入同步队列排队等待源码分析,下面继续看等待线程如何被唤醒。

    2.2 释放锁

    排队线程的唤醒必须要由其他线程释放锁触发,所以我们继续跟踪ReentrantLock的unlock方法

    		// ReentrantLock方法
        public void unlock() {
            sync.release(1);
        }
        // ReentrantLock.Sync方法
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    ReentrantLock通过Sync的release方法释放锁,release调用了tryRelease方法,跟踪tryRelease方法源码

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    判断当前线程是否是锁的独占线程,如果不是,抛出异常。所以我们使用ReentrantLock的时候,必须要加锁成功,才能执行unlock方法。
    如果state-releases的结果等于0,说明已经释放锁成功,清空当前锁的独占线程,重置状态为0。
    这里必须要注意,我们加锁几次,就要释放几次。

    继续回到Sync.release方法,释放成功之后,如果头节点h不等于空,并且头结点h的等待状态不是0,调用unparkSuccessor方法(通过上面的分析可知,头结点的状态是-1,表示头结点的后继节点处于等待状态)

    	// node=头结点h
        private void unparkSuccessor(Node node) {
        	// node=head
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    		// s=Node A
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
            	// 线程A等待超时或者被其他线程中断,waitStatus > 0,这时候需要唤醒线程B
                s = null;
                // tail=Node B
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    头结点h的状态等于SIGNAL(-1),CAS设置头结点状态为0,节点s(指向线程A对应的Node节点)不等于空,并且节点s的状态为初始状态(0),不会进入if条件,通过LockSupport.unpark唤醒线程A

    如果线程A等待超时或者被中断的情况

    如果节点s的状态大于0(即线程A等待超时或者被中断),执行for循环,此时tail指向线程B对应的Node节点。

    1. 第一次循环,t=tail,t指向Node B,并判断Node B的等待状态,Node B是初始状态(0),t赋值给s,s指向Node B
    2. 第二次循环,t=Node A(t = t.prev),Node A由于等待超时或者被中断,所以等待状态waitStatus等于1,if (t.waitStatus <= 0)不成立
    3. 第三次循环,t=head(t = t.prev),for循环条件不成立,结束循环

    至此,s指向Node B,继续往下执行,通过LockSupport.unpark唤醒线程B。

    2.3 公平锁与非公平锁

    ReentrantLock通过构造方法可以指定锁的类型,即非公平锁与公平锁,默认是非公平锁。

    什么是非公平锁与公平锁呢?

    • 非公平锁:线程获得锁的顺序与加锁顺序无关,性能更好,但是可能会导致线程“饥饿”,即线程一直无法获得锁
    • 公平锁:先尝试加锁的线程会优先获得到锁,可以避免线程“饥饿”的情况

    上面已经完成了非公平锁的源码分析,接下来继续跟踪源码看如何实现公平锁。

        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    在ReentrantLock类内部有三个内部类,Sync、NonfairSync、FairSync。Sync是一个抽象类,NonfairSync和FairSync均继承了Sync类,分别对应非公平锁与公平锁的实现。

    接下来看看非公平锁与公平锁的lock方法差异
    NonfairSync

            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    

    FairSync

            final void lock() {
                acquire(1);
            }
    

    可以看出,非公平锁调用lock方法的时候,会先尝试通过CAS修改同步状态,如果当前锁空闲,当前线程直接可以获得到锁。而公平锁直接调用acquire方法。如果当前锁已被占用,那么非公平锁和公平锁都会调用acquire方法。

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    tryAcquire需要AQS子类实现,继续跟踪NonfairSync和FairSync的实现
    NonfairSync

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
            
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    FairSync

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    可以看出,非公平锁再次尝试加锁,而对于公平锁来说,会先调用hasQueuedPredecessors检查队列中是否存在等待线程

        public final boolean hasQueuedPredecessors() {
            Node t = tail;
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    如果同步队列中已经存在等待线程,则不会尝试加锁,返回false,回到acquire方法

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    tryAcquire获得锁失败,则将当前线程封装成Node对象添加到同步队列尾部。
    由此可见,公平锁是按照进入同步队列的顺序获得锁,而非公平锁并不按照入队顺序,哨兵节点的下一个节点被唤醒时,可能同时存在其他线程与当前线程共同抢占锁,即同步队列中的排队线程被唤醒时,还是需要与其他未进入队列的线程竞争。

  • 相关阅读:
    WPF入门(一):简单的演示
    代码的演化DI(理解依赖注入di,控制反转ioc)
    WPF入门(三):简单绑定 绑定到页面元素
    WPF入门(四):简单绑定 静态资源绑定
    WPF入门(六)样式Style
    WPF入门(八)布局(layout) port 2
    js select onchange
    js this指向
    js 两个日期之间有多少个星期几
    js table的所有td 按行合并
  • 原文地址:https://www.cnblogs.com/thisismarc/p/14117380.html
Copyright © 2011-2022 走看看