zoukankan      html  css  js  c++  java
  • 并发编程五、分析J.U.C中ReentrantLock的底层设计

      JUC(java.util.concurrent)是在并发编程中常用的工具类,里面包含很多用来在并发场景中使用的组件。比如线程池、阻塞队列、计时器、同步器、并发集合等等。并发包的作者是大名鼎鼎的Doug Lea。

     /*
     * ...
     * @since 1.5
     * @author Doug Lea
     */
    

    一、JUC中的Lock接口

      Lock在JUC中是最核心的组件,在JUC包中的绝大部分组件都使用到了Lock。在Lock接口出现之前,Java中的应用程序对于并发编程线程安全问题只能基于synchronized来解决。但synchronized会在某些场景存在一些短板,并不适合于所有的并发编程场景。在JDK5以后Lock的出现可以弥补这些场景,且使用起来相对灵活。
      Lock本质上是一个接口,定义了获得锁和释放锁的抽象方法,定义为接口意味着它定义了一个锁的规范,也意味着有锁的不同实现。下面来看一下常用的锁的实现:

    1. ReentrantLock

    重入锁,直接实现了Lock接口。重入指的是一个线程在获得锁后且没有释放锁的期间,该线程再次获取该锁不需要阻塞。

    2. ReentrantReadWriteLock

    重入读写锁,实现了ReadWriteLock接口,内部维护了两把锁:读锁readLock和写锁writeLock。读写锁适合在读多写少的场景下解决线程安全问题,能够提供比排它锁更好的并发性和吞吐量。
    基本原则:读和读不互斥、读和写互斥、写和写互斥。

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    public class ReadWriteLockDemo {
    
        static Map<String,Object> cacheMap = new HashMap<>();
        static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        static Lock read = readWriteLock.readLock();
        static Lock write = readWriteLock.writeLock();
    
        public static Object get(String key){
            read.lock(); //读锁 ThreadA 阻塞
            try{
                return cacheMap.get(key);
            }finally {
                read.unlock(); //释放读锁
            }
        }
    
        public static Object write(String key,Object value){
            write.lock(); //Other Thread 获得了写锁
            try{
                return cacheMap.put(key,value);
            }finally {
                write.unlock();
            }
        }
    
    }
    

    在这个案例中,通过hashmap 来模拟了一个内存缓存,然后使用读写所来保证这个内存缓存的线程安全性。当执行读操作的时候,需要获取读锁,在并发访问的时候,读锁不会被阻塞,因为读操作不会影响执行结果。在执行写操作是,线程必须要获取写锁,当已经有线程持有写锁的情况下,当前线程会被阻塞,只有当写锁释放以后,其他读写操作才能继续执行。 使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性

    3. StampedLock

    StampedLock是JDK8引入的新的锁机制,可以简单认为是读写锁的一个改进版本。读写锁虽然通过分离读写锁使得读和写可以完全并发,但读和写冲突,如果大量的线程存在,可能会引起线程的饥饿。StampedLock 是一种乐观的读策略,使得乐观锁完全不会阻塞写线程。

    二、ReentrantLock重入锁

    1. ReentrantLock的类关系图

    主要几个方法介绍:
    void lock(); 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放;
    boolean tryLock(); 非阻塞获取锁;如果成功返回true,如果失败不会阻塞线程而是返回false;
    void unlock(); 释放锁
    void lockInterruptibly() throws InterruptedException; 和lock()方法相似,但阻塞的线程可以中断; 
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException; 带有超时时间的获取锁的方法;
    Condition newCondition(); 创建Condition,线程通信
    

    2. 重入的特性

      重入锁,表示支持重新进入的锁,也就是说,如果当前线程t1通过调用lock方法获取了锁之后,再次调用lock,是不会再阻塞去获取锁的,直接增加重试次数就行了。synchronized和ReentrantLock都是可重入锁。
    至于设计重入锁的目的,可以先看下下面的例子:

    public class ReEnter {
    
        // step1获得对象锁
        public synchronized void step1() {
            System.out.println("This is step1");
            step2();
        }
    
        public void step2() {
            synchronized (this) {
                // step2内也获得对象锁
                System.out.println("This is step2");
            }
        }
    
        public static void main(String[] args) {
    
            ReEnter demo = new ReEnter();
            demo.step1();
    
        }
    
    }
    

    step1、step2两个方法都会获得对象锁,然后在主线程中调用step1,如果此时synchronized不支持同一个线程重入,那么在step1获得对象锁后,在step2内就不能获得锁,就会产生死锁。

    三、ReentrantLock源码分析及AQS

    1. AQS知识点分析

    JUC lock的本质
      是对一个共享变量标记的抢占,以及对没有抢占到的线程的排队和阻塞机制
    也就是对一个volatile共享变量state的抢占, 抢占不到的线程封装为Node节点存储于一个双向链表的同步队列即AQS同步队列中去,在线程释放锁后从同步队列中唤醒后续线程去抢占锁。

    AQS是什么
      在 Lock 中,用到了一个同步队列 AQS,全称 AbstractQueuedSynchronizer,它是一个同步工具也是 Lock 用来实现线程同步的核心组件。如果搞懂了 AQS,那么 J.U.C 中绝大部分的工具都能轻松掌握。

    AQS的两种功能
      从使用层面来说,AQS 的功能分为两种:独占和共享。
    独占锁:每次只能有一个线程持有锁,比如 ReentrantLock 就是以独占方式实现的互斥锁。
    共享锁:允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock。

    AQS的内部实现
      AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个 Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 ASQ 队列中去;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。

    Node节点的5种状态
      分别是: CANCELLED (1)、 SIGNAL (-1) 、 CONDITION (-2) 、 PROPAGATE(-3) 、默认状态 (0)
    CANCELLED: 在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点 其结点的 waitStatus 为 CANCELLED ,即结束状态,进入该状态后的结点将不会再变化
    SIGNAL:只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程
    CONDITION: 和 Condition 有关系,实现线程通信
    PROPAGATE: 共享模式下, PROPAGATE 状态的线程处于可运行状态

    当出现锁竞争以及释放锁的时候,AQS 同步队列中的节点会发生变化:
    添加线程对于队列的变化
      新的线程加入到队列里会涉及到两个变化:

    	private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    	
    	private Node enq(final Node node) {
            for (;;) { // 自旋入队
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    1. 新的线程封装成 Node 节点追加到同步队列中,设置 prev 节点以及修改当前节点的前置节点的 next 节点指向自己
    2. 通过 CAS 将 tail 重新指向新的尾部节点

    释放锁对于队列的变化
      head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下:

    	private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
    	    // 如果head的next节点不可用,waitStatus>0表示状态不可用,线程不能被唤醒
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    // 从尾节点tail往前查找可用节点
    		if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)// 唤醒后续的一个线程
                LockSupport.unpark(s.thread);
        }
    
    1. 修改 head 节点指向下一个获得锁的节点
    2. 新的获得锁的节点,将 prev 的指针指向 null

    设置 head 节点不需要用 CAS ,原因是设置 head 节点是由获得锁的线程来完成的,而同步锁只能 由一个线程获得,所以不需要 CAS 保证,只需要把 head 节点设置为原首节点的后继节点,并且断开原 head 节点的 next 引用即可

    原本挂起的线程继续执行
    首先,在抢夺锁失败后,在acquireQueued()方法内部逻辑中会调用LockSupport.park(this)将当前线程挂起:
    之后,在head节点释放锁后,调用unpark()来唤醒下一个节点时,会继续 parkAndCheckInterrupt()逻辑,此逻辑在acquireQueued()方法的for(;;)自旋内,可以获取锁并充值head节点

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) { // 自旋
    		// 获取到当前节点的前一节点
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {// 如果当前节点的前一节点是头结点,有抢占锁的资格,且成功抢占了锁
    	            // 设置当前节点为头结点
    		    setHead(node);
    		    // 旧的头结点解除绑定关系
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    	
    	// 应不应该挂起抢夺锁失败的线程: pred是SIGNAL状态,则可以挂起当前入队的节点;如果不是SIGNAL状态,删除脏数据,继续外层for(;;)循环
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
              int ws = pred.waitStatus;
              if (ws == Node.SIGNAL)  // SIGNAL状态运行挂起来等待唤醒
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
              if (ws > 0) { // 状态>0 ,表示线程已被中断,无效需要删除掉这些节点
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;  // 相当于 pred = pred.prev; node.prev = pred; 循环排除中断的线程对应节点
                } while (pred.waitStatus > 0);
                pred.next = node;
              } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
              }
            return false;
        }
    	
        //挂起线程
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this); // 此处会挂起线程,并在调用unpark时会从这里唤醒,继续外层acquireQueued()方法内的for循环来获取锁
            return Thread.interrupted();
        }
    	
    

    Unsafe类
      Unsafe类是在 sun.misc 包下,不属于 Java 标准。但是很 多 Java 的基础类库,包括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发的,比如 Netty 、Hadoop 、 Kafka等。
    Unsafe可认为是 Java 中留下的后门,提供了一些低层次操作,如直接内存访问、线程 的挂起和恢复、 CAS 、线程同步、内存屏障;
    而CAS 就是 Unsafe 类中提供的一个原子操作 第一个参数为需要改变的对象,第二个为偏移量 即之前求出来的 headOffset 的值,第三个参数为期待的值,第四个为更新后的值整个方法的作用是如果当前时刻的值等于预期值 var4 相等,则更新为新的期望值 var5 ,如果更新成功,则返回 true ,否则返回 false

    LockSupport
      LockSupport类是 Java6 引入的一个类,提供了基本的线程同步原语。 LockSupport实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数

    public native void unpark(Object var1);
    public native void park(boolean var1, long var2);
    

    unpark函数为线程提供 许可 ( permit)”,线程调用 park 函数则等待 许可。这个有点像信号量,但是这个许可是不能叠加一次性的。

    2. ReentrantLock时序图


    当锁没被占有,此时可以成功抢占锁,逻辑比较简单:
    因为是非公平锁,首先会将state标志从0置为1,这是调用的Unsafe类的CAS方法,可以保证原子性。
    状态重置成功表示当前线程可以抢占锁,将当前占有线程记录下来。


    当锁已被占有,抢占锁失败流程:
    会将当前线程封装为Node节点,并添加至双向链表尾部tail节点,此时原尾部节点的next指向当前node节点,当前node节点的prev指向原尾节点,放入队列排队;
    之后会将当前线程挂起:LockSupport.park(this);

    3. ReentrantLock、lock、unlock源码分析

      场景:t1、t2、t3三个线程使用ReentrantLock去抢占锁,

    
    import java.util.concurrent.locks.ReentrantLock;
    
    public class LockDemo {
    
        public static void main(String[] args) {
    
            ReentrantLock lock = new ReentrantLock();
    
            Thread t1 = new Thread(()-> {
    
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " get lock ");
                } finally {
    
                    System.out.println(Thread.currentThread().getName() + " release lock ");
                    lock.unlock();
                }
    
            }, "t1");
    
            Thread t2 = new Thread(()-> {
    
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " get lock ");
                } finally {
    
                    System.out.println(Thread.currentThread().getName() + " release lock ");
                    lock.unlock();
                }
    
            }, "t2");
    
            Thread t3 = new Thread(()-> {
    
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " get lock ");
                } finally {
    
                    System.out.println(Thread.currentThread().getName() + " release lock ");
                    lock.unlock();
                }
    
            }, "t3");
    
            t1.start();
            t2.start();
            t3.start();
    
        }
    
    }
    
    ... 运行结果
    t1 get lock 
    t1 release lock 
    t2 get lock 
    t2 release lock 
    t3 get lock 
    t3 release lock 
    

    会有五个步骤:

    A、t1抢占到锁成功
    B、t2抢占锁失败加入AQS同步队列
    C、t3抢占锁失败加入AQS同步队列
    D、t1释放锁,唤醒t2抢占到锁
    E、t2释放锁,唤醒t3抢占到锁
    F、t3释放锁,程序运行完毕
    

    (当然,这里不排除t1先启动,抢占并释放锁后,t2才启动,自然t2就不会加入AQS同步队列等其它场景,只是为了简单来分析上述5个步骤的源码流程)

    A、t1抢占到锁

    ReentrantLock
    
        public void lock() {
            sync.lock();  // A01. 此处sync为默认实现,sync = new NonfairSync(); 非公平锁
        }  
    
    ...
    
    NonfairSync 
       
            final void lock() {
                if (compareAndSetState(0, 1))  // A02. 因为state此时为0,抢占成功  
                    setExclusiveOwnerThread(Thread.currentThread());  // A04. 设置t1为持有锁的线程
                else
                    acquire(1);
            }
    
    ...
    AbstractQueuedSynchronizer#compareAndSetState
        private volatile int state;    
        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);  // A03. CAS,unsafe类的原子操作,修改共享变量state,0 -> 1,返回true
        }
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;  // A05. 设置持有锁的线程
        }
    
    
    

    至此,t1线程将共享变量state由0改为1,将持有锁的线程exclusiveOwnerThread标记为t1当前线程,加锁成功。

    B、t2抢占锁失败加入AQS同步队列

    ReentrantLock
    
        public void lock() {
            sync.lock();  // B01. 默认实现,sync = new NonfairSync(); 非公平锁
        }  
    
    ...
    
    NonfairSync 
       
            final void lock() {
                if (compareAndSetState(0, 1))  // B02. 因为state此时为1,CAS失败返回false 
                    setExclusiveOwnerThread(Thread.currentThread());  
                else
                    acquire(1); // B03. 去抢夺锁 
            }
    ... 
    
    java.util.concurrent.locks.AbstractQueuedSynchronizer
        // B03. 去抢夺锁 
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&  // B04. 尝试抢占锁, B04至B08: 尝试失败返回false,此处 !tryAcquire 返回true 
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 两步:  B09. 至B16. addWaiter 添加排它锁类型的节点至同步队列 ;  B17.  acquireQueued 请求锁or挂起线程
                selfInterrupt();
        }
    ...
    java.util.concurrent.locks.ReentrantLock.NonfairSync#
            // B04. 尝试抢占锁
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires); // B05. 非公平锁的实现尝试抢占锁
            }
    ...
    java.util.concurrent.locks.ReentrantLock.Sync#
            // B05. 非公平锁的实现尝试抢占锁
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();   // 取得当前线程, 为t2
                int c = getState();  // 取得共享变量state,因为t1线程已经降至改为了1,所以返回 1
                if (c == 0) {    // B06. 1 != 0,返回false
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {  // B07. getExclusiveOwnerThread为t1, t2 != t1,返回fasle
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;  // B08. 返回false,尝试抢占锁失败
            }
    ...
    java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
        // B09. addWaiter 添加排它锁类型的节点至同步队列
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);  // B10. 将当前线程封装为Node节点
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;  // B11. 取得tail节点,此时队列中无数据,head、tail均为Null
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);  // B12. 入队,将当前节点添加至队尾去排队
            return node;  // B16.  t2入队已成功,返回t2 
        }
    ...
    java.util.concurrent.locks.AbstractQueuedSynchronizer.Node#Node(java.lang.Thread, java.util.concurrent.locks.AbstractQueuedSynchronizer.Node)
            // B10. 将当前线程封装为Node节点
            Node(Thread thread, Node mode) {    
                this.nextWaiter = mode;
                this.thread = thread;
            }
    ...
    java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
        // B12. 入队,将当前节点添加至队尾去排队
        private Node enq(final Node node) {
            for (;;) {   // B13 第一次自旋
                Node t = tail;  
                if (t == null) { // Must initialize  
                    if (compareAndSetHead(new Node())) // 此时tail为null,赋初始值
                        tail = head;  // head、tail均为 new Node()节点
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
        private Node enq(final Node node) {
            for (;;) {   // B14 第二次自旋
                Node t = tail;  // tail为 new Node()节点
                if (t == null) { // Must initialize  
                    if (compareAndSetHead(new Node())) 
                        tail = head;  
                } else {
                    node.prev = t;  // 添加至队尾操作,指定t2线程节点的 prev 节点为 原tail 
                    if (compareAndSetTail(t, node)) { // CAS t2节点为新的tail
                        t.next = node;  // 和原尾节点绑定关系,成功入队
                        return t; // B15 返回原tail节点、此时为new Node(),自旋结束
                    }
                }
            }
        }
    ...
    java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
        //  B17.  acquireQueued 请求锁or挂起线程
        final boolean acquireQueued(final Node node, int arg) {    
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {  // B18. 第一次自旋
                    final Node p = node.predecessor(); // B19. 取得t2的前一节点, enq入队时已绑定关系, 即为head
                    if (p == head && tryAcquire(arg)) { // 两步: 一、 前一节点为head,p == head 返回true; 二、tryAcquire试图获取锁,此时t1线程扔持有锁,尝试获取失败返回false;此处逻辑为 如果当前节点的prev是头节点,会再尝试获取一次锁; 因为返回false继续后续逻辑
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&  // B20 在尝试抢夺锁失败后(AfterFailedAcquire)是否应该挂起(shouldPark) 【方法名起的是真滴好】; 此时p为prev,即为new Node(),通过 B21的返回,此方法返回false不应该挂起线程,则进入下次自旋
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
           acquireQueued(final Node node, int arg) {
                ...
                for (;;) {  // B22. 第二次自旋
                    final Node p = node.predecessor();  // 取得前一节点 
                    if (p == head && tryAcquire(arg)) {  // t2尝试获取锁失败
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&  // B23 应不应该挂起锁,至 B25 返回true应该挂起线程;
                        parkAndCheckInterrupt())      // B26      
                        interrupted = true;
                }
                ... 
           }
    
    java.util.concurrent.locks.AbstractQueuedSynchronizer.Node#predecessor
            // B19. 取得t2的前一节点, enq入队时已绑定关系, 即为head
            final Node predecessor() throws NullPointerException { 
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
        // B20 在尝试抢夺锁失败后(AfterFailedAcquire)是否应该挂起(shouldPark) 【方法名起的是真滴好】 
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;  // prev此时为 new Head(), waitStatus为int类型,没赋值默认为0
            if (ws == Node.SIGNAL)  // SIGNAL为-1, 0 != -1, 返回false
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {  // 0 !> 0 返回false
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {   // 进入else 
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  // 将pred由 0 转为 SIGNAL, 0 -> -1 ; 【tip: 只要锁释放,就会通知标识为 SIGNAL 状态的后续节点的线程】
            }
            return false;  // B21 返回false
        }
    
        // B24 第二次咨询,判断抢夺锁失败后应不应该挂起
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;  // 此时经过第一次自旋, prev.waitStatus 已经是 SIGNAL
            if (ws == Node.SIGNAL)  // 相等
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;  // B25 返回true,应该挂起线程
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
        // B26. 挂起线程 并检查 期间线程是否中断过
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);  // B27. park,挂起t2线程 【tip: 当unpark唤醒线程后,继续从这里执行后续逻辑 -> 】 
            return Thread.interrupted();
        }
    

    C、t3抢占锁失败加入AQS同步队列

      和 B、t2抢占锁并加入AQS队列相似,只是t3入队时是在 t2 节点的后面 【t2Node.next = t3Node, t3Node.prev = t2Node, tail = t3Node】
    

    D、t1释放锁,唤醒t2抢占到锁

    java.util.concurrent.locks.ReentrantLock#unlock
    
        public void unlock() {   // D1. t1线程运行完毕,unlock释放锁
            sync.release(1);  // D2. 释放 1 次重入锁
        }
    
    java.util.concurrent.locks.AbstractQueuedSynchronizer#release
         // D2. 释放锁 
        public final boolean release(int arg) { 
            if (tryRelease(arg)) {  // D3. 尝试释放锁  至D4,返回true,释放锁成功
                Node h = head;  // 拿到头结点
                if (h != null && h.waitStatus != 0) // head不为null,h.waitStatue为SIGNAL
                    unparkSuccessor(h); // D5. 唤醒头结点的下一节点 
                return true;
            }
            return false;
        }
    
    java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
            // D3. 尝试释放锁
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;  // t1线程进入了一次,getState此时为1;  1 - 1 -》 c=0;
                if (Thread.currentThread() != getExclusiveOwnerThread())  // 当前线程和占用锁的线程比较, 理论上必须相等, 此时都为t1
                    throw new IllegalMonitorStateException();
                boolean free = false;  
                if (c == 0) {  // c == 0; 为true
                    free = true;  // free为true,释放锁成功
                    setExclusiveOwnerThread(null);  // D3-01. 清除占有锁的线程标志,为Null
                }
                setState(c);  // D3-02. state共享变量重置为0 
                return free;  // D4. free为true,释放锁成功
            }
    
    java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
        // D5. 唤醒头结点的下一节点 
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus; // 此时node为head节点
            if (ws < 0)  // head.waitStatus是SIGNAL,为-1
                compareAndSetWaitStatus(node, ws, 0);  // 设置为0 
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next; // head的next节点,即为t2线程节点 
            if (s == null || s.waitStatus > 0) { // t2节点的waitStatus 不会>0, 【waitStatus > 0的节点为CANCELLED状态的节点,是线程被中断的节点】
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)  (这个逻辑t2线程不会进入,目的是如果head的next节点被中断过即waitStatus>0,则不需要被唤醒,因为线程被中断了; 此时会从tail节点向前查找 队列中第一个/循环中最后一个 没有被中断过的线程,即为需要被唤醒的线程)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);  // D6. unpark 唤醒t2线程啦
        }
    
    

    LockSupport.unpark(s.thread); 唤醒t2线程后,t2线程继续从之前park的逻辑处执行:

    java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
        // 上面D6逻辑unpark执行后,t2线程从 B27 park的逻辑处唤醒,继续执行 go go go 
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);  // D7. t2线程被唤醒 
            return Thread.interrupted();  // D8. 判断线程在休眠期间是否被中断过,返回中断标识;  t2线程没有被中断, 返回false 
        }
    
    java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
        // D7 unpark后继续后续逻辑,仍处于自旋中
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && 
                        parkAndCheckInterrupt())  // D9. 此时已被唤醒,并从D8处返回中断标识 false ,则继续下一次自旋
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
       acquireQueued {
                ...
                for (;;) { // 下一次自旋开始 
                    final Node p = node.predecessor(); // D10. 队列中t2的上一节点, 即为head 
                    if (p == head && tryAcquire(arg)) { // p == head为true, D11. tryAcquire(arg)试图抢占锁
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
                ... 
          }
    
    java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
            // D11. tryAcquire(arg)试图抢占锁
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);  // D12. 非公平锁试图抢占锁
            }
                
    java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
            // D12. 非公平锁试图抢占锁
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();  // 当前线程 t2
                int c = getState();  // 获取state, t1线程在释放锁后,已经改为了0,见 C3-02步骤
                if (c == 0) {  //  为true
                    if (compareAndSetState(0, acquires)) {  // CAS对共享变量state修改, 0 -> 1
                        setExclusiveOwnerThread(current);  // 标记当前持有锁的线程为 t2 
                        return true;   // D13. 返回true,标识t2线程成功抢占了锁,可以执行上述demo的逻辑
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    E、t2释放锁,唤醒t3抢占到锁

      和t1释放锁、唤醒t2线程相似
      首先t2释放锁,将锁的标识共享变量 state 从1改为0;  将 持有锁的线程标识 exclusiveOwnerThread 置为 null;
      之后从队列中找出下一节点即为 t3, unpark唤醒;
      t3被唤醒后 先去抢占锁, state 从 0改为1, exclusiveOwnerThread  改为 t3,抢占锁成功
    

    F、t3释放锁,程序运行完毕

      t3释放锁, state 从1改为0;  将 持有锁的线程标识 exclusiveOwnerThread 置为 null; 
      结束
    
  • 相关阅读:
    poj 1321 棋盘问题 (DFS深度优先搜索)
    HDOJ1312 Red and black(DFS深度优先搜索)
    《数据结构》例1.3
    hdoj1905 Pseudoprime numbers (基础数论)
    百练4003 十六进制转十进制(水题)
    第二天——2.23
    第一天——2.22
    返校——2.21
    被盗号了——2.19
    继续咸鱼——2.18
  • 原文地址:https://www.cnblogs.com/Qkxh320/p/thread_05.html
Copyright © 2011-2022 走看看