zoukankan      html  css  js  c++  java
  • AbstractQueuedSynchronizer 详解

    package java.util.concurrent.locks;

    基本介绍

    AbstractQueuedSynchronizer(队列同步器)可以看作是并发包(java.util.concurrent)的基础框架:
    
        JDK中许多并发工具类的内部实现都依赖于AQS,如ReentrantLock, Semaphore, CountDownLatch等。
    
        AQS底层依靠CAS与同步队列。
    
    
    AbstractQueuedSynchronizer会把请求获取锁失败的线程放入一个队列的尾部:
    
        等待获取锁的线程全部处于阻塞状态。当前线程执行完毕(释放锁)后,会激活当前线程的后继节点。
    

    公平锁和非公平锁

    ReentranLock分为公平锁和非公平锁,二者的区别就在获取锁机会是否和排队顺序相关:
        如果锁被另一个线程持有,那么申请锁的其他线程会被挂起等待,加入等待队列。
        理论上,先调用lock()函数被挂起等待的线程应该排在等待队列的前端,后调用的就排在后边。
    
        如果此时,锁被释放,需要通知等待线程再次尝试获取锁:
            ReentranLock公平锁会让最先进入队列的线程获得锁。
            ReentranLock非公平锁则会让当前正在请求的线程插队获取锁,获取失败则放在队尾排队等待。
    

    模板模式

    AQS是一个抽象类,当构建一个同步组件的时候,需要定义一个子类继承AQS,应用了模板方法设计模式。
    
    模板模式由一个抽象类和一个实现类组成,抽象类中主要有三类方法:
    
        模板方法:实现了算法主体框架,供外部调用。里面会调用原语操作和钩子操作。
    
        原语操作:即定义的抽象方法,子类必须重写。
    
        钩子操作:和原语操作类似,也是供子类重写的,
            区别是钩子操作子类可以选择重写也可以选择不重写,如果不重写则使用抽象类默认操作,通常是一个空操作或抛出异常。
    

    AQS中可供子类重写的钩子操作

    方法名称 描述
    boolean tryAcquire(int arg) 独占式获取同步状态,成功返回true,失败返回false
    boolean tryRelease(int arg) 独占式释放同步状态,成功返回true,失败返回false
    int tryAcquireShared(int arg) 共享式获取同步状态,获取成功则返回值>=0
    boolean tryReleaseShared(int arg) 共享式释放同步状态,成功返回true,失败返回false
    boolean isHeldExclusively() 判断同步器是否在独占模式下被占用,一般用来表示同步器是否被当前线程占用

    模板方法

    子类重写相关钩子操作后,AQS中提供的模板方法才能正常调用。

    方法 描述
    void acquire(int arg) 独占式获取同步状态,该方法会调用子类重写的tryAcquire(int arg),如果tryAcquire返回true则该方法直接返回,否则先将当前线程加入同步队列的尾部,然后阻塞当前线程
    void acquireInterruptibly(int arg) 当线程获取同步状态失败被阻塞后,可以响应中断,收到中断后将会取消获取同步状态
    boolean tryAcquireNanos(int arg, long nanosTimeout) 在acquireInterruptibly的基础上加了超时限制,如果在超时时间内获取到同步状态返回true,否则返回false
    boolean release(int arg) 独占式释放同步状态,该方法会在释放同步状态后将第一个节点(对应刚刚释放同步状态的线程)的后继节点对应的线程唤醒。
    void acquireShared(int arg) 共享式获取同步状态,该方法会调用子类重写的tryAcquireShared(int arg),如果tryAcquireShared返回true则该方法直接返回,否则先将当前线程加入同步队列的尾部,然后阻塞当前线程
    void acquireSharedInterruptibly(int arg) 当线程获取同步状态失败被阻塞后,可以响应中断,收到中断后将会取消获取同步状态
    boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 在acquireSharedInterruptibly的基础上加了超时限制,如果在超时时间内获取到同步状态返回true,否则返回false
    boolean releaseShared(int arg) 共享式的释放同步状态

    源码分析

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    
        static final class Node {
    
            static final int CANCELLED =  1;    //当前节点被取消。
    
            static final int SIGNAL    = -1;    //表示当前节点的的后继节点将要或者已经被阻塞,在当前节点释放的时候需要unpark(唤醒)后继节点。
    
            static final int CONDITION = -2;    //表示当前节点在等待condition,即在condition队列中。
    
            static final int PROPAGATE = -3;    //表示releaseShared需要被传播给后续节点(仅在共享模式下使用)。
    
            //等待状态(默认0:无状态,表示当前节点在队列中等待获取锁。)
            volatile int waitStatus;
    
            volatile Node prev;
    
            volatile Node next;
    
            //当前节点代表的线程
            volatile Thread thread;
    
            Node nextWaiter;
    
    
            Node() {}
    
            Node(Node nextWaiter) {
                this.nextWaiter = nextWaiter;
                THREAD.set(this, Thread.currentThread());
            }
    
            Node(int waitStatus) {
                WAITSTATUS.set(this, waitStatus);
                THREAD.set(this, Thread.currentThread());
            }
        }
    
    
        private transient volatile Node head;
    
        private transient volatile Node tail;
    
        //独占变量state(通过CAS和volatile保证了线程安全问题)
        //state的值表示其状态:如果是0,那么当前还没有线程独占此变量;否在就是已经有线程独占了这个变量,也就是代表已经有线程获得了锁。
        private volatile int state;
    
    
        private void setHead(Node node) {
            head = node;
            node.thread = null;    //防止内存泄漏
            node.prev = null;
        }
    
        public final void acquire(int arg) {
            //尝试获取锁(AQS在无竞争条件下,不会new出head和tail节点。),失败则自旋,阻塞当前线程,重新获取锁,直到获取锁成功
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))    //Node.EXCLUSIVE:标记指示节点在独占模式下等待
                selfInterrupt();
        }
    
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
    
        //ReentrantLock实现的公平锁
        static final class FairSync extends Sync {
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                //获取AQS中的state变量
                int c = getState();
                //值为0,那么当前独占性变量还未被线程占有
                if (c == 0) {
                    //自旋获取锁(在没有其他等待时间更长的线程时:!hasQueuedPredecessors())
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        //将本线程设置为独占性变量所有者线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //如果该线程已经获取了锁,那么根据重入性原理,将state值进行加1,表示多次lock
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    
    
        //通过调用addWaiter函数,AQS将当前线程加入到了等待队列尾部,但是还没有阻塞当前线程的执行
        private Node addWaiter(Node mode) {
            Node node = new Node(mode);
            //自旋,直到插入成功
            for (;;) {
                Node oldTail = tail;
                if (oldTail != null) {
                    node.setPrevRelaxed(oldTail);
                    if (compareAndSetTail(oldTail, node)) {
                        oldTail.next = node;
                        return node;
                    }
                } else {
                    initializeSyncQueue();    //如果没有头尾节点,建立空节点,并赋值给头尾节点
                }
            }
        }
    
        //获取锁失败,则插入队尾,并自旋,阻塞线程,重新获取锁
        //队列中的线程获取锁的条件(公平锁):上一个节点是头节点,并且成功tryAcquire(arg)
        final boolean acquireQueued(final Node node, int arg) {
            boolean interrupted = false;
            try {
                //如果当前线程成功获取锁,则将当前线程设置为头节点,否则一直自旋
                for (;;) {
                    //返回上一个节点,如果为空,则抛出nullpointerexception
                    final Node p = node.predecessor();
                    //如果上一个节点是头结点,并且当前线程获取到锁,将当前结点设置为头结点,并返回
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node))    //将当前线程节点的prev指向有效节点
                        interrupted |= parkAndCheckInterrupt();    //阻塞线程
                }
            } catch (Throwable t) {
                cancelAcquire(node);
                if (interrupted)
                    selfInterrupt();
                throw t;
            }
        }
    
    
        public final boolean release(int arg) {
            //尝试释放锁
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    //唤醒head节点的其他节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    
    
        //ReentrantLock实现的公平锁
        abstract static class Sync extends AbstractQueuedSynchronizer {
            //释放独占性变量,就是将status的值减1(0:完全释放锁)
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
        }
    
    
        //释放锁后需要唤醒其他线程
        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                node.compareAndSetWaitStatus(ws, 0);
            Node s = node.next;
            //判断head节点的next节点是否为空或者是否是取消状态:如果是,则找其他节点。
            if (s == null || s.waitStatus > 0) {
                s = null;
                //从队列尾部向前遍历找到最前面的一个waitStatus<=0的节点
                for (Node p = tail; p != node && p != null; p = p.prev)
                    if (p.waitStatus <= 0)
                        s = p;
            }
            if (s != null)
                //唤醒找到的节点线程
                LockSupport.unpark(s.thread);
        }
    }
    

  • 相关阅读:
    运输计划[二分答案 LCA 树上差分]
    树的重心与树的直径
    约瑟夫问题
    [The 2019 Asia Yinchuan First Round Online Programming] D Take Your Seat
    CF858F Wizard's Tour
    当那一天来临
    NOI2000 青蛙过河[递推]
    BZOJ4305 数列的GCD
    中国剩余定理和扩展中国剩余定理
    重写select
  • 原文地址:https://www.cnblogs.com/loveer/p/11539764.html
Copyright © 2011-2022 走看看