zoukankan      html  css  js  c++  java
  • AQS同步队列和条件队列

    static final class Node {
        //共享模式,资源可以同时去拿
        static final Node SHARED = new Node();
        //独占模式,只能有一个线程去拿
        static final Node EXCLUSIVE = null;
    
       //表示当前线程被中断了,在队列中没有任何意义,可以被剔除了
        static final int CANCELLED =  1;
        /**
         * 后继节点的线程处于等待状态,而当前节点如果释放了同步状态或者被取消,
         * 将会通知后继节点,使后继节点得以运行
         */
        static final int SIGNAL    = -1;
    
        /**
         * 节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,
         * 该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
         */
        static final int CONDITION = -2;
        /**
         * 表示下一次共享方式同步状态获取将会被无条件的传播下去
         */
        static final int PROPAGATE = -3;
    
        /**
         * 标记当前节点的信号量状态(1,0,-1,-2,-3)5种状态
         * 使用CAS更改状态,volatile保证线程可见性,并发场景下,
         * 即被一个线程修改后,状态会立马让其他线程可见
         */
        volatile int waitStatus;
    
        /**
         * 前驱节点,当前节点加入到同步队列中被设置
         */
        volatile Node prev;
    
        /**
         * 后继节点
         */
        volatile Node next;
    
        /**
         * 节点同步状态的线程
         */
        volatile Thread thread;
    
        /**
         * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量
         * 也就是说节点类型(独占和共享)和等待队列中的后继节点公用一个字段
         * (用在条件队列里面)
         */
        Node nextWaiter;
        }
    

      CLH同步队列

    CLH 同步队列是一个 FIFO 双向队列,AQS 依赖它来完成同步状态的管理:

    • 当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
    • 当同步状态释放时,会把首节点唤醒,使其再次尝试获取同步状态。

     

    state为0,表示可以竞争锁。

    state为1,表示无锁。可重入锁state可以++。

    例如:CountDownLatch,首先通过构造函数设置state = n,需要countDown()执行n次,await()才会返回。这里用到的就是state,每当执行一次countDown(),state就-1,知道所有的子线程执行完毕,state为0,await()方法就可以返回

    1、线程一和线程二cas竞争 

    2、线程二竞争失败,放入同步队列。调用locksupport.park阻塞。

    3、线程一执行成功释放锁,state置为0,唤醒线程二,重复1步骤。

    入队操作

    通过“自旋”也就是死循环的方式来保证该节点能顺利的加入到队列尾部,只有加入成功才会退出循环,否则会一直循序直到成功。 

    private Node addWaiter(Node mode) {
    // 以给定的模式来构建节点, mode有两种模式 
    //  共享式SHARED, 独占式EXCLUSIVE;
      Node node = new Node(Thread.currentThread(), mode);
        // 尝试快速将该节点加入到队列的尾部
        Node pred = tail;
         if (pred != null) {
            node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // 如果快速加入失败,则通过 anq方式入列
            enq(node);
            return node;
        }
    
    private Node enq(final Node node) {
    // CAS自旋,直到加入队尾成功        
    for (;;) {
        Node t = tail;
            if (t == null) { // 如果队列为空,则必须先初始化CLH队列,新建一个空节点标识作为Hader节点,并将tail 指向它
                if (compareAndSetHead(new Node()))
                    tail = head;
                } else {// 正常流程,加入队列尾部
                    node.prev = t;
                        if (compareAndSetTail(t, node)) {
                            t.next = node;
                            return t;
                    }
                }
            }
        }
    

      

    出队操作

    同步队列(CLH)遵循FIFO,首节点是获取同步状态的节点,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点 

    private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }
    

      

    Condition条件队列

    public class ConditionObject implements Condition, java.io.Serializable {    
        /** First node of condition queue. */    
        private transient Node firstWaiter; // 头节点    
        /** Last node of condition queue. */    
        private transient Node lastWaiter; // 尾节点        
        public ConditionObject() {    }    // ... 省略内部代码
    }
    

      

    Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

    // ========== 阻塞 ==========   
    // 造成当前线程在接到信号或被中断之前一直处于等待状态。
    void await() throws InterruptedException; 
    // 造成当前线程在接到信号之前一直处于等待状态。
    void awaitUninterruptibly(); 
    // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,
    // 如果在`nanosTimeout` 之前唤醒,那么返回值 `= nanosTimeout - 消耗时间` ,如果返回值 `<= 0` ,
    //则可以认定它已经超时了。
    long awaitNanos(long nanosTimeout) throws InterruptedException; 
    // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
    boolean await(long time, TimeUnit unit) throws InterruptedException; 
    // 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回 // true ,否则表示到了指定时间,返回返回 false 。
    boolean awaitUntil(Date deadline) throws InterruptedException; 
    // ========== 唤醒 ==========
    // 唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。 pthread_cond_signal
    void signal(); 
    // 唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
    void signalAll(); 
    

      

    例子:

    Condition.await()  CLH队列首部出队,入队condition队列尾部
    Condition.signal()  condition队列首部唤醒出队,入队CLH队列尾部

     

     入队

    public final void await() throws InterruptedException {
        // 当前线程中断
        if (Thread.interrupted())
            throw new InterruptedException();
        //当前线程加入等待队列
        Node node = addConditionWaiter();
        //释放锁
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        /**
         * 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
         * 直到检测到此节点在同步队列上
         */
        while (!isOnSyncQueue(node)) {
            //线程挂起
            LockSupport.park(this);
            //如果已经中断了,则退出
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //竞争同步状态
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 清理下条件队列中的不是在等待条件的节点
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

      

    private Node addConditionWaiter() {
        Node t = lastWaiter;    //尾节点
        //Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点
        if (t != null && t.waitStatus != Node.CONDITION) {
            //清除条件队列中所有状态不为Condition的节点
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //当前线程新建节点,状态 CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        /**
         * 将该节点加入到条件队列中最后一个位置
         */
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    

      

    出队

    调用 ConditionObject的 #signal() 方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到CLH同步队列中。 

    public final void signal() {
        //检测当前线程是否为拥有锁的独
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //头节点,唤醒条件队列中的第一个节点
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);    //唤醒
    }
    
    private void doSignal(Node first) {
        do {
            //修改头结点,完成旧头结点的移出工作
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
    }
    
     final boolean transferForSignal(Node node) {
        //将该节点从状态CONDITION改变为初始状态0,
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        //将节点加入到CLH 同步队列中去,返回的是CLH 同步队列中node节点前面的一个节点
        Node p = enq(node);
        int ws = p.waitStatus;
        //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    

      

     实现一个AQS所需要实现的方法

    // 互斥模式下尝试获取锁
    protected boolean tryAcquire(int arg) {
       throw new UnsupportedOperationException();
    }
    // 互斥模式下尝试释放锁
    protected boolean tryRelease(int arg) {
       throw new UnsupportedOperationException();
    }
    // 共享模式下尝试获取锁
    protected int tryAcquireShared(int arg) {
       throw new UnsupportedOperationException();
    }
    // 共享模式下尝试释放锁
    protected boolean tryReleaseShared(int arg) {
       throw new UnsupportedOperationException();
    }
    // 如果当前线程独占着锁,返回true
    protected boolean isHeldExclusively() {
       throw new UnsupportedOperationException();
    }
    

      这里用到了一种设计模式,即模板方法模式,自定义同步器时只需要重写上面几个方法即可,AQS中其他类都是final类型的,只有这几个方法能被其它类使用。那么重写了几个方法为什么可以实现同步器呢?这是因为AQS父类已经帮我买写好了一系列操作,包括入队,出队等。

  • 相关阅读:
    105.UDP通信实现广播
    104.tcp多线程读写实现群聊
    103.tcp通信实现远程控制
    102.tcp实现多线程连接与群聊
    101.自动注入
    100.dll调用
    99.遍历进程并直接写入内存
    98.TCP通信传输文件
    97.TCP通信
    96.udp通信
  • 原文地址:https://www.cnblogs.com/chenfx/p/15349107.html
Copyright © 2011-2022 走看看