zoukankan      html  css  js  c++  java
  • 【Java并发编程】4. 一文搞定JUC半壁江山 AQS


    在这里插入图片描述

    AQS

    AQS了解

    在JDK中我们一般用AQS来构建跟实现显示锁,语言层面我们一般用关键字Syn来实现。AQS(AbstractQueuedSynchronizer 抽象队列同步器)在上一文中被广泛使用。比如CountDownLatch,ThreadPoolExecutor,ReentrantLock,读写锁等,几乎占据了JUC并发包里的半壁江山,FutureTask在JDK 7前用的AQS,现在也是用的AQS思想。
    在这里插入图片描述
    我们以CountDownLatch为例,在CountDownLatch中有有一个静态的finalSync来实现AbstractQueuedSynchronizer中的若干独占式方法。这种方式说白了就是将更具体的细节实现进行了封装,调用者只关注更粗旷的几个对外接口即可。

    public class CountDownLatch {
        /**
         * Synchronization control For CountDownLatch.
         * Uses AQS state to represent count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer { //此处是关键
            private static final long serialVersionUID = 4982264981922014374L;
    ...
    }
    

    AQS中模版模式

    AbstractQueuedSynchronizer是个抽象类,所有用到方法的类都要继承此类的若干方法,涉及到的设计模式就是模版模式

    板方法模式是类的行为模式。准备一个抽象类,将部分逻辑以具体方法以及具体构造函数的形式实现,然后声明一些抽象方法来迫使子类实现剩余的逻辑。不同的子类可以以不同的方式实现这些抽象方法,从而对剩余的逻辑有不同的实现。关键在于夫类有个框架方法

    抽象类

    public abstract class SendCustom {
    	public abstract void to();
    	public abstract void from();
    	public void date() {
    		System.out.println(new Date());
    	}
    	public abstract void send();
    	
    	// 注意此处 框架方法-模板方法
    	public void sendMessage() {
    		to();
    		from();
    		date();
    		send();
    	}
    }
    

    模版方法派生类

    public class SendSms extends SendCustom {
    
    	@Override
    	public void to() {
    		System.out.println("sowhat");
    	}
    
    	@Override
    	public void from() {
    		System.out.println("jack");
    	}
    
    	@Override
    	public void send() {
    		System.out.println("Send message");
    	}
    	
    	public static void main(String[] args) {
    		SendCustom sendC = new SendSms();
    		sendC.sendMessage();
    	}
    }
    

    AQS重要方法

    模板方法

    模版方法分为独占式跟共享式,根据我们子类需要不同调用不同的模版方法。

    独占式获取
    1. accquire
      不可中断获取锁accquire是获取独占锁方法,acquire尝试获取资源,成功则直接返回,不成功则进入等待队列,这个过程不会被线程中断,被外部中断也不响应,获取资源后才再进行自我中断selfInterrupt()
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
    }
    

    这里面有三个方法

    • Acquire(arg)

    tryAcquire(arg) 顾名思义,它就是尝试获取锁,需要我们自己实现具体细节

    • addWaiter(Node.EXCLUSIVE)

    主要功能是 一旦尝试获取锁未成功,就要使用该方法将其加入同步队列尾部,由于可能有多个线程并发加入队尾产生竞争,因此,采用compareAndSetTail锁方法来保证同步

    • quireQueued(addWaiter(Node.EXCLUSIVE), arg)

    一旦加入同步队列,就需要使用该方法,自旋阻塞唤醒来不断的尝试获取锁,直到被中断或获取到锁。

    1. acquireInterruptibly
      可中断获取锁acquireInterruptibly相比于acquire支持响应中断。在acquire中,如果park操作被中断,那么只是记录了interrupted状态,然后继续进入循环判断是否可以acquire或者阻塞。而在acquireInterruptibly中,一旦被中断,那么就立即抛出InterruptedException异常。其他方面两个方法并没有显著不同。
      在这里插入图片描述
      在这里插入图片描述
    2. tryAcquireNanos
      该方法的调用可以被中断,增加了超时则失败的功能。可以说该方法的实现与上述两方法没有任何区别。时间功能上就是用的标准超时功能,如果剩余时间小于0那么acquire失败,如果该时间大于一次自旋锁时间(1000L),并且可以被阻塞,那么调用LockSupport.parkNanos方法阻塞线程。
      在这里插入图片描述
      该方法一般会有以下几种情况产生:
    1. 在指定时间内,线程获取到锁,返回true。
    2. 当前线程在超时时间内被中断,抛中断异常后,线程退出。
    3. 到截止时间后线程仍未获取到锁,此时线程获得锁失败,不再等待直接返回false。
    共享式获取

    该模版方法的工作:

    1. 调用tryAcquireShared(arg) 尝试获得资源。
    2. 未获得资源则函数返回值是负数,需要通过doAcquireShared(arg)进入等待队列,等待获取资源
    3. 获得资源(说明此时有剩余资源)则返回正数,因为是共享说明别的线程也可以来获得。
    1. acquireShared
        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    1. acquireSharedInterruptibly
      无非就是可中断性的共享方法
    public final void acquireSharedInterruptibly(long arg)  throws InterruptedException {
        if (Thread.interrupted())       // 如果线程被中断,则抛出异常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)  // 如果tryAcquireShared()方法获取失败,则调用如下的方法
            doAcquireSharedInterruptibly(arg);
    }
    
    1. tryAcquireSharedNanos
      动作如下:尝试以共享模式获取,如果被中断则中止,如果超过给定超时期则失败。实现此方法首先要检查中断状态,然后至少调用一次 tryacquireshared(long),并在成功时返回。否则,在成功、线程中断或超过超时期之前,线程将加入队列,可能反复处于阻塞或未阻塞状态,并一直调用 tryacquireshared(long)
        public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
        }
    
    独占式释放

    release独占锁的释放调用unlock方法,而该方法实际调用了AQS的release方法
    这段代码逻辑比较简单,如果同步状态释放成功(tryRelease返回true)则会执行if块中的代码,当head指向的头结点不为null,并且该节点的状态值不为0的话才会执行unparkSuccessor()方法。

        public final boolean release(long arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    共享式释放

    releaseShared首先去尝试释放资源tryReleaseShared(arg),如果释放成功了,就代表有资源空闲出来,那么就用doReleaseShared()去唤醒后续结点。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    比如CountDownLatch的countDown()具体实现:

        public void countDown() {
            sync.releaseShared(1);
        }
    

    子类需实现方法

    子类要实现的父类方法也分为独占式跟共享式

    独占式获取
    1. tryAcquire
      顾名思义,就是尝试获取锁,AQS在这里没有对其进行功能的实现,只有一个抛出异常的语句,我们需要自己对其进行实现,可以对其重写实现公平锁、不公平锁、可重入锁、不可重入锁
    protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
    }
    
    独占式释放

    tryRelease 尝试释放 独占锁,需要子类实现。

      protected boolean tryRelease(long arg) {
           throw new UnsupportedOperationException();
       }
    
    共享式获取

    tryAcquireShared 尝试进行共享锁的获得,需要子类实现。

    protected long tryAcquireShared(long arg) {
            throw new UnsupportedOperationException();
        }
    
    共享式释放

    tryReleaseShared尝试进行共享锁的释放,需要子类实现。

        protected boolean tryReleaseShared(long arg) {
            throw new UnsupportedOperationException();
        }
    
    查询是否处于独占模式

    isHeldExclusively 该函数的功能是查询当前的工作模式是否是独占模式。需要子类实现。

        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }
    

    状态标志位

    state因为用volatile因此保证了我们操作的可见性,所以任何线程通过getState()获得状态都是可以得到最新值,但是setState()无法保证原子性,因此AQS给我们提供了compareAndSetState方法利用底层UnSafe的CAS功能来实现原子性。

        /**
         * The synchronization state.
         */
        private volatile long state;
        -------------
        protected final long getState() {
            return state;
        }
        -------------
        protected final void setState(long newState) {
            state = newState;
        }
       -------------
       protected final boolean compareAndSetState(long expect, long update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapLong(this, stateOffset, expect, update);
        }
    

    自我实现独占锁

    ReentrantLock是继承自Lock接口实现的独占式可重入锁。因为查阅源码可以知道ReentrantLock是跟AQS进行组合来实现的,Lock主要接口如下:
    在这里插入图片描述
    MyLock:

    public class SelfLock implements Lock {
    
        // state 表示获取到锁 state=1 获取到了锁,state=0,表示这个锁当前没有线程拿到
        private static class Sync extends AbstractQueuedSynchronizer {
            //是否占用
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
            protected boolean tryAcquire(int arg) {
                if (compareAndSetState(0, 1)) { 
                // 拿之前我希望没线程拿到,我拿到设置为1
                // 目前的实现的独占锁是无法实现可重入的哦,
                // 因为第二次拿时候 原始值以及是1了
                   setExclusiveOwnerThread(Thread.currentThread()); 
                   //独占模式 告诉系统那个线程目前拿到线程锁了
                    return true;
                }
                return false;
            }
            protected boolean tryRelease(int arg) {
                if (getState() == 0) {
                    throw new UnsupportedOperationException();
                }
                setExclusiveOwnerThread(null); 
                //当前锁无线程 占用。
                setState(0);
                //此时我们是独占锁,没有线程抢着来释放,因此用此方法即可
                return true;
            }
            Condition newCondition() {
                return new ConditionObject();
            }
        }
        private final Sync sycn = new Sync();
        @Override
        public void lock() {
            sycn.acquire(1);
        }
        @Override
        public boolean tryLock() {
            return sycn.tryAcquire(1);
        }
        @Override
        public void unlock() {
            sycn.release(1);
        }
    	@Override
    	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    		return sycn.tryAcquireNanos(1, unit.toNanos(time));
    	}
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sycn.acquireInterruptibly(1);
        }
        @Override
        public Condition newCondition() {
            return sycn.newCondition();
        }
    }
    

    AQS底层

    AQS内部维护着一个FIFO的队列,即CLH队列,该队列一个自旋锁。能确保无饥饿性。提供先来先服务的公平性。AQS的同步机制就是依靠CLH队列实现的。CLH队列是FIFO的双端双向链表队列(之所以双向是方便尾部节点插入,可直接获得目前尾节点),实现公平锁。线程通过AQS获取锁失败,就会将线程封装成一个Node节点,通过CAS原子操作插入队列尾。当有线程释放锁时,会尝试让队头的next节点占用锁,在CLH中头节点是获得锁的切记。

    1. CLH队列由Node对象组成,其中Node是AQS中的内部类。
    2. static final Node SHARED = new Node(); 标识共享锁
    3. static final Node EXCLUSIVE = null;标识独占锁

    同时Node节点中有若干final变量值来表示当前节点的状态。

    1. CANCELLED = 1

    由于该节点线程等待超时或者被中断,需要从同步队列中取消等待,则该线程被置1。节点进入了取消状态,该类型节点不会参与竞争,且会一直保持取消状态。

    1. SIGNAL = -1

    后继的节点处于等待状态,当前节点的线程如果释放了同步状态或者被取消(当前节点状态置为-1),将会通知后继节点,使后继节点的线程得以运行。

    1. CONDITION = -2

    节点处于等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中。

    1. PROPAGATE = -3

    表示下一次的共享状态会被无条件的传播下去,因为共享锁可能出现同时有N个锁可以用,这时候直接让后面的N个节点都来工作。

    1. INITIAL = 0

    初始状态.

    • 在Node节点中一般通过waitStatus获得以上5种状态的一种。
    • Node prev:前驱节点
    • Node next:后继节点
    • Thread thread:获取锁失败的线程保存在Node节点中。
    • Node nextWaiter:当我们调用了Condition后他也有一个等待队列

    在这里插入图片描述

    独占式加入同步队列

    同步器AQS中包含两个节点类型的引用:一个指向头结点的引用(head),一个指向尾节点的引用(tail),如果加入的节点是OK的则会直接运行该节点,当若干个线程抢锁失败了那么就会抢着加入到同步队列的尾部,因为是抢着加入这个时候用CAS来设置尾部节点。
    在这里插入图片描述
    入口代码:
    在这里插入图片描述

    1. tryAcquire

    该方法是需要自我实现的,在上面的demo中可见一斑,就是返回是否获得了锁。

    1. addWaiter(Node.EXCLUSIVE,arg)
       /**
        * 如果尝试获取同步状态失败的话,则构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法将该节点加入到同步队列的队尾。
        */
        private Node addWaiter(Node mode) {
    // 用当前线程构造一个Node对象,mode是一个表示Node类型的字段,或者说是这个节点是独占的还是共享的
            Node node = new Node(Thread.currentThread(), mode);
            // 将目前队列中尾部节点给pred
            Node pred = tail;
            // 队列不为空的时候
            if (pred != null) {
                node.prev = pred;
       // 先尝试通过AQS方式修改尾节点为最新的节点,如果修改失败,意味着有并发,
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //第一次尝试添加尾部失败说明有并发,此时进入自旋
            enq(node);
            return node;
        }
    
    1. 自旋enq
      enq方法将并发添加节点的请求通过CAS跟自旋将尾节点的添加变得串行化起来。说白了就是让节点放到正确的队尾位置。
    /**
        * 这里进行了循环,如果此时存在了tail就执行同上一步骤的添加队尾操作,如果依然不存在,
         * 就把当前线程作为head结点。插入节点后,调用acquireQueued()进行阻塞
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    1. acquireQueued
      acquireQueued是当前Node节点线程在死循环中获取同步状态,而只有前驱节点是头节点才能尝试获取锁,原因是:

    1.头结点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头结点。
    2.维护同步队列的FIFO原则,节点进入同步队列之后,就进入了一个自旋的过程,每个节点都在不断的执行for死循环。

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                // 自旋检查当前节点的前驱节点是否为头结点,才能获取锁
                for (;;) {
                    // 获取节点的前驱节点
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                    // 节点中的线程循环的检查,自己的前驱节点是否为头节点
                    // 只有当前节点 前驱节点是头节点才会 再次调用我们实现的方法tryAcquire
                        // 接下来无非就是将当前节点设置为头结点,移除之前的头节点
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 否则检查前一个节点的状态,看当前获取锁失败的线程是否要挂起
                    if (shouldParkAfterFailedAcquire(p, node) &&
           //如果需要挂起,借助JUC包下面的LockSupport类的静态方法park挂起当前线程,直到被唤醒
                        parkAndCheckInterrupt())
                        interrupted = true; // 两个判断都是true说明 则置true
                }
            } finally {
                //如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
                if (failed)
                   //取消请求,将当前节点从队列中移除
                    cancelAcquire(node);
            }
        }
    

    如果成功就返回,否则就执行shouldParkAfterFailedAcquireparkAndCheckInterrupt来达到阻塞效果。
    5. shouldParkAfterFailedAcquire
    第二步的addWaiter()构造的新节点,waitStatus的默认值是0。此时,会进入最后一个if判断,CAS设置pred.waitStatus SIGNAL,最后返回false。由于返回false,第四步的acquireQueued会继续进行循环。假设node的前继节点pred仍然不是头结点或锁获取失败,则会再次进入shouldParkAfterFailedAcquire()。上一轮循环中已经将pred.waitStatu = -1了,则这次会进入第一个判断条件,直接返回true,表示应该阻塞调用parkAndCheckInterrupt

    那么什么时候会遇到ws > 0呢?当pred所维护的获取请求被取消时(也就是node的waitStatus 值为CANCELLED),这时就会循环移除所有被取消的前继节点pred,直到找到未被取消的pred。移除所有被取消的前继节点后,直接返回false。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus; // 获得前驱节点的状态
            if (ws == Node.SIGNAL) //此处是第二次设置
                return true;
            if (ws > 0) {
               do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
               //  此处是第一次设置
               compareAndSetWaitStatus(pred, ws, Node.SIGNAL);// unsafe级别调用设置
            }
            return false;
        }
    
    1. parkAndCheckInterrupt
      主要任务是暂停当前线程然后查看是否已经暂停了。
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this); // 调用park()使线程进入waiting状态
            return Thread.interrupted();// 如果被唤醒,查看自己是不是已经被中断了。
        }
    
    1. cancelAcquire
      acquireQueued方法的finally会判断 failed值,默认为是被中断或者timeout了,正常运行时候自旋出来的时候会是false,如果中断或者timeout了 则会是true,执行cancelAcquire,其中核心代码是node.waitStatus = Node.CANCELLED
    2. selfInterrupt
      static void selfInterrupt() {
            Thread.currentThread().interrupt();
        }
    

    除头节点所有节点都在自选获取同步状态:
    在这里插入图片描述
    加入的整体流程如下:
    在这里插入图片描述

    独占式释放队列头节点

    release()会调用tryRelease方法尝试释放当前线程持有的锁,成功的话唤醒后继线程,并返回true,否则直接返回false。

        public final boolean release(long arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    1. tryRelease
      这个是子类需要自我实现的,没啥说的根据业务需要实现。
    2. unparkSuccessor
      唤醒头结点的后继节点。
        private void unparkSuccessor(Node node) {
           int ws = node.waitStatus; // 获得头节点状态
            if (ws < 0) //如果头节点装小于0 则将其置为0
                compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next; //这个是新的头节点
            if (s == null || s.waitStatus > 0) { 
            // 如果新头节点不满足要求
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                //从队列尾部开始往前去找最前面的一个waitStatus小于0的节点
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)//唤醒后继节点对应的线程
                LockSupport.unpark(s.thread);
        }
    

    共享锁的添加

    Semaphore 这就是共享锁的一个实现类,在初始化的时候就规定了共享锁池的大小N,有一个线程获得了锁,可用数就减少1个。有一个线程释放锁可用数就增加1个。如果有>=2的线程同时释放锁,则此时有多个锁可用。这个时候就可以同时唤醒两个锁setHeadAndPropagate

    在这里插入图片描述
    在这里插入图片描述

    共享锁demo

    借鉴Semaphore 的实现,可以实现自己的共享锁池。

    public class TrinityLock {
    
        // 为3表示允许3个线程同时获得锁
        private final Sync sync = new Sync(3);
    
        private static final class Sync extends AbstractQueuedSynchronizer {
            Sync(int count) {
                if (count <= 0) {
                    throw new IllegalArgumentException("count must large than zero.");
                }
                setState(count); //初始化锁的个数
            }
    
            public int tryAcquireShared(int reduceCount) {
                for (; ; ) {
                    int current = getState();
                    int newCount = current - reduceCount;
                    if (newCount < 0 || compareAndSetState(current, newCount)) {
                        // 如果出现了 newCount 为负数 外面的判断会进行自旋等待
                        return newCount;
                    }
                }
            }
    
            public boolean tryReleaseShared(int returnCount) {
                for (; ; ) {
                    int current = getState();
                    int newCount = current + returnCount;
                    if (compareAndSetState(current, newCount)) {
                        return true;
                    }
                }
            }
    
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
        }
    
        public void lock() {
            sync.acquireShared(1);
        }
    
        public void unlock() {
            sync.releaseShared(1);
        }
    
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        public boolean tryLock() {
            return sync.tryAcquireShared(1) >= 0;
        }
    
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
        }
    
        public Condition newCondition() {
            return sync.newCondition();
        }
    }
    

    Condition讲解

    Condition是JDK5后引入的Interface,它用来替代传统的Object的wait()/notify()实现线程间的协作,相比使用Object的wait()/notify(),使用Conditionawait()/signal()这种方式实现线程间协作更加安全和高效。简单说,他的作用是使得某些线程一起等待某个条件(Condition),只有当该条件具备(signal 或者 signalAll方法被调用)时,这些等待线程才会被唤醒,从而重新争夺锁。wait()/notify()这些都更倾向于底层的实现开发,而Condition接口更倾向于代码实现的等待通知效果。两者之间的区别与共通点如下:
    在这里插入图片描述
    敲重点:一个Condition对象就有一个单项的等待任务队列。
    在这里插入图片描述
    在一个多线程任务中我们可以new出多个等待任务队列。比如我们new出来两个等待队列。

        private Lock lock = new ReentrantLock();
        private Condition FirstCond = lock.newCondition();
        private Condition SecondCond = lock.newCondition();
    

    所以真正的AQS任务中一般是一个任务队列N个等待队列的。
    在这里插入图片描述
    然后当我们调用await方法时候。说明当前任务队列的头节点拿着锁呢,此时要把该Thread从任务对列挪到等待队列里,如图:
    在这里插入图片描述
    当我们调用signal方法的时候,我们要将等待队列中的头节点移出来,让其去抢锁,不过如果是抢的公平对列就是排队了,流程如图:
    在这里插入图片描述
    Condition已经实现了多个实例化的等待队列,因此我们尽量调用signal而少用signalAll,因为在指定的实例化等待队列中只有一个可以拿到锁的。
    waitnotify底层代码的等待队列只有一个,多个线程调用wait的时候我们是无法知道头节点是那个具体线程的。因此只能notifyAll

    锁可重入

    ReentrantLock的锁是可重入的不管是公平锁还是非公平锁,我们以默认的非公平锁为例看下获取的状态。可以发现当进行可重入的时候锁的数据是不断增加的,同时当进行释放的时候锁的个数是不断减少的。
    在这里插入图片描述
    在这里插入图片描述

    公平跟非公平

    首先看下非公平锁ReentrantLock.NonfairSync这里的核心思想就是当前进程尝试获取锁的时候,如果发现锁的状态位是0的话就直接尝试将锁拿过来,然后setExclusiveOwnerThread,根本不管其他原因。
    但是 因为头节点是动态变化的因此头节点的状态也是变化的,最终导致state也是动态变化的,如果头节点刚刚跑完设置state=0,下一步就是要进行交接仪式了,非公平锁来了直接霸占尝试获得锁。

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 先直接获得线程对象
        int c = getState();
        if (c == 0) {
            // 如果出现了任务队列 首节点线程完工了,将state设置为0,
            // 下一步就进行交接仪式了。这个时候有个牛逼线程来了
            // 发现state是空的,那就直接拿来加锁使用。根本不考虑后面继承者的存在。
            if (compareAndSetState(0, acquires)) {
                // 1、利用CAS自旋方式,判断当前state确实为0,然后设置成acquire(1)
                // 这是原子性的操作,可以保证线程安全
                setExclusiveOwnerThread(current);
                // 设置当前执行的线程,直接返回为true
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
           // 2、当前的线程和执行中的线程是同一个,也就意味着可重入操作
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            // 表示当前锁被1个线程重复获取了nextc次
            return true;
        }
        // 否则就是返回false,表示没有尝试成功获取当前锁
        return false;
    }
    

    看下ReentrantLock.FairSync尝试获得公平锁。当前进程来来后先看一看前面是否有前驱节点。
    在这里插入图片描述

    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    
    1. 如果h==t成立的时候说明要不是同一个节点,要不都是null。此时 返回false。因为当前state= 0 说名如果是同一个节点也是执行完毕的。所以会执行compareAndSetState进行设定。
    1. 如果h!=t成立,head.next是否为null,如果为null,返回true。什么情况下h!=t的同时h.next==null 呢,有其他线程第一次正在入队时,可能会出现。见AQS的enq方法,compareAndSetHead(node)完成,还没执行tail=head语句时,此时t=null,head=new Node(),head.next=null。

    在这里插入图片描述

    1. 如果h!=t成立,head.next != null,则判断head.next是否是当前线程,如果返回false,否则返回true。
    2. head节点是获取到锁的节点,但是任意时刻head节点可能占用着锁,也可能释放了锁,如果释放了锁则此时state=0了,未被阻塞的head.next节点对应的线程在任意时刻都是在自旋的在尝试获取锁。

    ReentrantReadWriteLock

    在这里插入图片描述
    ReentrantReadWriteLock类中也是只有一个32位的int state来表示读锁跟写锁,如何实现的?

    1. 后16位用过来保存独享的写锁个数,第一次获得就是01,第二次重入就是10了,这样的方式来保存。
    2. 但是多个线程都可以获得读锁,并且每个线程可能读多次,如何保存?我们用前16位来保存有多少个线程获得了读锁。
    3. 每个读锁线程获得的重入读锁个数 由内部类HoldCounter与读锁配套使用。
            static final class ThreadLocalHoldCounter
               extends ThreadLocal<HoldCounter> {
               public HoldCounter initialValue() {
                   return new HoldCounter();
               }
           }
          private transient ThreadLocalHoldCounter readHolds;
    

    ending:多跟几次源码,尝试将源码消化,写下大致流程图就好理解。
    在这里插入图片描述

    参考

    AQS流程简书
    AQS图

  • 相关阅读:
    跟我学Angular2(1-初体验)
    JavaScript之糟粕
    JavaScript之毒瘤
    CSS布局(下)
    CSS布局(上)
    ES6入门系列三(特性总览下)
    ES6入门系列四(测试题分析)
    setTimout执行时间
    进程和线程关系及区别
    css3新单位学习
  • 原文地址:https://www.cnblogs.com/sowhat1412/p/12734081.html
Copyright © 2011-2022 走看看