zoukankan      html  css  js  c++  java
  • AbstractQueuedSynchronizer类(AQS)

    AbstractQueuedSynchronizer抽象同步队列是一个抽象类,简称AQS,是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的

    AQS的数据结构:逻辑结构:双向队列,存储结构:链式存储,即包含头尾指针head、tail及节点Node。Node结构体包含前驱节点prev、后继节点next及data

    一、Node(静态内部类)

    1、变量与构造方法

            //标记线程是请求共享资源时被阻塞挂起后放入AQS队列的
            static final Node SHARED = new Node();
            //标记线程是请求独占资源时被挂起后放入AQS队列的
            static final Node EXCLUSIVE = null;
    
            //waitStatus == 1:表示线程已取消
            static final int CANCELLED =  1;
            //waitStatus == -1:线程阻塞(park)需要被唤醒(unpark)
            static final int SIGNAL    = -1;
            //waitStatus == -2:线程在条件队列中等待
            static final int CONDITION = -2;
            //waitStatus == -3释放共享资源时需要通知其他节点
            static final int PROPAGATE = -3;
    
            //状态
            volatile int waitStatus;
    
            //前一个节点
            volatile Node prev;
    
            //
            volatile Node next;
    
            //后一个节点
            volatile Thread thread;
    
            //
            Node nextWaiter;
    
            Node() {    //无参构造方法  创建头结点或者共享模型节点
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter 等待队列节点
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition 条件队列节点
                this.waitStatus = waitStatus;
                this.thread = thread;
            }

    Node根据构造方法可分为三类

    Node():头节点和共享模式mode类型Node.SHARED(单例)

    Node(Thread thread, Node mode):AQS中的阻塞队列中的节点,mode是类型——仅为共享模式节点Node.SHARED及独占模式Node.EXCLUSIVE,waitStatus != -2(Node.CONDITION)

    Node(Thread thread, int waitStatus):ConditionObject中条件队列中的的节点,此时mode == null(Node.EXCLUSIVE),waitStatus == -2(Node.CONDITION)

    2.方法;只有两个方法

            /**
             * Returns true if node is waiting in shared mode.
             */判断是否是共享模式下的节点
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * Returns previous node, or throws NullPointerException if null.
             * Use when predecessor cannot be null.  The null check could
             * be elided, but is present to help the VM.
             * 返回pre 为空时空指针异常
             * @return the predecessor of this node
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }

    二、AQS

    1.变量和构造方法

        //双链表头节点  延迟初始化  只能setHead修改 头结点waitStatus != Node.CANCELLED
        private transient volatile Node head;
    
        //双链表尾节点  延迟初始化 只能enq()入队时修改
        private transient volatile Node tail;
    
        //The synchronization state.同步锁状态
        private volatile int state;
    
        private static final Unsafe unsafe = Unsafe.getUnsafe();
        private static final long stateOffset;
        private static final long headOffset;
        private static final long tailOffset;
        private static final long waitStatusOffset;
        private static final long nextOffset;
    
        static {
            try {
                stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
                headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
                tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
                waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
                nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
    
            } catch (Exception ex) { throw new Error(ex); }
        }
    
        protected AbstractQueuedSynchronizer() { }

    2.部分重要的方法

    1).Node enq(Node node):入队方法,头尾节点的初始化;及尾插法建立链表入队。是private方法,有封装方法

        /**
         * 节点入队,包含头尾节点初始化
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;//初始化,tail == head == new Node();
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;//标准的尾插法建立链表
                    }
                }
            }
        }

    2. Node addWaiter(Node node):enq方法的封装,是private方法,封装方法分为两类:共享模式,独占模式

        /**
         * Creates and enqueues node for current thread and given mode.
         * AQS阻塞队列 mode区分共享资源与独占资源
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // 尾节点不为空时,初次入队尝试跳过enq方法直接入队,如果CAS失败,调用enq入队,enq方法会无限重试CAS for(;;)
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

    3.共享资源模式主要方法:

    void  acquireShared(int arg):请求资源,已经包含了释放资源的代码(doReleaseShared)

        public final void acquireShared(int arg) {
            //共享模式下尝试请求资源,成功直接返回,失败入AQS阻塞队列(tryAcquireShared是个空方法,设计模式:模板模式,具体实现延迟到子类)
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
        private void doAcquireShared(int arg) {
            //创建共享模式Node节点插入阻塞队列,并返回Node节点
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        //这里其实是释放资源时逻辑,doReleaseShared()释放资源时,会选择unpark第二个节点,即前驱节点为head头结点的节点(队列的FIFO特点)
                        //此节点尝试请求资源
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            //请求成功时,释放资源,这里面有个递归实现,会唤醒所有线程等待共享资源的线程(doReleaseShared()),后面会有图说明
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            //还原线程的中断标志
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        //shouldParkAfterFailedAcquire设置Node.waiterStatus == Node.SIGNAL
                        //parkAndCheckInterruptLockSupport.park阻塞线程,当返回时判断是否中断引起的返回
                        interrupted = true;
                }
            } finally {
                if (failed)
                    //清空首尾节点
                    cancelAcquire(node);
            }
        }
    
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; 
            //第二节点设置为head节点,thread == prev == null
            setHead(node);
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    //释放资源
                    doReleaseShared();
            }
        }
    
      private void doReleaseShared() {
          for (;;) {
              Node h = head;
              if (h != null && h != tail) {
                  //阻塞队列不为空
                  int ws = h.waitStatus;
                  if (ws == Node.SIGNAL) {
                      //头结点head阻塞ws == -1
                      if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                          //修改头结点ws = 0
                          continue;            // loop to recheck cases
                      unparkSuccessor(h);
                  }
                  else if (ws == 0 &&
                           !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                      //头结点为初始状态ws == 0则修改头结点ws = -3
                      continue;                // loop on failed CAS
             }
              if (h == head)                   // loop if head changed
                  break;
          }
      }
    
      private void unparkSuccessor(Node node) {
          int ws = node.waitStatus;
          if (ws < 0)
              //节点阻塞修改ws = 0 初始状态
              compareAndSetWaitStatus(node, ws, 0);
          Node s = node.next;
          if (s == null || s.waitStatus > 0) {
              //后继节点为空或者线程已取消
              s = null;
              for (Node t = tail; t != null && t != node; t = t.prev)
                  if (t.waitStatus <= 0)
                      //取头结点后阻塞的节点
                      s = t;
          }
         if (s != null)
              //线程唤醒
              LockSupport.unpark(s.thread);
      }
    
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                //前驱节点是阻塞park,直接返回true
                return true;
            if (ws > 0) {
               //线程已取消
                do {
                    //删除队列中线程已取消的所有前驱节点
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                //前驱节点ws=Node.SIGNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
        private final boolean parkAndCheckInterrupt() {
            //park当前线程阻塞
            LockSupport.park(this);
            //返回当前线程中断标志
            return Thread.interrupted();
        }

     

    共享模式AQS阻塞队列的链表结构变化

    4.独占模式与共享模式大部分逻辑相同,但

    1).独占模式节点类型:Node.EXCLUSIVE

    2).少了递归逻辑及仅唤醒头结点的后继节点(仅唤醒一个线程,共享模式是唤醒共享资源上所有线程)

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//Node.EXCLUSIVE类型节点
                selfInterrupt();
        }
    
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);//此处将头结点后继节点设置为头结点,FIFO,释放先入队的节点
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    三、ConditionObject

    AQS中除了Node外,还包含一个内部类ConditionObject,主要功能信号量机制实现线程同步(与Object中wait notify类似,与操作系统中进程的PV操作保证进程同步)

    ConditionObject定义了一个双向队列——条件队列,链式存储实现,节点还是Node,

    1.变量及构造方法

            /** 条件队列头结点 */
            private transient Node firstWaiter;
            /** 条件队列尾节点 */
            private transient Node lastWaiter;
    
            /**
             * 无参构造方法*/
            public ConditionObject() { }

    2.重要方法

    1)入队方法await():线程阻塞

            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                //创建新的节点插入到条件队列末尾并返回此节点
                Node node = addConditionWaiter();
                //释放当前线程的资源(锁),失败会抛出异常
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                //为条件队列节点(即ws == -2)时,直接返回false,
                //不为条件队列节点(即ws != -2)时,判断是否在AQS队列中
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    //park返回的2种情况:线程被中断;从条件队列转入AQS队列,被unpark唤醒
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        //如果是中断引起的返回直接break;(unpark引起的返回由于,signal时节点从条件队列转入AQS队列中,根据while条件会跳出循环)
                        break;
                }
                //尝试获取资源(锁),返回值是当前线程是否中断标志引起返回 &&  0 != -1;
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                //node无后继节点,从头结点遍历,去掉所有非ws !=2 状态的队列节点
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    //中断引起的返回时
                    //THROW_IE:抛出异常
                    //REINTREEUPT:还原中断状态
                    reportInterruptAfterWait(interruptMode);
            }
    
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    //从头结点遍历,去掉所有不是ws != -2状态的队列节点
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                //新建ws = -2t条件队列节点,后尾插法
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                //释放当前线程占有的资源(锁)成功则ws:线程已取消,失败会抛出异常
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }

     

    2)出队方法signal():线程唤醒

            public final void signal() {
                //资源(锁)是否被当前线程占用,isHeldExclusively也是个模板方法,条件队列时,必须重写
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    //将条件队列头结点插入到AQS队列中
                    doSignal(first);
            }
    
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    
        final boolean transferForSignal(Node node) {
            /*
             * 修改条件队列节点状态ws = 0
             */ 
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * 插入到AQS队列,此时节点已转化为AQS节点状态:ws == 0,而不是条件队列节点:ws == -2
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            //设置节点为park状态ws == -1,设置不成功直接唤醒当前线程
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }

    四、总结

    1.一个锁对应一个AQS阻塞队列,对应多个条件队列变量ConditionObject,每个条件变量又有自己的条件队列。

    2.Node节点Data域包含3个变量,Thread thread;int waitStatus;Node nextWaiter;

    thread表明节点对应线程,通常是创建节点时的当前线程;

    waitStatus表明当前线程状态(0:初始;1:线程取消;-1:线程park状态;-2:线程位于条件队列中;-3:共享模式下传递状态);

    nextWaiter表明线程资源模式,用此属性标志共享资源模式(共享锁)还是独占资源模式(独占锁)

    Node节点根据构造方法可分为三类

    Node():AQS阻塞队列的头节点head和共享模式nextWaiter类型Node.SHARED(单例)

    Node(Thread thread, Node mode):AQS阻塞队列中的节点,mode是类型——共享模式Node.SHARED及独占模式Node.EXCLUSIVE,waitStatus != -2(Node.CONDITION)

    Node(Thread thread, int waitStatus):ConditionObject中条件队列中的的节点,此时mode == null(Node.EXCLUSIVE),waitStatus == -2(Node.CONDITION)

    3.LockSupport.park(this)阻塞的线程可被中断唤醒和LockSupport.unpark(this)唤醒;所以AQS中提供两种实现:忽略interrupt()返回并还原中断标志;不忽略interrupt()返回直接抛出异常

    4.可以发现AQS类中,定义了state变量,但是对此变量没有任何操作,原因在于AQS是一个抽象顶级类,采用模板模式的设计方法将一些方法的实现延迟到了子类,而这些方法的实现就包含了对state变量的操作,并发包中锁的原理是通过计数器实现的,state就是这个计数器,state大小代表了锁的状态,通过unsafe类中CAS操作保证对state增减的原子性即保证线程对锁获取释放的原子性。

    state == 0;锁未被占有

    state > 0 ;锁被占用,state数代表可重入锁的重入数

    state < 0 ;锁的重入数超过最大限度(int类型)导致了溢出

    另外重写的方法还可以破坏队列FIFO特性(公平锁),实现非公平锁。具体破坏:例如当锁未被占用时,线程获取锁时,公平锁是插入到阻塞队列,队列头结点获取锁;但具体实现可以不插入阻塞队列,当前线程直接获取锁

        //独占模式(独占锁):尝试获取锁
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
        //独占模式(独占锁):尝试释放锁
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
        //共享模式(共享锁):尝试获取锁
        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
        //共享模式(共享锁):尝试释放共享锁
        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
        //使用条件队列时必须实现的方法:当前线程是否持有锁
        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }    

    五、实现例子

    不可重入的独占锁

    public class NonReentrantLock {
    
        private static class Sync extends AbstractQueuedSynchronizer{
    
            @Override
            protected boolean isHeldExclusively(){
                return getState() == 1;
            }
    
            @Override
            public boolean tryAcquire(int acquires){
                assert acquires == 1;
                if (compareAndSetState(0,1)){
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            @Override
            protected boolean tryRelease(int releases) {
                assert releases == 1;
                if (getState() == 0){
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            Condition newCondition(){
                return new ConditionObject();
            }
        }
    
        private final Sync sync = new Sync();
    
        public void lock(){
            sync.acquire(1);
        }
    
        public boolean tryLock(){
            return sync.tryAcquire(1);
        }
    
        public void unlock(){
            sync.release(1);
        }
    
        public Condition newCondition(){
            return sync.newCondition();
        }
    
        public boolean isLocked(){
            return sync.isHeldExclusively();
        }
    
        public void lockInterruptibly() throws InterruptedException{
            sync.acquireInterruptibly(1);
        }
    
        public boolean tryLock(long timeout, TimeUnit unit) throws  InterruptedException{
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    
    }
    
    
    public class AQSTest {
    
        final static NonReentrantLock lock = new NonReentrantLock();
        final static Condition notFull = lock.newCondition();
        final static Condition notEmpty = lock.newCondition();
    
        final static Queue<String> queue = new LinkedBlockingQueue<>();
        final static int QUEUESIZE = 10;
    
        public static void main(String[] args) throws InterruptedException {
            Thread producer = new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        while (queue.size() == QUEUESIZE){
                            notEmpty.await();
                        }
                        queue.add("ele");
                        notFull.signalAll();
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        lock.unlock();
                    }
                }
            });
    
            Thread consumer = new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        while (0 == queue.size()){
                            notFull.await();
                        }
                        String ele = queue.poll();
                        notEmpty.signalAll();
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        lock.unlock();
                    }
                }
            });
    
            producer.start();
            consumer.start();
        }
    }

    参考自《java并发编程之美》

  • 相关阅读:
    C#中的json操作
    Webdriver 怎么操作 scrollbar 下拉框
    jQuery 选择器
    BUYING FEED (第三届省赛)
    AMAZING AUCTION (第三届省赛)
    聪明的“KK” (第三届省赛)
    网络的可靠性 (第三届省赛)
    如何通俗理解——>集群、负载均衡、分布式
    常用的shell命令
    javascript实现原生ajax
  • 原文地址:https://www.cnblogs.com/wqff-biubiu/p/12173900.html
Copyright © 2011-2022 走看看