zoukankan      html  css  js  c++  java
  • Java并发(5)- ReentrantLock与AQS

    引言

    synchronized未优化之前,我们在编码中使用最多的同步工具类应该是ReentrantLock类,ReentrantLock拥有优化后synchronized关键字的性能,又提供了更多的灵活性。相比synchronized,他在功能上更加强大,具有等待可中断,公平锁以及绑定多个条件等synchronized不具备的功能,是我们开发过程中必须要重点掌握的一个关键并发类。

    ReentrantLock在JDK并发包中举足轻重,不仅是因为他本身的使用频度,同时他也为大量JDK并发包中的并发类提供底层支持,包括CopyOnWriteArrayLitCyclicBarrierLinkedBlockingDeque等等。既然ReentrantLock如此重要,那么了解他的底层实现原理对我们在不同场景下灵活使用ReentrantLock以及查找各种并发问题就很关键。这篇文章就带领大家一步步剖析ReentrantLock底层的实现逻辑,了解实现逻辑之后又应该怎么更好的使用ReentrantLock

    ReentrantLock与AbstractQueuedSynchronizer的关系

    在使用ReentrantLock类时,第一步就是对他进行实例化,也就是使用new ReentrantLock(),来看看他的实例化的源码:

    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    在代码中可以看到,ReentrantLock提供了2个实例化方法,未带参数的实例化方法默认用NonfairSync()初始化了sync字段,带参数的实例化方法通过参数区用NonfairSync()FairSync()初始化sync字段。

    通过名字看出也就是我们常用的非公平锁与公平锁的实现,公平锁需要通过排队FIFO的方式来获取锁,非公平锁也就是说可以插队,默认情况下ReentrantLock会使用非公平锁的实现。那么是sync字段的实现逻辑是什么呢?看下sync的代码:

    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {......}
    
    static final class NonfairSync extends Sync {......}
    
    static final class FairSync extends Sync {......}
    

    到这里就发现了AbstractQueuedSynchronizer类,公平锁和非公平锁其实都是在AbstractQueuedSynchronizer的基础上实现的,也就是AQS。AQS提供了ReentrantLock实现的基础。

    ReentrantLock的lock()方法

    分析了ReentrantLock的实例化之后,来看看他是怎么实现锁这个功能的:

    //ReentrantLock的lock方法
    public void lock() {
        sync.lock();
    }
    
    //调用了Sync中的lock抽象方法
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ......
        /**
            * Performs {@link Lock#lock}. The main reason for subclassing
            * is to allow fast path for nonfair version.
            */
        abstract void lock();
        ......
    }
    

    调用了synclock()方法,Sync类的lock()方法是一个抽象方法,NonfairSync()FairSync()分别对lock()方法进行了实现。

    //非公平锁的lock实现
    static final class NonfairSync extends Sync {
        ......
        /**
            * Performs lock.  Try immediate barge, backing up to normal
            * acquire on failure.
            */
        final void lock() {
            if (compareAndSetState(0, 1)) //插队操作,首先尝试CAS获取锁,0为锁空闲
                setExclusiveOwnerThread(Thread.currentThread()); //获取锁成功后设置当前线程为占有锁线程
            else
                acquire(1);
        }
        ......
    }
    
    //公平锁的lock实现
    static final class FairSync extends Sync {
        ......
        final void lock() {
            acquire(1);
        }
        ......
    }
    

    注意看他们的区别,NonfairSync()会先进行一个CAS操作,将一个state状态从0设置到1,这个也就是上面所说的非公平锁的“插队”操作,前面讲过CAS操作默认是原子性的,这样就保证了设置的线程安全性。这是非公平锁和公平锁的第一点区别。

    那么这个state状态是做什么用的呢?从0设置到1又代表了什么呢?再来看看跟state有关的源码:

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    /**
        * The synchronization state.
        */
    private volatile int state;
    
    protected final int getState() {
        return state;
    }
    
    protected final void setState(int newState) {
        state = newState;
    }
    

    首先state变量是一个volatile修饰的int类型变量,这样就保证了这个变量在多线程环境下的可见性。从变量的注释“The synchronization state”可以看出state代表了一个同步状态。再回到上面的lock()方法,在设置成功之后,调用了setExclusiveOwnerThread方法将当前线程设置给了一个私有的变量,这个变量代表了当前获取锁的线程,放到了AQS的父类AbstractOwnableSynchronizer类中实现。

    public abstract class AbstractOwnableSynchronizer
        implements java.io.Serializable {
        ......
    
        /**
         * The current owner of exclusive mode synchronization.
         */
        private transient Thread exclusiveOwnerThread;
    
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
    

    如果设置state成功,lock()方法执行完毕,代表获取了锁。可以看出state状态就是用来管理是否获取到锁的一个同步状态,0代表锁空闲,1代表获取到了锁。那么如果设置state状态不成功呢?接下来会调用acquire(1)方法,公平锁则直接调用acquire(1)方法,不会用CAS操作进行插队。acquire(1)方法是实现在AQS中的一个方法,看下他的源码:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    这个方法很重要也很简单理解,有几步操作,首先调用tryAcquire尝试获取锁,如果成功,则执行完毕,如果获取失败,则调用addWaiter方法添加当前线程到等待队列,同时添加后执行acquireQueued方法挂起线程。如果挂起等待中需要中断则执行selfInterrupt将线程中断。下面来具体看看这个流程执行的细节,首先看看tryAcquire方法:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    //NonfairSync
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { //锁空闲
            if (compareAndSetState(0, acquires)) { //再次cas操作获取锁
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { //当前线程重复获取锁,也就是锁重入
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    //FairSync
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() && //判断队列中是否已经存在等待线程,如果存在则获取锁失败,需要排队
                compareAndSetState(0, acquires)) { //不存在等待线程,再次cas操作获取锁
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { //当前线程重复获取锁,也就是锁重入
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    //AQS中实现,判断队列中是否已经存在等待线程
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    AQS没有提供具体的实现,ReentrantLock中公平锁和非公平锁分别有自己的实现。非公平锁在锁空闲的状态下再次CAS操作尝试获取锁,保证线程安全。如果当前锁非空闲,也就是state状态不为0,则判断是否是重入锁,也就是同一个线程多次获取锁,是重入锁则将state状态+1,这也是ReentrantLock`支持锁重入的逻辑。

    公平锁和非公平锁在这上面有第二点区别,公平锁在锁空闲时首先会调用hasQueuedPredecessors方法判断锁等待队列中是否存在等待线程,如果存在,则不会去尝试获取锁,而是走接下来的排队流程。至此非公平锁和公平锁的区别大家应该清楚了。如果面试时问道公平锁和非公平锁的区别,相信大家可以很容易答出来了。

    通过tryAcquire获取锁失败之后,会调用acquireQueued(addWaiter),先来看看addWaiter方法:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);   //用EXCLUSIVE模式初始化一个Node节点,代表是一个独占锁节点
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) { //如果尾节点不为空,代表等待队列中已经有线程节点在等待
            node.prev = pred; //将当前节点的前置节点指向尾节点
            if (compareAndSetTail(pred, node)) { //cas设置尾节点为当前节点,将当前线程加入到队列末尾,避免多线程设置导致数据丢失
                pred.next = node;
                return node;
            }
        }
        enq(node); //如果队列中无等待线程,或者设置尾节点不成功,则循环设置尾节点
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node())) //空队列,初始化头尾节点都为一个空节点
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) { //重复addWaiter中的设置尾节点,也是cas的经典操作--自旋,避免使用Synchronized关键字导致的线程挂起
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node(); //共享模式
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;  //独占模式
    
        ......
    }
    

    addWaiter方法首先初始化了一个EXCLUSIVE模式的Node节点。Node节点大家应该很熟悉,我写的集合系列文章里面介绍了很多链式结构都是通过这种方式来实现的。AQS中的Node也不例外,他的队列结构也是通过实现一个Node内部类来实现的,这里实现的是一个双向队列。Node节点分两种模式,一种SHARED共享锁模式,一种EXCLUSIVE独占锁模式,ReentrantLock使用的是EXCLUSIVE独占锁模式,所用用EXCLUSIVE来初始化。共享锁模式后面的文章我们再详细讲解。

    初始化Node节点之后就是将节点加入到队列之中,这里有一点要注意的是多线程环境下,如果CAS设置尾节点不成功,需要自旋进行CAS操作来设置尾节点,这样即保证了线程安全,又保证了设置成功,这是一种乐观的锁模式,当然你可以通过synchronized关键字锁住这个方法,但这样效率就会下降,是一种悲观锁模式。

    设置节点的过程我通过下面几张图来描述下,让大家有更形象的理解:

    将当前线程加入等待队列之后,需要调用acquireQueued挂起当前线程:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); //获取当前节点的前置节点
                if (p == head && tryAcquire(arg)) { //如果前置节点是头节点,说明当前节点是第一个挂起的线程节点,再次cas尝试获取锁
                    setHead(node); //获取锁成功设置当前节点为头节点,当前节点占有锁
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && //非头节点或者获取锁失败,检查节点状态,查看是否需要挂起线程
                    parkAndCheckInterrupt())  //挂起线程,当前线程阻塞在这里!
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    可以看到这个方法是一个自旋的过程,首先获取当前节点的前置节点,如果前置节点为头结点则再次尝试获取锁,失败则挂起阻塞,阻塞被取消后自旋这一过程。是否可以阻塞通过shouldParkAfterFailedAcquire方法来判断,阻塞通过parkAndCheckInterrupt方法来执行。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) //代表继任节点需要挂起
            /*
                * This node has already set status asking a release
                * to signal it, so it can safely park.
                */
            return true;
        if (ws > 0) { //代表前置节点已经退出(超时或中断等情况) 
            /*
                * Predecessor was cancelled. Skip over predecessors and
                * indicate retry.
                */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0); //前置节点退出,循环设置到最近的一个未退出节点
            pred.next = node;
        } else { //非可挂起状态或退出状态则尝试设置为Node.SIGNAL状态
            /*
                * waitStatus must be 0 or PROPAGATE.  Indicate that we
                * need a signal, but don't park yet.  Caller will need to
                * retry to make sure it cannot acquire before parking.
                */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//挂起当前线程
        return Thread.interrupted();
    }
    

    只有当节点处于SIGNAL状态时才可以挂起线程,Node的waitStatus有4个状态分别是:

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
        * waitStatus value to indicate the next acquireShared should
        * unconditionally propagate
        */
    static final int PROPAGATE = -3;
    

    注释写的很清楚,这里就不详细解释着四种状态了。到这里整个Lock的过程我们就全部说完了,公平锁和非公平锁的区别从Lock的过程中我们也很容易发现,非公平锁一样要进行排队,只不过在排队之前会CAS尝试直接获取锁。说完了获取锁,下面来看下释放锁的过程。

    ReentrantLock的unLock()方法

    unLock()方法比较好理解,因为他不需要考虑多线程的问题,如果unLock()的不是之前lock的线程,直接退出就可以了。看看unLock()的源码:

    public class ReentrantLock implements Lock, java.io.Serializable {
        ......
        public void unlock() {
            sync.release(1);
        }
        ......
    }
    
    public abstract class AbstractQueuedSynchronizer {
        ......
        public final boolean release(int arg) {
            if (tryRelease(arg)) { //尝试释放锁
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h); //释放锁成功后启动后继线程
                return true;
            }
            return false;
        }
        ......
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ......
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread()) //释放锁必须要是获取锁的线程,否则退出,保证了这个方法只能单线程访问
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { //独占锁为0后代表锁释放,否则为重入锁,不释放
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
        ......
    }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ......
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread); //挂起当前线程
        }
        ......
    }
    

    lock()方法一样,会调用AQS的release方法,首先调用tryRelease尝试释放,首先必须要是当前获取锁的线程,之后判断是否为重入锁,非重入锁则释放当前线程的锁。锁释放之后调用unparkSuccessor方法启动后继线程。

    总结

    ReentrantLock的获取锁和释放锁到这里就讲完了,总的来说还是比较清晰的一个流程,通过AQS的state状态来控制锁获取和释放状态,AQS内部用一个双向链表来维护挂起的线程。在AQS和ReentrantLock之间通过状态和行为来分离,AQS用管理各种状态,并内部通过链表管理线程队列,ReentrantLock则对外提供锁获取和释放的功能,具体实现则在AQS中。下面我通过两张流程图总结了公平锁和非公平锁的流程。

    非公平锁:

    公平锁:

  • 相关阅读:
    八十五:redis之redis的事物、发布和订阅操作 (2019-11-18 22:54)
    八十四:redis之redis的集合、哈希操作
    八十三:redis之redis的字符串、过期时间、列表操作
    八十三:redis之redis的使用场景和安装
    八十二:memcached之python操作memcached
    八十一:memcached之telnet操作memcached
    八十:memcached之安装与参数
    MySQL篇之Navicat可视化工具
    MySQL数据库篇之多表查询
    MySQL数据库篇之单表查询
  • 原文地址:https://www.cnblogs.com/konck/p/9466496.html
Copyright © 2011-2022 走看看