zoukankan      html  css  js  c++  java
  • 算法:CLH锁的原理及实现&并发锁核心类AQS

    一、背景
    1.1 SMP(Symmetric Multi-Processor)
    对称多处理器结构,它是相对非对称多处理技术而言的、应用十分广泛的并行技术。在这种架构中,一台计算机由多个CPU组成,并共享内存和其他资源,所有的CPU都可以平等地访问内存、I/O和外部中断。虽然同时使用多个CPU,但是从管理的角度来看,它们的表现就像一台单机一样。操作系统将任务队列对称地分布于多个CPU之上,从而极大地提高了整个系统的数据处理能力。但是随着CPU数量的增加,每个CPU都要访问相同的内存资源,共享资源可能会成为系统瓶颈,导致CPU资源浪费。
    SMP:对称多处理器结构
    1.2 NUMA(Non-Uniform Memory Access)
    非一致存储访问,将CPU分为CPU模块,每个CPU模块由多个CPU组成,并且具有独立的本地内存、I/O槽口等,模块之间可以通过互联模块相互访问,访问本地内存(本CPU模块的内存)的速度将远远高于访问远地内存(其他CPU模块的内存)的速度,这也是非一致存储访问的由来。NUMA较好地解决SMP的扩展问题,当CPU数量增加时,因为访问远地内存的延时远远超过本地内存,系统性能无法线性增加。
    NUMA:非一致存储访问

    1.3 CLH、MCS命名来源

    • MCS:John Mellor-Crummey and Michael Scott
    • CLH:Craig,Landin andHagersten

    二、CLH锁
    CLH是一种基于单向链表的高性能、公平的自旋锁。申请加锁的线程通过前驱节点的变量进行自旋。在前置节点解锁后,当前节点会结束自旋,并进行加锁。在SMP架构下,CLH更具有优势。在NUMA架构下,如果当前节点与前驱节点不在同一CPU模块下,跨CPU模块会带来额外的系统开销,而MCS锁更适用于NUMA架构。
    在这里插入图片描述

    锁值:我把自旋条件定义为锁值 locked。locked == true 表示节点的处于加锁状态或者等待加锁状态,locked == false 表示节点处于解锁状态。

    1. 基于线程当前节点的前置节点的锁值(locked)进行自旋,locked == true 自旋,locked == false 加锁成功。
    2. locked == true 表示节点处于加锁状态或者等待加锁状态。
    3. locked == false 表示节点处于解锁状态。
    4. 每个节点在解锁时更新自己的锁值(locked),在这一时刻,该节点的后置节点会结束自旋,并进行加锁。

    2.1 加锁逻辑

    1. 获取当前线程的锁节点,如果为空则进行初始化。
    2. 通过同步方法获取尾节点,并将当前节点置为尾节点,此时获取到的尾节点为当前节点的前驱节点。
    3. 如果尾节点为空,则表示当前节点为第一个节点,加锁成功。
    4. 如果尾节点不为空,则基于前驱节点的锁值(locked==true)进行自旋,直到前驱节点的锁值 locked == false。

    2.2 解锁逻辑

    1. 获取当前线程的锁节点,如果节点为空或者锁值(locked== false)则无需解锁,直接返回。
    2. 使用同步方法为尾节点赋空值,赋值不成功则表示当前节点不是尾节点,需要将当前节点的 locked == false 已保证解锁该节点。如果当前节点为尾节点,则无需设置该节点的锁值。因为该节点没有后置节点,即使设置了,也没有实际意义。

    2.3 Java代码

    package org.learn.lock;
    
    import java.util.concurrent.atomic.AtomicReference;
    
    /**
     * MCS:John Mellor-Crummey and Michael Scott
     * CLH:Craig,Landin and Hagersten
     * @author zhibo
     * @version 1.0
     * @date 2018/11/7 10:39
     */
    public class CLHLock implements Lock {
        private AtomicReference<CLHNode> tail;
        private ThreadLocal<CLHNode> threadLocal;
    
        public CLHLock() {
            this.tail = new AtomicReference<>();
            this.threadLocal = new ThreadLocal<>();
        }
    
        @Override
        public void lock() {
            CLHNode curNode = threadLocal.get();
            if(curNode == null){
                curNode = new CLHNode();
                threadLocal.set(curNode);
            }
    
            CLHNode predNode = tail.getAndSet(curNode);
            if(predNode != null){
                while (predNode.getLocked()){
    
                }
            }
        }
    
        @Override
        public void unlock() {
            CLHNode curNode = threadLocal.get();
            threadLocal.remove();
    
            if(curNode == null || curNode.getLocked() == false){
                return;
            }
    
            if(!tail.compareAndSet(curNode, null)){
                curNode.setLocked(false);
            }
        }
    
        public static void main(String[] args) {
            final Lock clhLock = new CLHLock();
    
            for (int i = 0; i < 10; i++) {
                new Thread(new DemoTask(clhLock, i + "")).start();
            }
        }
    
        class CLHNode {
            private volatile boolean locked = true;
    
            public boolean getLocked() {
                return locked;
            }
    
            public void setLocked(boolean locked) {
                this.locked = locked;
            }
        }
    }
    
    package org.learn.lock;
    
    /**
     * @author zhibo
     * @version 1.0
     * @date 2018/11/7 14:22
     */
    public class DemoTask implements Runnable {
        private Lock lock;
        private String taskId;
    
        public DemoTask(final Lock lock, final String taskId){
            this.lock = lock;
            this.taskId = taskId;
        }
    
        @Override
        public void run() {
            try {
                lock.lock();
                Thread.sleep(500);
                System.out.println(String.format("Thread %s Completed", taskId));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    并发锁核心类AQS 

    一、概念

    AQS 是 AbstractQueuedSynchronizer 的简称,AQS 是一个抽象的队列式同步器框架,提供了阻塞锁和 FIFO 队列实现同步操作。JUC 包中的同步类基本都是基于 AQS 同步器来实现的,如 ReentrantLock,Semaphore 等。

    二、原理

    1、AQS 工作机制:(三点)

    1. 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。

    2. 如果被请求的共享资源被占用,则将获取不到锁的线程加入到队列中。等到占有线程释放锁后唤醒队列中的任务争抢锁,这个队列为 CLH 队列。

    3. 使用state成员变量表示当前的同步状态,提供 getState,setState,compareAndSetState 进行操作。

    2、CLH 队列:

    虚拟的双向队列,底层是双向链表,包括head节点和tail结点,仅存在结点之间的关联关系。AQS将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

    并发锁核心类AQS学习笔记(超详细)

     

    3、AQS 对资源的共享方式

    AQS定义两种资源共享方式

    1. 独占 ( Exclusive ):只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁:

    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁

    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的,所以非公平锁效率较高

    1. 共享 ( Share ):多个线程可同时执行,如Semaphore、CountDownLatch。

    4、AQS 的设计模式

    AQS 同步器的设计是基于模板方法模式。使用者继承AbstractQueuedSynchronizer并重写指定的方法。实现对于共享资源state的获取和释放。

    将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。 AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用,自定义同步器时需要重写下面几个AQS提供的模板方法:

    isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
    tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
    tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
    tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

    以 ReentrantLock为 例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程在tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证state是能回到零态的。

    三、空间结构

    AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。

    AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable

    队列中Node的头结点

    private transient volatile Node head;    

    队列中Node的尾结点

    private transient volatile Node tail;  

    表示同步状态的成员变量,使用volatile修饰保证线程可见性

    private volatile int state;

    返回同步状态的当前值

    protected final int getState() {  
           return state;
    }

    设置同步状态的值

    protected final void setState(int newState) {
           state = newState;
    }

    原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)

    protected final boolean compareAndSetState(int expect, int update) {
           return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    自旋时间

    static final long spinForTimeoutThreshold = 1000L;

    Unsafe类实例

    private static final Unsafe unsafe = Unsafe.getUnsafe();

    state内存偏移地址

    private static final long stateOffset;

    head内存偏移地址

    private static final long headOffset;

    tail内存偏移地址

    private static final long tailOffset;

    节点状态内存偏移地址

    private static final long waitStatusOffset;

    next内存偏移地址

    private static final long nextOffset;

    静态初始化块,用于加载内存偏移地址。

    static {
           try {
               stateOffset = unsafe.objectFieldOffset
                   (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
               headOffset = unsafe.objectFieldOffset
                   (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
               tailOffset = unsafe.objectFieldOffset
                   (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
               waitStatusOffset = unsafe.objectFieldOffset
                   (Node.class.getDeclaredField("waitStatus"));
               nextOffset = unsafe.objectFieldOffset
                   (Node.class.getDeclaredField("next"));
           } catch (Exception ex) { throw new Error(ex); }
    }

    类构造方法为从抽象构造方法,供子类调用。

    protected AbstractQueuedSynchronizer() { }    

    四、常用方法

    acquire

    该方法以独占模式获取资源,先尝试获取锁,如果获取失败则调用addWaiter将该线程加入队列中。

    源码如下:

    public final void acquire(int arg) {
       if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
    }

    由上述源码可以知道,当一个线程调用acquire时,调用方法流程如下

    并发锁核心类AQS学习笔记(超详细)

     

    1. 首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。之后会进行分析。

    2. 若tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。

    3. 调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。

    4. 由于tryAcquire默认实现是抛出异常,所以此时,不进行分析,之后会结合一个例子进行分析。

    addWaiter

    使用快速添加的方式往sync queue尾部添加结点,如果sync queue队列还没有初始化,则会使用enq插入队列中。

    // 添加等待者
    private Node addWaiter(Node mode) {
       // 新生成一个结点,默认为独占模式
       Node node = new Node(Thread.currentThread(), mode);
       // Try the fast path of enq; backup to full enq on failure
       // 保存尾结点
       Node pred = tail;
       if (pred != null) { // 尾结点不为空,即已经被初始化
           // 将node结点的prev域连接到尾结点
           node.prev = pred;
           if (compareAndSetTail(pred, node)) { // 比较pred是否为尾结点,是则将尾结点设置为node
               // 设置尾结点的next域为node
               pred.next = node;
               return node; // 返回新生成的结点
           }
       }
       enq(node); // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
       return node;
    }

    enq

    使用无限循环来确保节点的成功插入。

    private Node enq(final Node node) {
       for (;;) { // 无限循环,确保结点能够成功入队列
           // 保存尾结点
           Node t = tail;
           if (t == null) { // 尾结点为空,即还没被初始化
               if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点
                   tail = head; // 头结点与尾结点都指向同一个新生结点
           } else { // 尾结点不为空,即已经被初始化过
               // 将node结点的prev域连接到尾结点
               node.prev = t;
               if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
                   // 设置尾结点的next域为node
                   t.next = node;
                   return t; // 返回尾结点
               }
           }
       }
    }

    acquireQueue

    首先获取当前节点的前驱节点,如果前驱节点是头结点并且能够获取(资源),代表该当前节点能够占有锁,设置头结点为当前节点,返回。否则,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法

    // sync队列中的结点在独占且忽略中断的模式下获取(资源)
    final boolean acquireQueued(final Node node, int arg) {
       // 标志
       boolean failed = true;
       try {
           // 中断标志
           boolean interrupted = false;
           for (;;) { // 无限循环
               // 获取node节点的前驱结点
               final Node p = node.predecessor();
               if (p == head && tryAcquire(arg)) { // 前驱为头结点并且成功获得锁
                   setHead(node); // 设置头结点
                   p.next = null; // help GC
                   failed = false; // 设置标志
                   return interrupted;
               }
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
                   interrupted = true;
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
    }

    shouldParkAfterFailedAcquire和方法,首先,我们看

    shouldParkAfterFailedAcquire

    只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。

    // 当获取(资源)失败后,检查并且更新结点状态
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       // 获取前驱结点的状态
       int ws = pred.waitStatus;
       if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1
           // 可以进行park操作
           return true;
       if (ws > 0) { // 表示状态为CANCELLED,为1
           do {
               node.prev = pred = pred.prev;
           } while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点
           // 赋值pred结点的next域
           pred.next = node;
       } else { // 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中)
           // 比较并设置前驱结点的状态为SIGNAL
           compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
       }
       // 不能进行park操作
       return false;
    }

    parkAndCheckInterrupt

    首先执行park操作,即禁用当前线程,然后返回该线程是否已经被中断

    // 进行park操作并且返回该线程是否被中断
    private final boolean parkAndCheckInterrupt() {
       // 在许可可用之前禁用当前线程,并且设置了blocker
       LockSupport.park(this);
       return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
    }

    cancelAcquire

    该方法完成的功能就是取消当前线程对资源的获取,即设置该节点的状态为CANCELLED

    // 取消继续获取(资源)
    private void cancelAcquire(Node node) {
       // Ignore if node doesn't exist
       // node为空,返回
       if (node == null)
           return;
       // 设置node结点的thread为空
       node.thread = null;
       // Skip cancelled predecessors
       // 保存node的前驱结点
       Node pred = node.prev;
       while (pred.waitStatus > 0) // 找到node前驱结点中第一个状态小于0的结点,即不为CANCELLED状态的结点
           node.prev = pred = pred.prev;
       // 获取pred结点的下一个结点
       Node predNext = pred.next;
       // 设置node结点的状态为CANCELLED
       node.waitStatus = Node.CANCELLED;
       // If we are the tail, remove ourselves.
       if (node == tail && compareAndSetTail(node, pred)) { // node结点为尾结点,则设置尾结点为pred结点
           // 比较并设置pred结点的next节点为null
           compareAndSetNext(pred, predNext, null);
       } else { // node结点不为尾结点,或者比较设置不成功
     
           int ws;
           if (pred != head &&
               ((ws = pred.waitStatus) == Node.SIGNAL ||
                   (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
               pred.thread != null) { // (pred结点不为头结点,并且pred结点的状态为SIGNAL)或者
                                   // pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功,并且pred结点所封装的线程不为空
               // 保存结点的后继
               Node next = node.next;
               if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0
                   compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next;
           } else {
               unparkSuccessor(node); // 释放node的前一个结点
           }
           node.next = node; // help GC
       }
    }

    unparkSuccessor

    该方法的作用就是为了释放node节点的后继节点。

    // 释放后继结点
    private void unparkSuccessor(Node node) {
       // 获取node结点的等待状态
       int ws = node.waitStatus;
       if (ws < 0) // 状态值小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
           // 比较并且设置结点等待状态,设置为0
           compareAndSetWaitStatus(node, ws, 0);
       // 获取node节点的下一个结点
       Node s = node.next;
       if (s == null || s.waitStatus > 0) { // 下一个结点为空或者下一个节点的等待状态大于0,即为CANCELLED
           // s赋值为空
           s = null;
           // 从尾结点开始从后往前开始遍历
           for (Node t = tail; t != null && t != node; t = t.prev)
               if (t.waitStatus <= 0) // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点
                   // 保存结点
                   s = t;
       }
       if (s != null) // 该结点不为为空,释放许可
           LockSupport.unpark(s.thread);
    }

    对于cancelAcquire与unparkSuccessor方法,如下示意图可以清晰的表示:

    并发锁核心类AQS学习笔记(超详细)

     

    其中node为参数,在执行完cancelAcquire方法后的效果就是unpark了s结点所包含的t4线程。

    现在,再来看acquireQueued方法的整个的逻辑。逻辑如下:

    1. 判断结点的前驱是否为head并且是否成功获取(资源)。

    2. 若步骤1均满足,则设置结点为head,之后会判断是否finally模块,然后返回。

    3. 若步骤2不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。

    4. 若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。

    release

    以独占模式释放对象,其中 tryRelease 的默认实现是抛出异常,需要具体的子类实现,如果 tryRelease 成功,那么如果头结点不为空并且头结点的状态不为 0,则释放头结点的后继结点。

    public final boolean release(int arg) {
       if (tryRelease(arg)) { // 释放成功
           // 保存头结点
           Node h = head;
           if (h != null && h.waitStatus != 0) // 头结点不为空并且头结点状态不为0
               unparkSuccessor(h); //释放头结点的后继结点
           return true;
       }
       return false;
    }

    五、内部类

    Node类

    每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。

    1. CANCELLED,值为1,表示当前的线程被取消。

    2. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。

    3. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。

    4. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。

    5. 值为0,表示当前节点在sync queue中,等待着获取锁。

    static final class Node {
       // 模式,分为共享与独占
       // 共享模式
       static final Node SHARED = new Node();
       // 独占模式
       static final Node EXCLUSIVE = null;        
       // 结点状态
       // CANCELLED,值为1,表示当前的线程被取消
       // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
       // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
       // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
       // 值为0,表示当前节点在sync队列中,等待着获取锁
       static final int CANCELLED =  1;
       static final int SIGNAL    = -1;
       static final int CONDITION = -2;
       static final int PROPAGATE = -3;        
       // 结点状态
       volatile int waitStatus;        
       // 前驱结点
       volatile Node prev;    
       // 后继结点
       volatile Node next;        
       // 结点所对应的线程
       volatile Thread thread;        
       // 下一个等待者
       Node nextWaiter;
       
       // 结点是否在共享模式下等待
       final boolean isShared() {
           return nextWaiter == SHARED;
       }
       
       // 获取前驱结点,若前驱结点为空,抛出异常
       final Node predecessor() throws NullPointerException {
           // 保存前驱结点
           Node p = prev;
           if (p == null) // 前驱结点为空,抛出异常
               throw new NullPointerException();
           else // 前驱结点不为空,返回
               return p;
       }
       
       // 无参构造方法
       Node() {    // Used to establish initial head or SHARED marker
       }
       
       // 构造方法
           Node(Thread thread, Node mode) {    // Used by addWaiter
           this.nextWaiter = mode;
           this.thread = thread;
       }
       
       // 构造方法
       Node(Thread thread, int waitStatus) { // Used by Condition
           this.waitStatus = waitStatus;
           this.thread = thread;
       }
    }

    ConditionObject类

    // 内部类
    public class ConditionObject implements Condition, java.io.Serializable {
       // 版本号
       private static final long serialVersionUID = 1173984872572414699L;
       // condition队列的头结点
       private transient Node firstWaiter;
       // condition队列的尾结点
       private transient Node lastWaiter;
       // 构造方法
       public ConditionObject() { }
       // 添加新的waiter到wait队列
       private Node addConditionWaiter() {
           // 保存尾结点
           Node t = lastWaiter;
           // 尾结点不为空,并且尾结点的状态不为CONDITION
           if (t != null && t.waitStatus != Node.CONDITION) {
               // 清除状态为CONDITION的结点
               unlinkCancelledWaiters();
               // 将最后一个结点重新赋值给t
               t = lastWaiter;
           }
           // 新建一个结点
           Node node = new Node(Thread.currentThread(), Node.CONDITION);
           if (t == null) // 尾结点为空
               // 设置condition队列的头结点
               firstWaiter = node;
           else // 尾结点不为空
               // 设置为节点的nextWaiter域为node结点
               t.nextWaiter = node;
           // 更新condition队列的尾结点
           lastWaiter = node;
           return node;
       }
       private void doSignal(Node first) {
           // 循环
           do {
               if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空
                   // 设置尾结点为空
                   lastWaiter = null;
               // 设置first结点的nextWaiter域
               first.nextWaiter = null;
           } while (!transferForSignal(first) &&
                       (first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环
       }
       private void doSignalAll(Node first) {
           // condition队列的头结点尾结点都设置为空
           lastWaiter = firstWaiter = null;
           // 循环
           do {
               // 获取first结点的nextWaiter域结点
               Node next = first.nextWaiter;
               // 设置first结点的nextWaiter域为空
               first.nextWaiter = null;
               // 将first结点从condition队列转移到sync队列
               transferForSignal(first);
               // 重新设置first
               first = next;
           } while (first != null);
       }
       // 从condition队列中清除状态为CANCEL的结点
       private void unlinkCancelledWaiters() {
           // 保存condition队列头结点
           Node t = firstWaiter;
           Node trail = null;
           while (t != null) { // t不为空
               // 下一个结点
               Node next = t.nextWaiter;
               if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态
                   // 设置t节点的额nextWaiter域为空
                   t.nextWaiter = null;
                   if (trail == null) // trail为空
                       // 重新设置condition队列的头结点
                       firstWaiter = next;
                   else // trail不为空
                       // 设置trail结点的nextWaiter域为next结点
                       trail.nextWaiter = next;
                   if (next == null) // next结点为空
                       // 设置condition队列的尾结点
                       lastWaiter = trail;
               }
               else // t结点的状态为CONDTION状态
                   // 设置trail结点
                   trail = t;
               // 设置t结点
               t = next;
           }
       }
       // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
       public final void signal() {
           if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
               throw new IllegalMonitorStateException();
           // 保存condition队列头结点
           Node first = firstWaiter;
           if (first != null) // 头结点不为空
               // 唤醒一个等待线程
               doSignal(first);
       }
       // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
       public final void signalAll() {
           if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
               throw new IllegalMonitorStateException();
           // 保存condition队列头结点
           Node first = firstWaiter;
           if (first != null) // 头结点不为空
               // 唤醒所有等待线程
               doSignalAll(first);
       }
       // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
       public final void awaitUninterruptibly() {
           // 添加一个结点到等待队列
           Node node = addConditionWaiter();
           // 获取释放的状态
           int savedState = fullyRelease(node);
           boolean interrupted = false;
           while (!isOnSyncQueue(node)) { //
               // 阻塞当前线程
               LockSupport.park(this);
               if (Thread.interrupted()) // 当前线程被中断
                   // 设置interrupted状态
                   interrupted = true;
           }
           if (acquireQueued(node, savedState) || interrupted)
               selfInterrupt();
       }

       private static final int REINTERRUPT =  1;
       private static final int THROW_IE    = -1;
       private int checkInterruptWhileWaiting(Node node) {
           return Thread.interrupted() ?
               (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
               0;
       }
       private void reportInterruptAfterWait(int interruptMode)
           throws InterruptedException {
           if (interruptMode == THROW_IE)
               throw new InterruptedException();
           else if (interruptMode == REINTERRUPT)
               selfInterrupt();
       }
     
       // 等待,当前线程在接到信号或被中断之前一直处于等待状态
       public final void await() throws InterruptedException {
           if (Thread.interrupted()) // 当前线程被中断,抛出异常
               throw new InterruptedException();
           // 在wait队列上添加一个结点
           Node node = addConditionWaiter();
           int savedState = fullyRelease(node);
           int interruptMode = 0;
           while (!isOnSyncQueue(node)) {
               // 阻塞当前线程
               LockSupport.park(this);
               if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型
                   break;
           }
           if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               interruptMode = REINTERRUPT;
           if (node.nextWaiter != null) // clean up if cancelled
               unlinkCancelledWaiters();
           if (interruptMode != 0)
               reportInterruptAfterWait(interruptMode);
       }
       // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
       public final long awaitNanos(long nanosTimeout)
               throws InterruptedException {
           if (Thread.interrupted())
               throw new InterruptedException();
           Node node = addConditionWaiter();
           int savedState = fullyRelease(node);
           final long deadline = System.nanoTime() + nanosTimeout;
           int interruptMode = 0;
           while (!isOnSyncQueue(node)) {
               if (nanosTimeout <= 0L) {
                   transferAfterCancelledWait(node);
                   break;
               }
               if (nanosTimeout >= spinForTimeoutThreshold)
                   LockSupport.parkNanos(this, nanosTimeout);
               if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                   break;
               nanosTimeout = deadline - System.nanoTime();
           }
           if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               interruptMode = REINTERRUPT;
           if (node.nextWaiter != null)
               unlinkCancelledWaiters();
           if (interruptMode != 0)
               reportInterruptAfterWait(interruptMode);
           return deadline - System.nanoTime();
       }
       // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
       public final boolean awaitUntil(Date deadline)
               throws InterruptedException {
           long abstime = deadline.getTime();
           if (Thread.interrupted())
               throw new InterruptedException();
           Node node = addConditionWaiter();
           int savedState = fullyRelease(node);
           boolean timedout = false;
           int interruptMode = 0;
           while (!isOnSyncQueue(node)) {
               if (System.currentTimeMillis() > abstime) {
                   timedout = transferAfterCancelledWait(node);
                   break;
               }
               LockSupport.parkUntil(this, abstime);
               if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                   break;
           }
           if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               interruptMode = REINTERRUPT;
           if (node.nextWaiter != null)
               unlinkCancelledWaiters();
           if (interruptMode != 0)
               reportInterruptAfterWait(interruptMode);
           return !timedout;
       }

       // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
       public final boolean await(long time, TimeUnit unit)
               throws InterruptedException {
           long nanosTimeout = unit.toNanos(time);
           if (Thread.interrupted())
               throw new InterruptedException();
           Node node = addConditionWaiter();
           int savedState = fullyRelease(node);
           final long deadline = System.nanoTime() + nanosTimeout;
           boolean timedout = false;
           int interruptMode = 0;
           while (!isOnSyncQueue(node)) {
               if (nanosTimeout <= 0L) {
                   timedout = transferAfterCancelledWait(node);
                   break;
               }
               if (nanosTimeout >= spinForTimeoutThreshold)
                   LockSupport.parkNanos(this, nanosTimeout);
               if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                   break;
               nanosTimeout = deadline - System.nanoTime();
           }
           if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               interruptMode = REINTERRUPT;
           if (node.nextWaiter != null)
               unlinkCancelledWaiters();
           if (interruptMode != 0)
               reportInterruptAfterWait(interruptMode);
           return !timedout;
       }
       final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
           return sync == AbstractQueuedSynchronizer.this;
       }
       //  查询是否有正在等待此条件的任何线程
       protected final boolean hasWaiters() {
           if (!isHeldExclusively())
               throw new IllegalMonitorStateException();
           for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
               if (w.waitStatus == Node.CONDITION)
                   return true;
           }
           return false;
       }
       // 返回正在等待此条件的线程数估计值
       protected final int getWaitQueueLength() {
           if (!isHeldExclusively())
               throw new IllegalMonitorStateException();
           int n = 0;
           for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
               if (w.waitStatus == Node.CONDITION)
                   ++n;
           }
           return n;
       }
       // 返回包含那些可能正在等待此条件的线程集合
       protected final Collection<Thread> getWaitingThreads() {
           if (!isHeldExclusively())
               throw new IllegalMonitorStateException();
           ArrayList<Thread> list = new ArrayList<Thread>();
           for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
               if (w.waitStatus == Node.CONDITION) {
                   Thread t = w.thread;
                   if (t != null)
                       list.add(t);
               }
           }
           return list;
       }
    }

    此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下

    public interface Condition {
       // 等待,当前线程在接到信号或被中断之前一直处于等待状态
       void await() throws InterruptedException;
       
       // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
       void awaitUninterruptibly();
       
       //等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
       long awaitNanos(long nanosTimeout) throws InterruptedException;
       
       // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
       boolean await(long time, TimeUnit unit) throws InterruptedException;
       
       // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
       boolean awaitUntil(Date deadline) throws InterruptedException;
       
       // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
       void signal();
       
       // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
       void signalAll();
    }

    原文链接:https://blog.csdn.net/tc979907461/article/details/105979761

  • 相关阅读:
    电商交易背景知识合集第二季
    技术高手如何炼成
    #研发解决方案#基于Apriori算法的Nginx+Lua+ELK异常流量拦截方案
    电商交易背景知识合集第一季
    真刀真枪压测:基于TCPCopy的仿真压测方案
    安全基础教育第二季第1集:屡战屡败的找回密码
    #研发解决方案#从宏观到微观——天机与鹰眼联手
    挖坑和踩雷
    我们过去几年做对了哪些事
    小伙伴们手滑集
  • 原文地址:https://www.cnblogs.com/Chary/p/13395796.html
Copyright © 2011-2022 走看看