zoukankan      html  css  js  c++  java
  • 关于 ReentrantLock 中锁 lock() 和解锁 unlock() 的底层原理浅析

    关于 ReentrantLock 中锁 lock() 和解锁 unlock() 的底层原理浅析

    如下代码,当我们在使用 ReentrantLock 进行加锁和解锁时,底层到底是如何帮助我们进行控制的啦?

        static Lock lock = new ReentrantLock();
    
        public static void main(String[] args) {
    
            // 使用两个线程模拟多线程执行并发
            new Thread(() -> doBusiness(), "Thread-1").start();
            new Thread(() -> doBusiness(), "Thread-2").start();
        }
    
        private static void doBusiness() {
            try {
                lock.lock();
                System.out.println("需要加锁的业务处理代码,防止并发异常");
            } finally {
                lock.unlock();
            }
        }

    带着这样的疑问,我们先后跟进 lock()和unlock() 源码一探究竟

    说明:

      1、在进行查看 ReentrantLock 进行 lock() 加锁和 unlock() 解锁源码时,需要知道 LockSupport 类、了解自旋锁以及链表相关知识。

      2、在分析过程中,假设第一个线程获取到锁的时候执行代码需要很长时间才释放锁,及在第二个第三个线程来获取锁的时候,第一个线程并没有执行完成,没有释放锁资源。

      3、在分析过程中,我们假设第一个线程就是最先进来获取锁的线程,那么第二个第三个线程也是依次进入的,不会存在第三个线程先于第二个线程(即第三个线程如果先于第二个线程发生,那么第三个线程就是我们下面描述的第二个线程)

    一、 lock() 方法

    1、查看lock()方法源码

        public void lock() {
            sync.lock();
        }

      从上面可以看出 ReentrantLock 的 lock() 方法调用的是 sync 这个对象的 lock() 方法,而 Sync 就是一个实现了抽象类AQS(AbstractQueuedSynchronizer) 抽象队列同步器的一个子类,继续跟进代码(说明:ReentrantLock 分为公平锁和非公平锁,如果无参构造器创建锁默认是非公平锁,我们按照非公平锁的代码来讲解)

      1.1 关于Sync子类的源码

    abstract static class Sync extends AbstractQueuedSynchronizer { 
        // 此处省略具体实现AbstractQueuedSynchronizer 类的多个方法
    }

      这里需要说明的是 AbstractQueuedSynchronizer 抽象队列同步器底层是一个通过Node实现的双向链表,该抽象同步器有三个属性 head 头节点tail 尾节点 和 state 状态值。

        属性1:head——注释英文翻译:等待队列(同步阻塞队列)的头部,懒加载,用于初始化,当调用 setHead() 方法的时候会对 head 进行修改。注:如果 head 节点存在,则 head 节点的 waitStatus 状态值用于保证其不变成 CANCELLED(取消状态,值为1) 状态

        /**
         * Head of the wait queue, lazily initialized.  Except for
         * initialization, it is modified only via method setHead.  Note:
         * If head exists, its waitStatus is guaranteed not to be
         * CANCELLED.
         */
        private transient volatile Node head;

        属性2: tail——tail节点是等待队列(同步阻塞队列)的尾部,懒加载,在调用 enq() 方法时会添加一个新的 node 到等待队列的时候会修改 tail 节点。

        /**
         * Tail of the wait queue, lazily initialized.  Modified only via
         * method enq to add new wait node.
         */
        private transient volatile Node tail;

        属性3:state——用于同步的状态码。如果 state 该值为0,则表示没有其他线程获取到锁,如果该值大于0则表示已经被某线程获取到了锁,该值可以是2、3、4,所以使用该值可以处理重入锁(递归锁)的逻辑。

        /**
         * The synchronization state.
         */
        private volatile int state;

      1.2 上面 Sync 类使用 Node来作为双向队列的具体保存值和状态的载体,Node 的具体结构如下

    static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node(); // 共享锁模式(主要用于读写锁中的读锁)
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null; // 排他锁模式(也叫互斥锁)
    
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1; // Node线程等待取消,不再参与锁竞争,处于这种状态的Node会被踢出队列,被GC回收
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1; // 表明Node线程需要被唤醒,可以竞争锁
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2; // 表示这个Node线程在条件队列中,因为等待某个条件而被阻塞 
            /** waitStatus value to indicate the next acquireShared should unconditionally propagate */
            static final int PROPAGATE = -3; // 使用在共享模式头Node有可能处于这种状态, 表示锁的下一次获取可以无条件传播
    
            volatile int waitStatus; // 默认初始状态为0,所有新增node节点的初始状态都是0
    
            volatile Node prev; // 前驱Node节点
    
            volatile Node next; // 后继Node节点
    
            /** The thread that enqueued this node.  Initialized on construction and nulled out after use.*/
            volatile Thread thread; // 当前线程和节点进行绑定,通过构造器初始化Thread,在释放的时候将当前线程替换原有的null值
    
            // 省略部分代码

      说明:Sync 通过Node节点构建队列,Node节点使用prev和next节点来行程双向队列,使用prev来关联上一个节点,使用next来关联下一个节点,每一个node节点和一个thread线程进行绑定,用来表示当前线程在阻塞队列中的具体位置和状态 waitStatus

    2、上面的 sync.lock() 继续跟进源码(非公平锁):

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

    说明:上面代码说明,如果 compareAndSetState(0, 1) 为 true ,则执行 setExclusiveOwnerThread(Thread.currentThread()) ,否则执行 acquire(1);

      2.1 compareAndSetState(0, 1) 底层使用unsafe类完成CAS操作,意思就是判断当前state状态是否为0,如果为零则将该值修改为1,并返回true;state不为0,则无法将该值修改为1,返回false。

        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }

      2.2 假如第1个线程进来的时候 compareAndSetState(0, 1) 肯定执行成功,state 状态会从0变成1,同时返回true,执行 setExclusiveOwnerThread(Thread.currentThread()) 方法:

        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }

      setExclusiveOwnerThread(Thread.currentThread()) 表示将当前 Sync 对象和当前线程绑定,意思是表明:当前队列内同步器执行的线程为 thread,该 thread 获取了锁正在执行。

      2.3 假如进来的线程为第2个,并且第一个线程还在执行没有释放锁,那么第2个线程就会执行 acquire(1)方法:

        public final void acquire(int arg) {
         // 如下:三个方法的各自作用
         // tryAcquire(arg) 当前线程是否还有机会获取到锁,非公平锁的特质
         // addWaiter() 将当前线程加入队列
         // acquireQueued() 方法中:1 判断是否是队首任务head节点 并且抢占锁成功,则修改链表 2、否则(不是队首head节点 或者 没有抢占到锁的所有节点)都进行阻塞(原理LockSupport.park())
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

      进入到该方法中发现,需要通过 !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 两个方法判断是否需要执行 selfInterrupt();

        (1)先执行tryAcquire(arg)这个方法进行判断,注:tryAcquire(arg)方法体现了非公平锁的特性,只要在加入队列前都有机会获取锁。这个方法就是判断当前线程是否在进入队列前有机会获取锁。

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
            /**
             * Performs non-fair tryLock.  tryAcquire is implemented in
             * subclasses, but both need nonfair try for trylock method.
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
           // 获取state状态,因为第一个线程进来的时候只要还没有执行完就已经将state设置为1了(即:2.1步)
    int c = getState();
           // 再次判断之前获取锁的线程是否已经释放锁了
    if (c == 0) {
    // 如果之前的线程已经释放锁,那么当前线程进来就将状态改为1,并且设置当前占用锁的线程为自身
    if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
           // 判断当前占用锁的线程是不是就是我自身,如果是我自身,这将State在原值的基础上进行加1,来处理重入锁逻辑
    else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires;
              // 判断重入锁次数是不是超过限制,超过限制则直接报错
    if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

        从上面的方法看,如果第二个线程进来,且第一个线程还未释放锁的情况下,该方法 tryAcquire(arg) 直接放回false,那么 !tryAcquire(arg) 就为true,需要判断第二个方法 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),第二个方法先执行addWaiter(Node.EXCLUSIVE),及添加等待线程进入队列

        (2)添加等待线程到同步阻塞队列中

        private Node addWaiter(Node mode) {
         // 将当前线程和node节点进行绑定,设置模式为排他锁模式 Node node
    = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;// 第二个线程也就是第一次进来该方法的线程,tail肯定是null if (pred != null) { // 如果tail尾节点不为空,表示第3、4、5次进来的线程 node.prev = pred; // 那么就将当前进来的线程节点的 prev 节点指向之前的尾节点 if (compareAndSetTail(pred, node)) { // 通过比较并交换,如果当前尾节点在设置过程中没有被其他线程抢先操作,那么就将当前节点设置为tail尾节点 pred.next = node; // 将以前尾节点的下一个节点指向当前节点(新的尾节点) return node; } } enq(node); // 如果为第二个线程进来,就是上面的 pred != null 成立,if 没有执行,直接执行 enq()方法 return node; }
        private Node enq(final Node node) {
            for (;;) { // 一直循环检查,相当于自旋锁
                Node t = tail;
                if (t == null) { // Must initialize
                       // 第二个线程的第一次进来肯定先循环进入该方法,这时设置头结点,该头结点一般被称为哨兵节点,并且头和尾都指向该节点
    if (compareAndSetHead(new Node()))
                // 第一次进来设置的头结点和尾节点是同一个节点 tail
    = head; } else {
              // 1、第二个线程在第二次循环时将进入else 方法中,将该节点挂在哨兵节点(头结点)后,并且尾节点指向该节点,并且将该节点返回(该节点有prev信息) node.prev
    = t;
              // 在if中将当前队列的尾节点修改新增进来的 node 节点
    if (compareAndSetTail(t, node)) {
                // t节点是之前链表的尾部节点,第一次进来head和tail是同一个节点,并将新的尾节点node 设置到之前尾节点t 后,连成一个整串 t.next
    = node; return t; } } } }

        如上在执行 enq(final Node node) 结束,并且返回添加了第二个线程node节点的时候, addWaiter(Node mode) 方法会继续向上返回

        或者

        如果是添加第3、4个线程直接走 addWaiter(Node mode) 方法中的 if 流程直接添加返回

        都将到2.3 步,执行acquireQueued(final Node node, int arg),再次贴源码

        public final void acquire(int arg) {
            if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

        (3)即下一步就会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法:

        注:上面的流程是将后面的线程加入到了同步阻塞队列中,下面的方法第一个if (p == head && tryAcquire(arg))则是看同步阻塞队列的第一条阻塞线程是否可以获取到锁,如果能够获取到锁就修改相应链表结构,第二个if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()即将发生线程阻塞

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
              // 自旋锁,如果为第二个线程,那么 p 就是 head 哨兵节点
    final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {
                // 上面的 if 表明如果当前线程为同步阻塞队列中的第一个线程,那么就再次试图获取锁 tryAcquire(),如果获取成功,则修改同步阻塞队列 setHead(node); // 将head头结点(哨兵节点)设置为已经获取锁的线程node,并将该node的Theread 设置为空 p.next
    = null; // help GC 取消和之前哨兵节点的关联,便于垃圾回收器对之前数据的回收 failed = false; return interrupted; }
              // 如果第二个线程没有获取到锁(同步阻塞队列中的第一个线程),那么就需要执行下面两个方法,注标的方法会让当前未获取到锁的线程阻塞
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
        private void setHead(Node node) {
            // 将哨兵节点往后移,并且将 thread 设置为空,取消和以前哨兵节点的关联,并于垃圾回收器回收
            head = node;
            node.thread = null;
            node.prev = null;
        }

        shouldParkAfterFailedAcquire(p, node)这个方法将哨兵队列的状态设置为待唤醒状态

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
         // 该方法第一次进来 pred为哨兵节点,ws为哨兵节点的初始0状态
         // 该方法第二次进来 pred为哨兵节点,ws为哨兵节点的状态-1状态
         // 该方法第三、四次进来就为链表的倒数第二个节点,ws为倒数第二个节点的状态
    int ws = pred.waitStatus;
        
    if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */
           // 如上第二次进来的时候哨兵节点的状态就是-1,此时返回true
    return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do {
              // 如果ws及倒数第二个节点是取消状态,那么通过双向链表向前找倒数第三个,第四关节点,直到向前找到最近一个状态不是取消的node节点,并把当前节点挂在该节点后 node.prev
    = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 将头结点(哨兵节点)设置成待唤醒状态,第一次进来的时候 有0——>-1 然后继续执行返回false } return false; }
        parkAndCheckInterrupt()这个方法会让当前线程阻塞
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this); // LockSupport.park()会导致当前线程阻塞,直到某个线程调用unpark()方法
            return Thread.interrupted();
        }

        那么在lock()方法执行时,只要第一个线程没有unlock()释放锁,其他所有线程都会加入同步阻塞队列中,该队列中记录了阻塞线程的顺序,在加入同步阻塞队列前有多次机会可以抢先执行(非公平锁),如果没有被执行到,那么加入同步阻塞队列后,就只有头部节点(哨兵节点)后的阻塞线程有机会获取到锁进行逻辑处理。再次查看该方法: 

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        // if 表明只有头部节点(哨兵节点)后的节点在放入同步阻塞队列前可以获取锁
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 所有线程都被阻塞在这个方法处
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }        

    二、unlock()方法

    1、unlock源码

        public void unlock() {
            sync.release(1);
        }

      同样是调用的同步阻塞队列的方法 sync.release(1),跟进查看源码:

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    2、查看tryRelease()方法:

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    // 如果不是自身锁对象调用unlock()方法的话,就报异常
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    // 如果标志位已经为0,表示重入锁已经全部释放,这将当前获取锁的线程设置为null,以便其他线程进行加锁
                    setExclusiveOwnerThread(null);
                }
                // 更新重入锁解锁到达的次数,如果C不为0,表示还有重入锁unlock()没有调用完
                setState(c);
                return free;
            }        

    3、如果tryRelease()方法成功执行,表示之前获取锁的线程已经执行完所有需要同步的代码(重入锁也完全退出),那么就需要唤醒同步阻塞队列中的第一个等待的线程(也是等待最久的线程),执行unparkSuccessor(h)方法:

        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            // 先获取头结点(哨兵节点)的waitStatus状态,如果小于0,则可以获取锁,并将waitStatus的状态设置为0
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                // 如果哨兵节点的下一个节点为null,或者状态为1表示已经取消,则依次循环寻找(从后往前寻找)后面节点,直至找到一个waitStatus<0的节点,并将该节点设置为需要获取锁的节点
                s = null;
           // 注下面的 for 循环是从后往前遍历,直到 for 循环遍历完成找到最开头的一个节点,并且该节点 waitStatus<0。将找到的最开头的满足条件的节点给到 s 并对其进行唤醒
    for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 将该node节点的线程解锁,允许它去获取锁,然后执行业务逻辑 LockSupport.unpark(s.thread); }

    三、unlock()方法调用后,会到lock()方法阻塞的地方,完成唤醒工作

    1、在上面方法 unparkSuccessor(Node node) 中执行完 LockSupport.unpark(s.thread) 后在同步阻塞队列后的第一个 node 关联的线程将被唤醒,即unlock()方法代码执行完,将会到lock() 源码解析的 2.3 步里,第三次贴该处源码:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    2、在上面放大的红色方法中,之前上面lock()源码讲了当中所有线程都被阻塞了,如下面源码红色标记的地方:

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }

    3、所有未获取到锁的线程都在 parkAndCheckInterrupt() 方法处阻塞着,所以我们即将唤醒的哨兵节点后的第一个阻塞线程也是在该处阻塞着,在执行完 unlock() 源码步骤第3步unparkSuccessor(Node node)中的方法,则将返回到之前阻塞线程的这个方法 parkAndCheckInterrupt()的这行代码 LockSupport.park(this) 的下一步执行 Thread.interrupted(),因为线程没有被打断,所以返回false,故acquireQueued(final Node node, int arg)方法中继续轮训再次尝试acquireQueued(final Node node, int arg)获取锁,因为第一个线程已经释放锁,所以第二个线程可以获取锁了,并在执行完后返回interrupted为false,表示线程不是被中断的,继续向上返回,最终lock()方法执行完成(获取到锁),可以继续执行业务代码。理,其他线程也在parkAndCheckInterrupt()这个方法中中断着,等待被第二个线程唤醒。

    总结:

      在第一个 A 线程 lock() 获取到锁后,第一个线程会在底层的同步阻塞队列中设置锁状态 state 为1(如果重入锁多次获取 state 每次加1),并设置拥有当前锁的线程为自身A线程,其他线程 B/C/D 来获取锁的时候就会比较锁状态是否为0,如果不为0,表示已经被获取了锁,再次比较获取锁的线程是否为自身,如果为自身则对 state 加1(满足重入锁的规则),否则就将当前未获取到锁的线程放入同步阻塞队列中,在放入的过程中,需要设置 head 哨兵节点和 tail 尾节点,以及相应的 waitStatus 状态,并且在放入过程中需要设置当前节点以及先关节点的 prev 和 next 节点,从而达到双向队列的效果,存放到阻塞队列后,线程会被阻塞到这样一个方法中 parkAndCheckInterrupt(),对于用户(调用者)来说是执行到.lock()方法被阻塞,等待被唤醒。

      在第一个 A 线程执行完毕,调用 unlock() 解锁后,unlock() 方法会从同步阻塞队列的哨兵节点后的第一个节点获取等待解锁的线程B,并将其解锁,然后就会到B阻塞的方法 parkAndCheckInterrupt() 来继续执行,因为不是被中断,所以Thread.interrupted()方法为false,返回false后,在acquireQueued(final Node node, int arg)方法的for(;;)处自旋调用tryAcquire(arg)尝试获取锁,因为第一个线程已经释放锁,所以第二个线程在自旋获取锁的时候能够成功,代码继续执行,使得 B 线程原本在 lock() 方法处阻塞的代码能够执行完成继续向下执行(能够执行表明获取到锁),然后执行业务逻辑。其他线程以此类推,依次发生阻塞和唤醒。

    根据源码总结的 lock() 和 unlock() 的原理,欢迎大神批评指正,如有刚好的见解,也欢迎大家相互交流。

  • 相关阅读:
    sql中添加唯一索引(非主键)
    Ubuntu 安装 LAMP 主机之后运行出现乱码
    编写安全 PHP 应用程序的七个习惯
    首先在服务器上安装ssh的服务器端
    php安全简析
    正确的Linux菱形乱码修改方法
    php安全
    变量作用域
    bind9 详细解析
    DNS和DHCP服务器
  • 原文地址:https://www.cnblogs.com/yanzige/p/14175277.html
Copyright © 2011-2022 走看看