zoukankan      html  css  js  c++  java
  • 算法:CLH锁的原理及实现&并发锁核心类AQS

    一、背景
    1.1 SMP(Symmetric Multi-Processor)
    对称多处理器结构,它是相对非对称多处理技术而言的、应用十分广泛的并行技术。在这种架构中,一台计算机由多个CPU组成,并共享内存和其他资源,所有的CPU都可以平等地访问内存、I/O和外部中断。虽然同时使用多个CPU,但是从管理的角度来看,它们的表现就像一台单机一样。操作系统将任务队列对称地分布于多个CPU之上,从而极大地提高了整个系统的数据处理能力。但是随着CPU数量的增加,每个CPU都要访问相同的内存资源,共享资源可能会成为系统瓶颈,导致CPU资源浪费。
    SMP:对称多处理器结构
    1.2 NUMA(Non-Uniform Memory Access)
    非一致存储访问,将CPU分为CPU模块,每个CPU模块由多个CPU组成,并且具有独立的本地内存、I/O槽口等,模块之间可以通过互联模块相互访问,访问本地内存(本CPU模块的内存)的速度将远远高于访问远地内存(其他CPU模块的内存)的速度,这也是非一致存储访问的由来。NUMA较好地解决SMP的扩展问题,当CPU数量增加时,因为访问远地内存的延时远远超过本地内存,系统性能无法线性增加。
    NUMA:非一致存储访问

    1.3 CLH、MCS命名来源

    • MCS:John Mellor-Crummey and Michael Scott
    • CLH:Craig,Landin andHagersten

    二、CLH锁
    CLH是一种基于单向链表的高性能、公平的自旋锁。申请加锁的线程通过前驱节点的变量进行自旋。在前置节点解锁后,当前节点会结束自旋,并进行加锁。在SMP架构下,CLH更具有优势。在NUMA架构下,如果当前节点与前驱节点不在同一CPU模块下,跨CPU模块会带来额外的系统开销,而MCS锁更适用于NUMA架构。
    在这里插入图片描述

    锁值:我把自旋条件定义为锁值 locked。locked == true 表示节点的处于加锁状态或者等待加锁状态,locked == false 表示节点处于解锁状态。

    1. 基于线程当前节点的前置节点的锁值(locked)进行自旋,locked == true 自旋,locked == false 加锁成功。
    2. locked == true 表示节点处于加锁状态或者等待加锁状态。
    3. locked == false 表示节点处于解锁状态。
    4. 每个节点在解锁时更新自己的锁值(locked),在这一时刻,该节点的后置节点会结束自旋,并进行加锁。

    2.1 加锁逻辑

    1. 获取当前线程的锁节点,如果为空则进行初始化。
    2. 通过同步方法获取尾节点,并将当前节点置为尾节点,此时获取到的尾节点为当前节点的前驱节点。
    3. 如果尾节点为空,则表示当前节点为第一个节点,加锁成功。
    4. 如果尾节点不为空,则基于前驱节点的锁值(locked==true)进行自旋,直到前驱节点的锁值 locked == false。

    2.2 解锁逻辑

    1. 获取当前线程的锁节点,如果节点为空或者锁值(locked== false)则无需解锁,直接返回。
    2. 使用同步方法为尾节点赋空值,赋值不成功则表示当前节点不是尾节点,需要将当前节点的 locked == false 已保证解锁该节点。如果当前节点为尾节点,则无需设置该节点的锁值。因为该节点没有后置节点,即使设置了,也没有实际意义。

    2.3 Java代码

    package org.learn.lock;
    
    import java.util.concurrent.atomic.AtomicReference;
    
    /**
     * MCS:John Mellor-Crummey and Michael Scott
     * CLH:Craig,Landin and Hagersten
     * @author zhibo
     * @version 1.0
     * @date 2018/11/7 10:39
     */
    public class CLHLock implements Lock {
        private AtomicReference<CLHNode> tail;
        private ThreadLocal<CLHNode> threadLocal;
    
        public CLHLock() {
            this.tail = new AtomicReference<>();
            this.threadLocal = new ThreadLocal<>();
        }
    
        @Override
        public void lock() {
            CLHNode curNode = threadLocal.get();
            if(curNode == null){
                curNode = new CLHNode();
                threadLocal.set(curNode);
            }
    
            CLHNode predNode = tail.getAndSet(curNode);
            if(predNode != null){
                while (predNode.getLocked()){
    
                }
            }
        }
    
        @Override
        public void unlock() {
            CLHNode curNode = threadLocal.get();
            threadLocal.remove();
    
            if(curNode == null || curNode.getLocked() == false){
                return;
            }
    
            if(!tail.compareAndSet(curNode, null)){
                curNode.setLocked(false);
            }
        }
    
        public static void main(String[] args) {
            final Lock clhLock = new CLHLock();
    
            for (int i = 0; i < 10; i++) {
                new Thread(new DemoTask(clhLock, i + "")).start();
            }
        }
    
        class CLHNode {
            private volatile boolean locked = true;
    
            public boolean getLocked() {
                return locked;
            }
    
            public void setLocked(boolean locked) {
                this.locked = locked;
            }
        }
    }
    
    package org.learn.lock;
    
    /**
     * @author zhibo
     * @version 1.0
     * @date 2018/11/7 14:22
     */
    public class DemoTask implements Runnable {
        private Lock lock;
        private String taskId;
    
        public DemoTask(final Lock lock, final String taskId){
            this.lock = lock;
            this.taskId = taskId;
        }
    
        @Override
        public void run() {
            try {
                lock.lock();
                Thread.sleep(500);
                System.out.println(String.format("Thread %s Completed", taskId));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    并发锁核心类AQS 

    一、概念

    AQS 是 AbstractQueuedSynchronizer 的简称,AQS 是一个抽象的队列式同步器框架,提供了阻塞锁和 FIFO 队列实现同步操作。JUC 包中的同步类基本都是基于 AQS 同步器来实现的,如 ReentrantLock,Semaphore 等。

    二、原理

    1、AQS 工作机制:(三点)

    1. 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。

    2. 如果被请求的共享资源被占用,则将获取不到锁的线程加入到队列中。等到占有线程释放锁后唤醒队列中的任务争抢锁,这个队列为 CLH 队列。

    3. 使用state成员变量表示当前的同步状态,提供 getState,setState,compareAndSetState 进行操作。

    2、CLH 队列:

    虚拟的双向队列,底层是双向链表,包括head节点和tail结点,仅存在结点之间的关联关系。AQS将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

    并发锁核心类AQS学习笔记(超详细)

     

    3、AQS 对资源的共享方式

    AQS定义两种资源共享方式

    1. 独占 ( Exclusive ):只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁:

    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁

    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的,所以非公平锁效率较高

    1. 共享 ( Share ):多个线程可同时执行,如Semaphore、CountDownLatch。

    4、AQS 的设计模式

    AQS 同步器的设计是基于模板方法模式。使用者继承AbstractQueuedSynchronizer并重写指定的方法。实现对于共享资源state的获取和释放。

    将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。 AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用,自定义同步器时需要重写下面几个AQS提供的模板方法:

    isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
    tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
    tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
    tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

    以 ReentrantLock为 例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程在tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证state是能回到零态的。

    三、空间结构

    AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。

    AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable

    队列中Node的头结点

    private transient volatile Node head;    

    队列中Node的尾结点

    private transient volatile Node tail;  

    表示同步状态的成员变量,使用volatile修饰保证线程可见性

    private volatile int state;

    返回同步状态的当前值

    protected final int getState() {  
           return state;
    }

    设置同步状态的值

    protected final void setState(int newState) {
           state = newState;
    }

    原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)

    protected final boolean compareAndSetState(int expect, int update) {
           return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    自旋时间

    static final long spinForTimeoutThreshold = 1000L;

    Unsafe类实例

    private static final Unsafe unsafe = Unsafe.getUnsafe();

    state内存偏移地址

    private static final long stateOffset;

    head内存偏移地址

    private static final long headOffset;

    tail内存偏移地址

    private static final long tailOffset;

    节点状态内存偏移地址

    private static final long waitStatusOffset;

    next内存偏移地址

    private static final long nextOffset;

    静态初始化块,用于加载内存偏移地址。

    static {
           try {
               stateOffset = unsafe.objectFieldOffset
                   (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
               headOffset = unsafe.objectFieldOffset
                   (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
               tailOffset = unsafe.objectFieldOffset
                   (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
               waitStatusOffset = unsafe.objectFieldOffset
                   (Node.class.getDeclaredField("waitStatus"));
               nextOffset = unsafe.objectFieldOffset
                   (Node.class.getDeclaredField("next"));
           } catch (Exception ex) { throw new Error(ex); }
    }

    类构造方法为从抽象构造方法,供子类调用。

    protected AbstractQueuedSynchronizer() { }    

    四、常用方法

    acquire

    该方法以独占模式获取资源,先尝试获取锁,如果获取失败则调用addWaiter将该线程加入队列中。

    源码如下:

    public final void acquire(int arg) {
       if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
    }

    由上述源码可以知道,当一个线程调用acquire时,调用方法流程如下

    并发锁核心类AQS学习笔记(超详细)

     

    1. 首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中默认会抛出一个异常,即需要子类去重写此方法完成自己的逻辑。之后会进行分析。

    2. 若tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。

    3. 调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。

    4. 由于tryAcquire默认实现是抛出异常,所以此时,不进行分析,之后会结合一个例子进行分析。

    addWaiter

    使用快速添加的方式往sync queue尾部添加结点,如果sync queue队列还没有初始化,则会使用enq插入队列中。

    // 添加等待者
    private Node addWaiter(Node mode) {
       // 新生成一个结点,默认为独占模式
       Node node = new Node(Thread.currentThread(), mode);
       // Try the fast path of enq; backup to full enq on failure
       // 保存尾结点
       Node pred = tail;
       if (pred != null) { // 尾结点不为空,即已经被初始化
           // 将node结点的prev域连接到尾结点
           node.prev = pred;
           if (compareAndSetTail(pred, node)) { // 比较pred是否为尾结点,是则将尾结点设置为node
               // 设置尾结点的next域为node
               pred.next = node;
               return node; // 返回新生成的结点
           }
       }
       enq(node); // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
       return node;
    }

    enq

    使用无限循环来确保节点的成功插入。

    private Node enq(final Node node) {
       for (;;) { // 无限循环,确保结点能够成功入队列
           // 保存尾结点
           Node t = tail;
           if (t == null) { // 尾结点为空,即还没被初始化
               if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点
                   tail = head; // 头结点与尾结点都指向同一个新生结点
           } else { // 尾结点不为空,即已经被初始化过
               // 将node结点的prev域连接到尾结点
               node.prev = t;
               if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
                   // 设置尾结点的next域为node
                   t.next = node;
                   return t; // 返回尾结点
               }
           }
       }
    }

    acquireQueue

    首先获取当前节点的前驱节点,如果前驱节点是头结点并且能够获取(资源),代表该当前节点能够占有锁,设置头结点为当前节点,返回。否则,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法

    // sync队列中的结点在独占且忽略中断的模式下获取(资源)
    final boolean acquireQueued(final Node node, int arg) {
       // 标志
       boolean failed = true;
       try {
           // 中断标志
           boolean interrupted = false;
           for (;;) { // 无限循环
               // 获取node节点的前驱结点
               final Node p = node.predecessor();
               if (p == head && tryAcquire(arg)) { // 前驱为头结点并且成功获得锁
                   setHead(node); // 设置头结点
                   p.next = null; // help GC
                   failed = false; // 设置标志
                   return interrupted;
               }
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
                   interrupted = true;
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
    }

    shouldParkAfterFailedAcquire和方法,首先,我们看

    shouldParkAfterFailedAcquire

    只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。

    // 当获取(资源)失败后,检查并且更新结点状态
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       // 获取前驱结点的状态
       int ws = pred.waitStatus;
       if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1
           // 可以进行park操作
           return true;
       if (ws > 0) { // 表示状态为CANCELLED,为1
           do {
               node.prev = pred = pred.prev;
           } while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点
           // 赋值pred结点的next域
           pred.next = node;
       } else { // 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中)
           // 比较并设置前驱结点的状态为SIGNAL
           compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
       }
       // 不能进行park操作
       return false;
    }

    parkAndCheckInterrupt

    首先执行park操作,即禁用当前线程,然后返回该线程是否已经被中断

    // 进行park操作并且返回该线程是否被中断
    private final boolean parkAndCheckInterrupt() {
       // 在许可可用之前禁用当前线程,并且设置了blocker
       LockSupport.park(this);
       return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
    }

    cancelAcquire

    该方法完成的功能就是取消当前线程对资源的获取,即设置该节点的状态为CANCELLED

    // 取消继续获取(资源)
    private void cancelAcquire(Node node) {
       // Ignore if node doesn't exist
       // node为空,返回
       if (node == null)
           return;
       // 设置node结点的thread为空
       node.thread = null;
       // Skip cancelled predecessors
       // 保存node的前驱结点
       Node pred = node.prev;
       while (pred.waitStatus > 0) // 找到node前驱结点中第一个状态小于0的结点,即不为CANCELLED状态的结点
           node.prev = pred = pred.prev;
       // 获取pred结点的下一个结点
       Node predNext = pred.next;
       // 设置node结点的状态为CANCELLED
       node.waitStatus = Node.CANCELLED;
       // If we are the tail, remove ourselves.
       if (node == tail && compareAndSetTail(node, pred)) { // node结点为尾结点,则设置尾结点为pred结点
           // 比较并设置pred结点的next节点为null
           compareAndSetNext(pred, predNext, null);
       } else { // node结点不为尾结点,或者比较设置不成功
     
           int ws;
           if (pred != head &&
               ((ws = pred.waitStatus) == Node.SIGNAL ||
                   (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
               pred.thread != null) { // (pred结点不为头结点,并且pred结点的状态为SIGNAL)或者
                                   // pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功,并且pred结点所封装的线程不为空
               // 保存结点的后继
               Node next = node.next;
               if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0
                   compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next;
           } else {
               unparkSuccessor(node); // 释放node的前一个结点
           }
           node.next = node; // help GC
       }
    }

    unparkSuccessor

    该方法的作用就是为了释放node节点的后继节点。

    // 释放后继结点
    private void unparkSuccessor(Node node) {
       // 获取node结点的等待状态
       int ws = node.waitStatus;
       if (ws < 0) // 状态值小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
           // 比较并且设置结点等待状态,设置为0
           compareAndSetWaitStatus(node, ws, 0);
       // 获取node节点的下一个结点
       Node s = node.next;
       if (s == null || s.waitStatus > 0) { // 下一个结点为空或者下一个节点的等待状态大于0,即为CANCELLED
           // s赋值为空
           s = null;
           // 从尾结点开始从后往前开始遍历
           for (Node t = tail; t != null && t != node; t = t.prev)
               if (t.waitStatus <= 0) // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点
                   // 保存结点
                   s = t;
       }
       if (s != null) // 该结点不为为空,释放许可
           LockSupport.unpark(s.thread);
    }

    对于cancelAcquire与unparkSuccessor方法,如下示意图可以清晰的表示:

    并发锁核心类AQS学习笔记(超详细)

     

    其中node为参数,在执行完cancelAcquire方法后的效果就是unpark了s结点所包含的t4线程。

    现在,再来看acquireQueued方法的整个的逻辑。逻辑如下:

    1. 判断结点的前驱是否为head并且是否成功获取(资源)。

    2. 若步骤1均满足,则设置结点为head,之后会判断是否finally模块,然后返回。

    3. 若步骤2不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。

    4. 若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。

    release

    以独占模式释放对象,其中 tryRelease 的默认实现是抛出异常,需要具体的子类实现,如果 tryRelease 成功,那么如果头结点不为空并且头结点的状态不为 0,则释放头结点的后继结点。

    public final boolean release(int arg) {
       if (tryRelease(arg)) { // 释放成功
           // 保存头结点
           Node h = head;
           if (h != null && h.waitStatus != 0) // 头结点不为空并且头结点状态不为0
               unparkSuccessor(h); //释放头结点的后继结点
           return true;
       }
       return false;
    }

    五、内部类

    Node类

    每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。

    1. CANCELLED,值为1,表示当前的线程被取消。

    2. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。

    3. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。

    4. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。

    5. 值为0,表示当前节点在sync queue中,等待着获取锁。

    static final class Node {
       // 模式,分为共享与独占
       // 共享模式
       static final Node SHARED = new Node();
       // 独占模式
       static final Node EXCLUSIVE = null;        
       // 结点状态
       // CANCELLED,值为1,表示当前的线程被取消
       // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
       // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
       // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
       // 值为0,表示当前节点在sync队列中,等待着获取锁
       static final int CANCELLED =  1;
       static final int SIGNAL    = -1;
       static final int CONDITION = -2;
       static final int PROPAGATE = -3;        
       // 结点状态
       volatile int waitStatus;        
       // 前驱结点
       volatile Node prev;    
       // 后继结点
       volatile Node next;        
       // 结点所对应的线程
       volatile Thread thread;        
       // 下一个等待者
       Node nextWaiter;
       
       // 结点是否在共享模式下等待
       final boolean isShared() {
           return nextWaiter == SHARED;
       }
       
       // 获取前驱结点,若前驱结点为空,抛出异常
       final Node predecessor() throws NullPointerException {
           // 保存前驱结点
           Node p = prev;
           if (p == null) // 前驱结点为空,抛出异常
               throw new NullPointerException();
           else // 前驱结点不为空,返回
               return p;
       }
       
       // 无参构造方法
       Node() {    // Used to establish initial head or SHARED marker
       }
       
       // 构造方法
           Node(Thread thread, Node mode) {    // Used by addWaiter
           this.nextWaiter = mode;
           this.thread = thread;
       }
       
       // 构造方法
       Node(Thread thread, int waitStatus) { // Used by Condition
           this.waitStatus = waitStatus;
           this.thread = thread;
       }
    }

    ConditionObject类

    // 内部类
    public class ConditionObject implements Condition, java.io.Serializable {
       // 版本号
       private static final long serialVersionUID = 1173984872572414699L;
       // condition队列的头结点
       private transient Node firstWaiter;
       // condition队列的尾结点
       private transient Node lastWaiter;
       // 构造方法
       public ConditionObject() { }
       // 添加新的waiter到wait队列
       private Node addConditionWaiter() {
           // 保存尾结点
           Node t = lastWaiter;
           // 尾结点不为空,并且尾结点的状态不为CONDITION
           if (t != null && t.waitStatus != Node.CONDITION) {
               // 清除状态为CONDITION的结点
               unlinkCancelledWaiters();
               // 将最后一个结点重新赋值给t
               t = lastWaiter;
           }
           // 新建一个结点
           Node node = new Node(Thread.currentThread(), Node.CONDITION);
           if (t == null) // 尾结点为空
               // 设置condition队列的头结点
               firstWaiter = node;
           else // 尾结点不为空
               // 设置为节点的nextWaiter域为node结点
               t.nextWaiter = node;
           // 更新condition队列的尾结点
           lastWaiter = node;
           return node;
       }
       private void doSignal(Node first) {
           // 循环
           do {
               if ( (firstWaiter = first.nextWaiter) == null) // 该节点的nextWaiter为空
                   // 设置尾结点为空
                   lastWaiter = null;
               // 设置first结点的nextWaiter域
               first.nextWaiter = null;
           } while (!transferForSignal(first) &&
                       (first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环
       }
       private void doSignalAll(Node first) {
           // condition队列的头结点尾结点都设置为空
           lastWaiter = firstWaiter = null;
           // 循环
           do {
               // 获取first结点的nextWaiter域结点
               Node next = first.nextWaiter;
               // 设置first结点的nextWaiter域为空
               first.nextWaiter = null;
               // 将first结点从condition队列转移到sync队列
               transferForSignal(first);
               // 重新设置first
               first = next;
           } while (first != null);
       }
       // 从condition队列中清除状态为CANCEL的结点
       private void unlinkCancelledWaiters() {
           // 保存condition队列头结点
           Node t = firstWaiter;
           Node trail = null;
           while (t != null) { // t不为空
               // 下一个结点
               Node next = t.nextWaiter;
               if (t.waitStatus != Node.CONDITION) { // t结点的状态不为CONDTION状态
                   // 设置t节点的额nextWaiter域为空
                   t.nextWaiter = null;
                   if (trail == null) // trail为空
                       // 重新设置condition队列的头结点
                       firstWaiter = next;
                   else // trail不为空
                       // 设置trail结点的nextWaiter域为next结点
                       trail.nextWaiter = next;
                   if (next == null) // next结点为空
                       // 设置condition队列的尾结点
                       lastWaiter = trail;
               }
               else // t结点的状态为CONDTION状态
                   // 设置trail结点
                   trail = t;
               // 设置t结点
               t = next;
           }
       }
       // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
       public final void signal() {
           if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
               throw new IllegalMonitorStateException();
           // 保存condition队列头结点
           Node first = firstWaiter;
           if (first != null) // 头结点不为空
               // 唤醒一个等待线程
               doSignal(first);
       }
       // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
       public final void signalAll() {
           if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
               throw new IllegalMonitorStateException();
           // 保存condition队列头结点
           Node first = firstWaiter;
           if (first != null) // 头结点不为空
               // 唤醒所有等待线程
               doSignalAll(first);
       }
       // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
       public final void awaitUninterruptibly() {
           // 添加一个结点到等待队列
           Node node = addConditionWaiter();
           // 获取释放的状态
           int savedState = fullyRelease(node);
           boolean interrupted = false;
           while (!isOnSyncQueue(node)) { //
               // 阻塞当前线程
               LockSupport.park(this);
               if (Thread.interrupted()) // 当前线程被中断
                   // 设置interrupted状态
                   interrupted = true;
           }
           if (acquireQueued(node, savedState) || interrupted)
               selfInterrupt();
       }

       private static final int REINTERRUPT =  1;
       private static final int THROW_IE    = -1;
       private int checkInterruptWhileWaiting(Node node) {
           return Thread.interrupted() ?
               (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
               0;
       }
       private void reportInterruptAfterWait(int interruptMode)
           throws InterruptedException {
           if (interruptMode == THROW_IE)
               throw new InterruptedException();
           else if (interruptMode == REINTERRUPT)
               selfInterrupt();
       }
     
       // 等待,当前线程在接到信号或被中断之前一直处于等待状态
       public final void await() throws InterruptedException {
           if (Thread.interrupted()) // 当前线程被中断,抛出异常
               throw new InterruptedException();
           // 在wait队列上添加一个结点
           Node node = addConditionWaiter();
           int savedState = fullyRelease(node);
           int interruptMode = 0;
           while (!isOnSyncQueue(node)) {
               // 阻塞当前线程
               LockSupport.park(this);
               if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型
                   break;
           }
           if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               interruptMode = REINTERRUPT;
           if (node.nextWaiter != null) // clean up if cancelled
               unlinkCancelledWaiters();
           if (interruptMode != 0)
               reportInterruptAfterWait(interruptMode);
       }
       // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
       public final long awaitNanos(long nanosTimeout)
               throws InterruptedException {
           if (Thread.interrupted())
               throw new InterruptedException();
           Node node = addConditionWaiter();
           int savedState = fullyRelease(node);
           final long deadline = System.nanoTime() + nanosTimeout;
           int interruptMode = 0;
           while (!isOnSyncQueue(node)) {
               if (nanosTimeout <= 0L) {
                   transferAfterCancelledWait(node);
                   break;
               }
               if (nanosTimeout >= spinForTimeoutThreshold)
                   LockSupport.parkNanos(this, nanosTimeout);
               if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                   break;
               nanosTimeout = deadline - System.nanoTime();
           }
           if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               interruptMode = REINTERRUPT;
           if (node.nextWaiter != null)
               unlinkCancelledWaiters();
           if (interruptMode != 0)
               reportInterruptAfterWait(interruptMode);
           return deadline - System.nanoTime();
       }
       // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
       public final boolean awaitUntil(Date deadline)
               throws InterruptedException {
           long abstime = deadline.getTime();
           if (Thread.interrupted())
               throw new InterruptedException();
           Node node = addConditionWaiter();
           int savedState = fullyRelease(node);
           boolean timedout = false;
           int interruptMode = 0;
           while (!isOnSyncQueue(node)) {
               if (System.currentTimeMillis() > abstime) {
                   timedout = transferAfterCancelledWait(node);
                   break;
               }
               LockSupport.parkUntil(this, abstime);
               if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                   break;
           }
           if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               interruptMode = REINTERRUPT;
           if (node.nextWaiter != null)
               unlinkCancelledWaiters();
           if (interruptMode != 0)
               reportInterruptAfterWait(interruptMode);
           return !timedout;
       }

       // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
       public final boolean await(long time, TimeUnit unit)
               throws InterruptedException {
           long nanosTimeout = unit.toNanos(time);
           if (Thread.interrupted())
               throw new InterruptedException();
           Node node = addConditionWaiter();
           int savedState = fullyRelease(node);
           final long deadline = System.nanoTime() + nanosTimeout;
           boolean timedout = false;
           int interruptMode = 0;
           while (!isOnSyncQueue(node)) {
               if (nanosTimeout <= 0L) {
                   timedout = transferAfterCancelledWait(node);
                   break;
               }
               if (nanosTimeout >= spinForTimeoutThreshold)
                   LockSupport.parkNanos(this, nanosTimeout);
               if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                   break;
               nanosTimeout = deadline - System.nanoTime();
           }
           if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               interruptMode = REINTERRUPT;
           if (node.nextWaiter != null)
               unlinkCancelledWaiters();
           if (interruptMode != 0)
               reportInterruptAfterWait(interruptMode);
           return !timedout;
       }
       final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
           return sync == AbstractQueuedSynchronizer.this;
       }
       //  查询是否有正在等待此条件的任何线程
       protected final boolean hasWaiters() {
           if (!isHeldExclusively())
               throw new IllegalMonitorStateException();
           for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
               if (w.waitStatus == Node.CONDITION)
                   return true;
           }
           return false;
       }
       // 返回正在等待此条件的线程数估计值
       protected final int getWaitQueueLength() {
           if (!isHeldExclusively())
               throw new IllegalMonitorStateException();
           int n = 0;
           for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
               if (w.waitStatus == Node.CONDITION)
                   ++n;
           }
           return n;
       }
       // 返回包含那些可能正在等待此条件的线程集合
       protected final Collection<Thread> getWaitingThreads() {
           if (!isHeldExclusively())
               throw new IllegalMonitorStateException();
           ArrayList<Thread> list = new ArrayList<Thread>();
           for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
               if (w.waitStatus == Node.CONDITION) {
                   Thread t = w.thread;
                   if (t != null)
                       list.add(t);
               }
           }
           return list;
       }
    }

    此类实现了Condition接口,Condition接口定义了条件操作规范,具体如下

    public interface Condition {
       // 等待,当前线程在接到信号或被中断之前一直处于等待状态
       void await() throws InterruptedException;
       
       // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
       void awaitUninterruptibly();
       
       //等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
       long awaitNanos(long nanosTimeout) throws InterruptedException;
       
       // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
       boolean await(long time, TimeUnit unit) throws InterruptedException;
       
       // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
       boolean awaitUntil(Date deadline) throws InterruptedException;
       
       // 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
       void signal();
       
       // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
       void signalAll();
    }

    原文链接:https://blog.csdn.net/tc979907461/article/details/105979761

  • 相关阅读:
    _ 下划线 Underscores __init__
    Page not found (404) 不被Django的exception中间件捕捉 中间件
    从装修儿童房时的门锁说起
    欧拉定理 费马小定理的推广
    线性运算 非线性运算
    Optimistic concurrency control 死锁 悲观锁 乐观锁 自旋锁
    Avoiding Full Table Scans
    批量的单向的ssh 认证
    批量的单向的ssh 认证
    Corrupted MAC on input at /usr/local/perl/lib/site_perl/5.22.1/x86_64-linux/Net/SSH/Perl/Packet.pm l
  • 原文地址:https://www.cnblogs.com/Chary/p/13395796.html
Copyright © 2011-2022 走看看