zoukankan      html  css  js  c++  java
  • java多线程-AbstractQueuedSynchronizer

    大纲:

    1. AbstractQueuedSynchronizer简介
    2. aqs基本结构
    3. aqs应用-ReentrantLock.lock
    4. aqs应用-ReentrantLock.unlock
    5. aqs应用-Semaphore.acquire

     

    一、AbstractQueuedSynchronizer简介

    AbstractQueuedSynchronizer(抽象队列同步器)简介:AbstractQueuedSynchronizer以下简称(aqs)是一个基于先进先出队列,用于构建锁及其他同步装置的基础框架。子类通过继承aqs实现同步的需求。

    二、aqs基本结构

    aqs的数据结构是一个双向链表,aqs的主要成员变量是头尾节点,还有一个state(线程的同步状态)

        //头尾节点
        private volatile Node head;
        private volatile Node tail;
        //同步状态
        private volatile int state;

    节点是一个aqs类中的嵌套类,看下节点的结构:

    static final class Node {
            //节点类型
            static final Node EXCLUSIVE = null;
            static final Node SHARED = new Node();
    
            static final int CANCELLED = 1;
            static final int SIGNAL = -1;
            static final int CONDITION = -2;
            static final int PROPAGATE = -3;
            //前后节点
            volatile Node prev;
            volatile Node next;
            //节点中存储的线程
            volatile Thread thread;
            Node nextWaite;
            //等待状态CANCELLED/SIGNAL/CONDITION/PROPAGATE
            volatile int waitStatus;
            // Used to establish initial head or SHARED marker
            Node() {}
    
            // Used by Condition
            public Node(Thread thread, int waitStatus) {
                this.thread = thread;
                this.waitStatus = waitStatus;
            }
            // Used by addWaiter
            public Node(Thread thread, Node nextWaite) {
                this.thread = thread;
                this.nextWaite = nextWaite;
            }
        }

    没有获取到资源的线程被包装成为一个节点,每个节点有一个等待状态,节点中存储着节点的前驱与后继节点。

    三、aqs应用-ReentrantLock.lock

    重入锁ReentrantLock的lock方法就是利用了aqs做的同步。

    tip:阅读aqs首先要明白cas是什么https://www.cnblogs.com/liuboyuan/p/10449503.html

    首先ReentrantLock类中有一个抽象的嵌套类Sync:

    abstract static class Sync extends AbstractQueuedSynchronizer {
    abstract void lock(); final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取同步状态 int c = getState(); if (c == 0) { //当前没有线程获得锁 if (compareAndSetState(0, acquires)) { //通过cas将state修改成1表示当前线程获得了锁 //记录持有锁的线程,aqs父类AbstractOwnableSynchronizer的方法 setExclusiveOwnerThread(current); //获取锁成功 return true; } } else if (current == getExclusiveOwnerThread()) { //当前线程已经获取到了锁 //这里就是重入,又一次进入了同一个锁需要同步的代码 // state+1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //获取锁失败 return false; } }

    sync的子类-非公平锁的实现(还有公平锁的实现,原理类似本文不做讨论)

    static final class NonfairSync extends Sync {
            //非公平锁的lock方法
            final void lock() {
                //cas修改state状态成功则表示获取锁成功
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    //acquire是aqs的方法
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                //调用Sync的nonfairTryAcquire方法,获取锁成功返回true失败false
                return nonfairTryAcquire(acquires);
            }
        }

    其中acquire是aqs的方法

      //aqs的acquire
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  //Node.EXCLUSIVE是独占模式
                selfInterrupt();
        }

    这个acquire是线程同步的核心方法。

    首先tryAcquire方法是aqs子类各自完成的,ReentrantLock调用的是sycn中的nonfairTryAcquire。

    3.1 addWaiter

    这个方法主要是把没有获取到锁的节点插到队尾

    private Node addWaiter(Node mode) {
            //将当前线程封装成node
            Node node = new Node(Thread.currentThread(), mode);
            Node pred = tail;
            //队列里原来有值,将node插到队尾
            if (pred != null) {
                node.prev = tail;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //如果队列空,或者cas插入队尾失败执行enq方法
            enq(node);
            return node;
        }
    
        private Node enq(final Node node) {
            for (; ; ) {
                //获取尾节点
                Node t = tail;
                if (t == null) {//初始化,队列为空
                    //初始化一个空的头结点
                    if (compareAndSetHead(new Node()))
                        tail = head;
                }
                //队列里原来有值,将node插到队尾
                else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    这里for循环就是个自旋,由于是并发插入队尾,乐观锁操作cas就有可能失败,所以不断尝试插入队尾直到成功。

    3.2 acquireQueued

    将节点包装后传入acquireQueued

    这个方法首先判断如果节点的前驱节点为头结点并再获取一次锁,如果成功则将该节点设置为头结点。否则进入阻塞阶段。

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true; //获得锁是否失败
            try {
                boolean interrupted = false;//获取锁过程中被interrupt
                for (;;) {
                    //获取当前节点的前驱节点
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {//当前驱节点是头节点且获取锁成功
                        //将当前节点设为头结点(头结点表示持有锁的节点)
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        //这里线程还没被挂起,无法被interrupt
                        return interrupted;
                    }
                    //找到有效前驱节点,设置前驱节点的waitstatus为Node.SIGNAL,之后park挂起线程,等带着被unpark()或interrupt()
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    3.3 shouldParkAfterFailedAcquire

    shouldParkAfterFailedAcquire将判断前驱节点状态,并返回是否应该阻塞线程

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            //当前节点的前驱的waitStatus
            int ws = pred.waitStatus;
            //当前驱的waitStatus为Node.SIGNAL表示该线程可以阻塞返回true
            if (ws == Node.SIGNAL) {
                return true;
            }
            //当前驱的waitStatus为-1表示前驱节点状态为Node.CANCELLED,跳过前驱,直到找到状态不是Node.CANCELLED的节点,将该节点作为当前节点的前驱
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            }
            //将前驱节点的waitStatus改为Node.SIGNAL
            else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

    这里首先判断前驱节点的waitState,如果是Node.SIGNAL表示线程可以被挂起了,返回成功;如果>1则表示取消,将先前继续查找,知道找到一个前驱节点状态<=0(正常状态),将这个节点作为前驱节点替换原来的前驱节点;其他情况将waitState修改为Node.SIGNAL,返回失败。等待下次自旋进入shouldParkAfterFailedAcquire,直到把前驱状态改为Node.SIGNAL。

    当前驱waitState为Node.SIGNAL时,这个线程就可以安心被挂起了。

    3.4 parkAndCheckInterrupt

    线程被挂起,等待其他线程唤醒后,检查是否被打断

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);//调用本地方法将线程挂起
            return Thread.interrupted();//被唤醒时检查是否被打断
        }

     当parkAndCheckInterrupt返回true则调用Thread.currentThread().interrupt()。

    acquire主要流程小结:

    1. tryAcquire:尝试获取锁。成功则返回,失败则有下列流程。
    2. addWaiter:将当前线程包装成一个节点放在队列尾(初始化队列的时候将新建一个空节点在head,再把包装好的节点插在后面)。
    3. acquireQueued:如果当前节点的前驱是头结点则再次尝试获取锁(最后一次挣扎),若成功,则把当前节点设置为头结点,若失败则进入挂起阶段。
    4. shouldParkAfterFailedAcquire将前驱节点的waitState置为Node.SIGNAL。
    5. parkAndCheckInterrupt挂起当前线程,当线程被唤醒检查是否被打断。

    四、aqs应用-ReentrantLock.unlock方法

    unlock方法的核心是aqs中的release方法

        public final boolean release(int arg) {
            if (tryRelease(arg)) {//是否成功释放资源
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);//唤醒队列中的线程
                return true;
            }
            return false;
        }
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }

    4.1 tryRelease方法依然又实现类自己实现,下面是ReentrantLock.Sync的实现

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //c>0则有锁重入情况,c==0将释放锁
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);//清除占有锁的线程
            }
            setState(c);
            return free;
        }

    4.2 unparkSuccessor唤醒节点中的线程

        private void unparkSuccessor(Node node) {
            //node为持有锁的线程,也就是当前线程所在节点 或 空队列时新建出来的空节点
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            //获取下一个节点
            Node s = node.next;
            //空节点和被取消的节点将寻找下一个有效节点
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)//waitStatus为0 和负数都是有效节点
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);//唤醒这个节点的线程
        }

    小结:

    1. release流程比较简单,修改state状态,当state状态为0时,唤醒头结点的下一个有效节点中的线程。
    2. 结合acquireQueued来看,被唤醒的线程回到acquireQueued那个for循环里,进入if (p == head && tryAcquire(arg)) 如果p!=head则也会在shouldParkAfterFailedAcquire中调整为第二个节点,因为之前的节点肯定都已经是取消掉的无效节点,到此为止lock和unlock方法就串起来了。

    五、aqs应用-Semaphore.acquire方法

    Lock的lock方法是调用aqs的acquire方法,该方法是独占模式获取资源的,而Semaphore的acquire调用的是aqs的acquireShared方法,该方法是共享模式获取资源。

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0) //tryAcquireShared方法子类各自实现,Semaphore.Sync中实现如下
                doAcquireShared(arg);
        }

    5.1Semaphore.Sync实现的tryAcquireShared

        protected int tryAcquireShared(int acquires //需要获取的资源数) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState(); //剩余的资源数
                int remaining = available - acquires; //remaining获取资源后剩余的资源数
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
        }
        

    5.2aqs中doAcquireShared

        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED); //包装节点,这里是共享模式
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();//前驱
                    if (p == head) {
                        int r = tryAcquireShared(arg);//获取资源
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);//将当前节点设置为头结点,唤醒下一个有效节点
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    和独占模式的流程如出一辙,入队、挂起 的流程一样,依然是唤醒头结点的下一个有效节点。这里需要注意即使是共享模式,唤醒依然是按照入队顺序来的,但资源数不够第二个节点的获取数时也不会唤醒后续节点(即使资源数满足后续节点的获取数)。

    唯一不同的是在资源还有剩余的情况下,会在设置头节点的同时继续唤醒当前节点下一个有效节点

       private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            //propagate > 0(如果剩余的资源数>0的话,尝试唤醒下一个有效节点)
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                    (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }

    小结:

    1. 两种模式获取资源方式差不多,共享模式多了如果资源数>0继续唤醒节点的操作。
    2. 两种模式释放资源的方式也基本一样,这里不再赘述。
  • 相关阅读:
    python 中 print函数的用法详解
    可转债操作一览
    Python基本数据类型
    python的列表
    理财的方法
    92、Multiple commands produce Info.plist 报错
    91、最新cocoaPods安装与使用
    90、引入头文件不提示
    89、instancetype和id的区别
    88、const、static、extern介绍
  • 原文地址:https://www.cnblogs.com/liuboyuan/p/10778267.html
Copyright © 2011-2022 走看看