zoukankan      html  css  js  c++  java
  • Java并发编程系列-(4) 显式锁与AQS

    目前已经更新完《Java并发编程》,《Docker教程》和《JVM性能优化》,欢迎关注【后端精进之路】,轻松阅读全部文章。

    Java并发编程:

    Docker教程:

    JVM性能优化:

    4 显示锁和AQS

    4.1 Lock接口

    核心方法

    Java在java.util.concurrent.locks包中提供了一系列的显示锁类,其中最基础的就是Lock接口,该接口提供了几个常见的锁相关的操作。

    public interface Lock {
        void lock();
        void lockInterruptibly() throws InterruptedException;
        boolean tryLock();
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
        void unlock();
        Condition newCondition();
    }
    

    下面分别进行介绍:

    • void lock();

    获取锁。如果锁不可用,出于线程调度目的,将禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态。

    • void lockInterruptibly();

    如果当前线程未被中断,则获取锁。如果锁可用,则获取锁,并立即返回。与lock()接口唯一的区别是可以被中断。

    • boolean tryLock();

    试图获取锁,若锁可用,则获取锁,并立即返回值true。若锁不可用,则此方法将立即返回值false。

    • boolean tryLock(long time, TimeUnit unit) throws

    与上个方法不同的就是给定了超时时间,若锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。

    • Condition newCondition();

    返回绑定到此 Lock 实例的新 Condition 实例。

    使用模板

    通常使用显示锁Lock时,会采用下面的操作流程:

    lock.lock();
    try {
        //...需要保证线程安全的代码。
    } finally {
        lock.unlock();
    }
    

    Lock的lock()方法保证了只有一个线程能够执有此锁。对于任何一个lock()方法,都需要一个unlock()方法与之对应,通常情况下为了保证unlock()方法总是能够执行,unlock()方法被置于finally中。

    Lock VS synchronized

    Synchronized是Java的关键字,当它用来修饰一个方法或一个代码块时,能够保证在同一时刻最多只有一个线程执行该代码。因为当调用Synchronized修饰的代码时,并不需要显示的加锁和解锁的过程,代码简洁,一般称之为隐式锁。

    Lock是一个接口,提供了无条件的、可轮询的、定时的、可中断的锁获取操作,所有的加锁和解锁操作方法都是显示的,因而称为显示锁。

    4.2 ReentrantLock

    可重入锁ReentrantLock是对Lock接口的一种实现,支持当一个线程获取锁以后,可以再次得到该对象锁。

    ReentrantLock在初始化时,需要设定该锁的公平性:

    • 如果在时间上,先对锁进行获取的请求,一定先被满足,这个锁就是公平的,不满足,就是非公平的
    • 非公平的效率一般来讲更高

    ReentrantLock的特性如下:

    1. 可重入

    synchronized和ReentrantLock均有可重入性,即一个线程请求得到一个对象锁后再次请求此对象锁,可以再次得到该对象锁。

    在使用synchronized时,当一个线程已经进入到synchronized方法/块中时,可以进入到本类的其他synchronized方法/块中。

    2. 可中断

    在lockInterruptibly()锁定的同时,还可以响应中断通知。一旦接收到中断通知,就会抛出InterruptedException异常。

    这点与synchronized不同,在synchronized加锁的代码中,无法获取中断通知。

    3. 可设置超时

    ReentrantLock.tryLock()方法用于尝试锁定。参数为等待时间。该方法返回boolean值。若锁定成功,则返回true。锁定失败,则返回false。tryLock方法在超时不能获得锁时,就返回false,不会永久等待构成死锁。

    4. 公平锁

    ReentrantLock内部利用AQS的线程队列,可以实现公平锁,但是性能相比非公平锁会差一点。

    在构造方法中,ReentrantLock(boolean fair),fair默认为false,当设置为true时,及表示当前构造的锁是公平锁。

    当需要可定时的、可轮询的与可中断的锁获取操作,公平队列,或者非块结构的锁,建议使用ReentrantLock。否则,请使用synchronized。在Java 1.6之后,ReentrantLock和synchronized性能相差不大,所以一般情况下,使用synchronized就足够了,只有当有特定需求时,可以使用可重入锁。

    4.3 Lock与Condition实现消息传递

    利用Lock和Condition可以实现消息的等待和通知,这里我们利用ReentrantLock来进行举例。

    注意在使用condition时,需要首先lock.newCondition来获取Condition对象,如果有多个条件,需要针对不同的条件来获取condition。

    发送信号,调用condition.signal()方法;等待,调用condition.await()方法。

    注意与notify与wait的区别,后者Object的方法,一般用在一个对象上进行等待,等待的线程和某个特定的对象绑定。当需要notify所有线程时,为了保证我们的消息被所有线程接收到,通常使用notifyAll发送消息。但是使用condition对象,await和signal操作都是在condition对象是进行的,所以使用signal通知时,不会存在等待其他消息的线程阻止消息传递,所以通常使用signal而不是signalAll。

    public class ExpressCond {
        public final static String CITY = "ShangHai";
        private int km;/*快递运输里程数*/
        private String site;/*快递到达地点*/
        private Lock lock = new ReentrantLock();
        private Condition keCond = lock.newCondition();
        private Condition siteCond = lock.newCondition();
    
        public ExpressCond() {
        }
    
        public ExpressCond(int km, String site) {
            this.km = km;
            this.site = site;
        }
    
        /* 变化公里数,然后通知处于wait状态并需要处理公里数的线程进行业务处理*/
        public void changeKm(){
            lock.lock();
            try {
            	this.km = 101;
            	keCond.signal();
            }finally {
            	lock.unlock();
            }
        }
    
        /* 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理*/
        public  void changeSite(){
        	lock.lock();
            try {
            	this.site = "BeiJing";
            	siteCond.signal();
            }finally {
            	lock.unlock();
            }    	
        }
    
        /*当快递的里程数大于100时更新数据库*/
        public void waitKm(){
        	lock.lock();
        	try {
            	while(this.km<=100) {
            		try {
            			keCond.await();
        				System.out.println("check km thread["+Thread.currentThread().getId()
        						+"] is be notifed.");
        			} catch (InterruptedException e) {
        				// TODO Auto-generated catch block
        				e.printStackTrace();
        			}
            	}    		
        	}finally {
        		lock.unlock();
        	}
    
            System.out.println("the Km is "+this.km+",I will change db");
        }
    
        /*当快递到达目的地时通知用户*/
        public void waitSite(){
        	lock.lock();
            try {
            	while(CITY.equals(this.site)) {
            		try {
            			siteCond.await();
        				System.out.println("check site thread["+Thread.currentThread().getId()
        						+"] is be notifed.");
        			} catch (InterruptedException e) {
        				// TODO Auto-generated catch block
        				e.printStackTrace();
        			}
            	}
            }finally {
            	lock.unlock();
            } 
            System.out.println("the site is "+this.site+",I will call user");
        }
    }
    

    下面是测试函数,将会唤醒一个等待km变化的线程。

    public class TestCond {
        private static ExpressCond express = new ExpressCond(0,ExpressCond.CITY);
    
        /*检查里程数变化的线程,不满足条件,线程一直等待*/
        private static class CheckKm extends Thread{
            @Override
            public void run() {
            	express.waitKm();
            }
        }
    
        /*检查地点变化的线程,不满足条件,线程一直等待*/
        private static class CheckSite extends Thread{
            @Override
            public void run() {
            	express.waitSite();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            for(int i=0;i<3;i++){
                new CheckSite().start();
            }
            for(int i=0;i<3;i++){
                new CheckKm().start();
            }
    
            Thread.sleep(1000);
            express.changeKm();//快递里程变化
        }
    }
    

    4.4 ReadWriteLock 和 ReentrantReadWriteLock

    ReadWriteLock接口提供了单独的读锁和写锁,

    public interface ReadWriteLock {
        Lock readLock();
        Lock writeLock();
    }
    

    ReentrantReadWriteLock类是ReadWriteLock接口的一个实现,它与ReentrantLock类一样提供了公平竞争与不公平竞争两种机制,默认也是使用非公平竞争机制。

    ReentrantReadWriteLock的可以被多个读者访问和一个写者访问,提供了读写分离功能:

    • 读-读不互斥:读读之间不阻塞。
    • 读-写互斥:读阻塞写,写也会阻塞读。
    • 写-写互斥:写写阻塞。

    ReentrantReadWriteLock在读多写少的场景下,具有很强的性能优势。

    WriteLock VS ReadLock

    1.重入方面其内部的WriteLock可以获取ReadLock,但是反过来ReadLock无法获得WriteLock。

    2.WriteLock可以降级为ReadLock,顺序是:先获得WriteLock再获得ReadLock,然后释放WriteLock,这时候线程将保持Readlock的持有。反过来ReadLock想要升级为WriteLock则不可能。

    4.不管是ReadLock还是WriteLock都支持Interrupt,语义与ReentrantLock一致。

    5.WriteLock支持Condition并且与ReentrantLock语义一致,而ReadLock则不能使用Condition,否则抛出UnsupportedOperationException异常。

    ReentrantLock VS ReentrantReadWriteLock

    1. ReentrantLock是排他锁,使用非公平竞争机制时,抢占的机会相对还是比较少的,只有当新请求恰逢锁释放时才有机会抢占,所以发生线程饥饿的现象几乎很少。

    2. ReentrantReadWriteLock是共享锁,或者说读读共享,并且经常使用于读多写少的场景,即请求读操作的线程多而频繁而请求写操作的线程极少且间隔长,在这种场景下,使用非公平竞争机制极有可能造成写线程饥饿。比如,R1线程此时持有读锁且在进行读取操作,W1线程请求写锁所以需要排队等候,在R1释放锁之前,如果R2,R3,...,Rn 不断的到来请求读锁,因为读读共享,所以他们不用等待马上可以获得锁,如此下去W1永远无法获得写锁,一直处于饥饿状态。


    参考链接:

    4.5 LockSupport

    LockSupport是一个方便的线程阻塞工具,它可以在线程的任何位置让线程阻塞。与Thread.suspend()方法相比,它弥补了由于resume()方法导致线程无法继续执行的情况。和Object.wait()方法相比,它不需要先获得某个对象的锁,也不会抛出InterruptedException异常。

    LockSupport主要有两个方法,

    • LockSupport.park()

    park()方法会阻塞当前线程(线程进入Waiting状态),除非它获取了"许可证"。

    • LockSupport.unpark(Thread t)

    unpark(Thread t)方法会给线程t颁发一个"许可证"。

    LockSupport使用了类似信号量的机制,它为每一个线程准备了一个许可,如果许可可用,park()方法会立刻返回,并且消费这个许可(也就是将许可变为不可用);如果许可不可用,就会阻塞,而unpack()方法则使得一个许可变为可用(但是和信号量不同的是,许可不可累加,永远只能拥有不超过一个许可)。

    4.6 AQS

    AQS:AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),是JUC并发包中的核心基础组件。JUC并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

    AQS解决了实现同步器时涉及到的大量细节问题,例如获取同步状态、FIFO同步队列。基于AQS来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。

    AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

    AQS使用了模板方法设计模式,子类通过继承同步器并实现它的抽象方法来管理同步状态。

    AQS模板方法

    AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了如下三个方法来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

    • getState():返回同步状态的当前值;

    • setState(int newState):设置当前同步状态;

    • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;

    独占式获取:

    tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态

    tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;

    tryRelease(int arg):独占式释放同步状态;

    acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;

    acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;

    isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;

    共享式获取:

    tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;

    tryReleaseShared(int arg):共享式释放同步状态;

    acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;

    acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;

    tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;

    独占式释放锁:

    release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;

    共享式释放锁:

    releaseShared(int arg):共享式释放同步状态;

    当在实现自己的lock类时,需要子类覆盖如下方法,
    独占式获取 tryAcquire
    独占式释放 tryRelease
    共享式获取 tryAcquireShared
    共享式释放 tryReleaseShared
    这个同步器是否处于独占模式 isHeldExclusively

    CLH同步队列

    CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程以及等待状态等信息打包成一个节点(Node),并将其加入到CLH同步队列,同时会阻塞当前线程。当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

    Picture1.png

    在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),

    • CANCELLED,值为1 。场景:当该线程等待超时或者被中断,需要从同步队列中取消等待,则该线程被置1,即被取消(这里该线程在取消之前是等待状态)。节点进入了取消状态则不再变化;
    • SIGNAL,值为-1。场景:后继的节点处于等待状态,当前节点的线程如果释放了同步状态或者被取消(当前节点状态置为-1),将会通知后继节点,使后继节点的线程得以运行;
    • CONDITION,值为-2。场景:节点处于等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中;
    • PROPAGATE,值为-3。场景:表示下一次的共享状态会被无条件的传播下去;
    • INITIAL,值为0,初始状态。

    其定义如下:

    static final class Node {
        /** 共享 */
        static final Node SHARED = new Node();
        /** 独占 */
        static final Node EXCLUSIVE = null;
        /**
         * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
         */
        static final int CANCELLED =  1;
        /**
         * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
         */
        static final int SIGNAL    = -1;
        /**
         * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
         */
        static final int CONDITION = -2;
        /**
         * 表示下一次共享式同步状态获取将会无条件地传播下去
         */
        static final int PROPAGATE = -3;
        /** 等待状态 */
        volatile int waitStatus;
        /** 前驱节点 */
        volatile Node prev;
        /** 后继节点 */
        volatile Node next;
        /** 获取同步状态的线程 */
        volatile Thread thread;
        Node nextWaiter;
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        Node() {
        }
        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    

    对于CLH同步队列,一般有如下几种操作:

    1. 节点加入到同步队列

    队列的主要变化是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。

    整个流程图如下:

    Picture1.png

    具体实现可以查看addWaiter(Node node)方法:

        private Node addWaiter(Node mode) {
            //新建Node
            Node node = new Node(Thread.currentThread(), mode);
            //快速尝试添加尾节点
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                //CAS设置尾节点
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //多次尝试
            enq(node);
            return node;
        }
    

    addWaiter(Node node)先通过快速尝试设置尾节点,如果失败,则调用enq(Node node)方法设置尾节点

        private Node enq(final Node node) {
            //多次尝试,直到成功为止
            for (;;) {
                Node t = tail;
                //tail不存在,设置为首节点
                if (t == null) {
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    //设置为尾节点
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    两个方法都是通过一个CAS方法compareAndSetTail(Node expect, Node update)来设置尾节点,该方法可以确保节点是线程安全添加的。在enq(Node node)方法中,AQS通过“死循环”的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。

    2. 首节点移出同步队列

    首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单,head执行该节点并断开原首节点的next和当前节点的prev即可,注意在这个过程是不需要使用CAS来保证的,因为只有一个线程能够成功获取到同步状态。

    流程图如下:

    Picture1.png

    独占式同步状态获取与释放

    AQS提供了acquire(int arg)方法来进行独占式同步状态获取,实现如下:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    其中相关函数的定义为:

    • tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法自定义同步组件自己实现,该方法必须要保证线程安全的获取同步状态。
    • addWaiter:如果tryAcquire返回FALSE(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部。
    • acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;并且返回当前线程在等待过程中有没有中断过。
    • selfInterrupt:产生一个中断。

    acquireQueued方法为一个自旋的过程,当前线程(Node)进入同步队列后,就会进入一个自旋的过程,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                //中断标志
                boolean interrupted = false;
                /*
                 * 自旋过程,其实就是一个死循环而已
                 */
                for (;;) {
                    //当前线程的前驱节点
                    final Node p = node.predecessor();
                    //当前线程的前驱节点是头结点,且同步状态成功
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    //获取失败,线程等待
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点才能够尝试获取同步状态,主要是为了保持FIFO同步队列原则。头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。

    在上面的流程中,当获取失败时,需要判断是否阻塞当前线程,

    if (shouldParkAfterFailedAcquire(p, node) &&        parkAndCheckInterrupt())
        interrupted = true;
    

    在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,该方法主要靠前驱节点判断当前线程是否应该被阻塞,代码如下:

       private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            //前驱节点
            int ws = pred.waitStatus;
            //状态为signal,表示当前线程处于等待状态,直接放回true
            if (ws == Node.SIGNAL)
                return true;
            //前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } 
            //前驱节点状态为Condition、propagate
            else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    这段代码主要检查当前线程是否需要被阻塞,具体规则如下:

    • 如果当前线程的前驱节点状态为SIGNAL,则表明当前线程需要被阻塞,直接返回true,当前线程阻塞

    • 如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false

    • 如果前驱节点非SIGNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SIGNAL,返回false

    如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程:

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LockSupport工具类的park()方法来阻塞该方法。

    当线程释放同步状态后,则需要唤醒该线程的后继节点:

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    //唤醒后继节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    调用unparkSuccessor(Node node)唤醒后继节点:

        private void unparkSuccessor(Node node) {
            //当前节点状态
            int ws = node.waitStatus;
            //当前状态 < 0 则设置为 0
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            //当前节点的后继节点
            Node s = node.next;
            //后继节点为null或者其状态 > 0 (超时或者被中断了)
            if (s == null || s.waitStatus > 0) {
                s = null;
                //从tail节点来找可用节点
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //唤醒后继节点
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,但是为何是从tail尾节点开始,而不是从node.next开始呢?原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。

    以上就是整个独占式获取和释放的过程,流程图如下:

    Picture1.png

    独占式获取响应中断

    AQS提供了acquire(int arg)方法以供独占式获取同步状态,但是该方法对中断不响应,对线程进行中断操作后,该线程会依然位于CLH同步队列中等待着获取同步状态。为了响应中断,AQS提供了acquireInterruptibly(int arg)方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException。

        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    

    首先校验该线程是否已经中断了,如果是则抛出InterruptedException,否则执行tryAcquire(int arg)方法获取同步状态,如果获取成功,则直接返回,否则执行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定义如下:

    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    doAcquireInterruptibly(int arg)方法与acquire(int arg)方法仅有两个差别。

    1.方法声明抛出InterruptedException异常。

    2.在中断方法处不再是使用interrupted标志,而是直接抛出InterruptedException异常。

    独占式超时获取

    AQS除了提供上面两个方法外,还提供了一个增强版的方法:tryAcquireNanos(int arg,long nanos)。该方法为acquireInterruptibly方法的进一步增强,它除了响应中断外,还有超时控制。即如果当前线程没有在指定时间内获取同步状态,则会返回false,否则返回true。

    共享式同步状态获取与释放

    AQS提供acquireShared(int arg)方法共享式获取同步状态:

         public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                //获取失败,自旋获取同步状态
                doAcquireShared(arg);
        }
    

    从上面程序可以看出,方法首先是调用tryAcquireShared(int arg)方法尝试获取同步状态,如果获取失败则调用doAcquireShared(int arg)自旋方式获取同步状态,共享式获取同步状态的标志是返回 >= 0 的值表示获取成功。

        private void doAcquireShared(int arg) {
            /共享式节点
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    //前驱节点
                    final Node p = node.predecessor();
                    //如果其前驱节点,获取同步状态
                    if (p == head) {
                        //尝试获取同步
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    tryAcquireShared(int arg)方法尝试获取同步状态,返回值为int,当其 >= 0 时,表示能够获取到同步状态,这个时候就可以从自旋过程中退出。

    默认AQS没有提供tryAcquireShared的实现,需要子类自己实现该方法,

        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    

    注意到独占式获取锁不同的是,如果tryAcquireShared的返回值大于0,会进行setHeadAndPropagate的操作,下面是该方法的实现,可以看到当某个节点被设置为head之后,如果它的后继节点是SHARED状态的,那么将继续通过doReleaseShared方法尝试往后唤醒节点,实现了共享状态的向后传播。doReleaseShared后面会仔细分析。

        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            /*
             * Try to signal next queued node if:
             *   Propagation was indicated by caller,
             *     or was recorded (as h.waitStatus either before
             *     or after setHead) by a previous operation
             *     (note: this uses sign-check of waitStatus because
             *      PROPAGATE status may transition to SIGNAL.)
             * and
             *   The next node is waiting in shared mode,
             *     or we don't know, because it appears null
             *
             * The conservatism in both of these checks may cause
             * unnecessary wake-ups, but only when there are multiple
             * racing acquires/releases, so most need signals now or soon
             * anyway.
             */
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    

    获取同步状态后,完成相应的任务之后,需要调用release(int arg)方法释放同步状态,方法如下:

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    在doReleaseShared中,如果头节点的状态为SIGNAL,则通过CAS将头节点的状态设置为0,并且唤醒后续阻塞的线程;接着再通过CAS设置节点的状态为Node.PROPAGATE。

        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    关于doReleaseShared的几点分析:

    1. 调用该方法的线程可能有很多:在共享锁中,持有共享锁的线程可以有多个,这些线程都可以调用releaseShared方法释放锁;而这些线程想要获得共享锁,则它们必然曾经成为过头节点,或者就是现在的头节点。因此,如果是在releaseShared方法中调用的doReleaseShared,可能此时调用方法的线程已经不是头节点所代表的线程了,头节点可能已经被易主好几次了。
    2. 调用该方法的目的:无论是在acquireShared中调用,还是在releaseShared方法中调用,该方法的目的都是在当前共享锁是可获取的状态时,唤醒head节点的下一个节点。这一点看上去和独占锁似乎一样,但是它们的一个重要的差别是——在共享锁中,当头节点发生变化时,是会回到循环中再立即唤醒head节点的下一个节点的。也就是说,在当前节点完成唤醒后继节点的任务之后将要退出时,如果发现被唤醒后继节点已经成为了新的头节点,则会立即触发唤醒head节点的下一个节点的操作,如此周而复始。
    3. 只有在当前head没有易主时,才会退出,否则继续循环。因为当前可能有多个线程在队列中,比如A -> B -> C -> D, 如果A唤醒B,则B成为新的头节点,接着B会调用doReleaseShared去唤醒C,此时A线程中的head变成了C,因此也加入到了唤醒D的队伍中,此时可能出现A、B、C同时唤醒D的情况,提高了系统效率。当队列中的所有线程都唤醒之后,此时程序退出。

    参考:

    Condition实现

    在之前的例子中,使用Condition和Lock实现了消息的等待和通知,这节介绍Condiition在AQS中的实现。

    JDK的Object对象提供了wait/notify的机制,也能实现消息的等待与通知,Condition与之的差别主要体现在以下几点:

    • 调用wait方法的线程首先必须是已经进入了同步代码块,即已经获取了监视器锁;与之类似,调用await方法的线程首先必须获得lock锁。
    • 调用wait方法的线程会释放已经获得的监视器锁,进入当前监视器锁的等待队列(wait set)中;与之类似,调用await方法的线程会释放已经获得的lock锁,进入到当前Condtion对应的条件队列中。
    • 调用监视器锁的notify方法会唤醒等待在该监视器锁上的线程,这些线程将开始参与锁竞争,并在获得锁后,从wait方法处恢复执行;与之类似,调用Condtion的signal方法会唤醒对应的条件队列中的线程,这些线程将开始参与锁竞争,并在获得锁后,从await方法处开始恢复执行。

    在AQS的Condition实现中,和独占锁的争夺类似的是,每创建一个Condtion对象就会对应一个Condtion队列,每一个调用了Condtion对象的await方法的线程都会被包装成Node扔进一个条件队列中,就像这样:

    Picture2.png

    同样的,在Condition中也会用到之前介绍的同步队列,当等待队列中的节点获得信号通知时,会将等待队列的节点移到同步队列。

    以下是await时节点的变化,

    await.png

    以下是signal信号发出时节点的变化,

    signal.png

    Condition的整个await/signal流程如下:

    1、Condition提供了await()方法将当前线程阻塞,并提供signal()方法支持另外一个线程将已经阻塞的线程唤醒。
    2、Condition需要结合Lock使用
    3、线程调用await()方法前必须获取锁,调用await()方法时,将线程构造成节点加入等待队列,同时释放锁,并挂起当前线程
    4、其他线程调用signal()方法前也必须获取锁,当执行signal()方法时将等待队列的节点移入到同步队列,当线程退出临界区释放锁的时候,唤醒同步队列的首个节点

    5507455-37635d0723174712.png

    下面结合源代码进行分析:

    await实现

    调用await阻塞当前线程

          public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                //将当前线程封装成Node加入到等待队列尾部
                Node node = addConditionWaiter();
                //释放锁
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                //判断当前节点是否已经在同步队列中,如果是则退出循环,如果不是就阻塞当前线程
                //其他线程如果发出了signal信号之后,会把等待队列的线程移入同步队列,此时就会退出循环,进入下面的重新获取锁的acquireQueued
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                //其他发出signal信号的线程释放锁之后,该线程被唤醒并重新竞争锁
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
            //线程加入等待队列尾部
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {//清除cancell态的节点
                    unlinkCancelledWaiters();
                    t = lastWaiter;//t指向最后一个状态正确的节点
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)//列表为空,初始化为第一个节点
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    

    signal/signalAll实现

    将等待队列的节点移入同步队列(signalAll只是循环执行signal而已)

            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;//得到firstWaiter
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
          //将节点从等待队列移入同步队列
          final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;//cas节点状态错误,说明已经cancell了,直接返回false
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);//加入同步队列
            int ws = p.waitStatus;
            //设置前置节点状态为signal,可重入锁那篇文章分析过,为了唤醒线程而设置
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);//特殊情况下唤醒线程并重新同步,一般情况下这里不会执行
            return true;
        }
    

    参考:

    ReentrantReadWriteLock实现

    ReentrantReadWriteLock在内部也是利用了AQS进行锁的竞争与释放,同时也实现了ReadWriteLock接口。

    为了同时保存读锁和写锁的状态,在内部用一个int保存读和写的状态。读状态从高16位读出,写状态从低16位读出,在保证读写锁互斥的前提下,直接利用了AQS现有的数据结构。

            static final int SHARED_SHIFT   = 16;
            //实际是65536
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            //最大值 65535
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            // 同样是65535
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
            /** 获取读的状态  */
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            /** 获取写锁的获取状态 */
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    

    写锁为独占式的,因此读锁的获取和释放和AQS原生的实现一致。
    读锁是共享式的,获取读锁的状态,并且加1.

     final boolean tryReadLock() {
                Thread current = Thread.currentThread();
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0 &&
                        getExclusiveOwnerThread() != current)
                        return false; //写锁被其他线程获取了,直接返回false
                    int r = sharedCount(c); //获取读锁的状态
                    if (r == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) { //尝试获取读锁
                        if (r == 0) { //说明第一个获取到了读锁
                            firstReader = current; //标记下当前线程是第一个获取的
                            firstReaderHoldCount = 1; //重入次数
                        } else if (firstReader == current) {
                            firstReaderHoldCount++; //次数+1
                        } else {
                            //cachedHoldCounter 为缓存最后一个获取锁的线程
                            HoldCounter rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                cachedHoldCounter = rh = readHolds.get(); //缓存最后一个获取锁的线程
                            else if (rh.count == 0)// 当前线程获取到了锁,但是重入次数为0,那么把当前线程存入进去
                                readHolds.set(rh);
                            rh.count++;
                        }
                        return true;
                    }
                }
            }
    

    读锁的释放:

     protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1)//如果是首次获取读锁,那么第一次获取读锁释放后就为空了
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) { //表示全部释放完毕
                        readHolds.remove();  //释放完毕,那么久把保存的记录次数remove掉
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                     // nextc 是 state 高 16 位减 1 后的值
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc)) //CAS设置状态
                        
                        return nextc == 0; //这个判断如果高 16 位减 1 后的值==0,那么就是读状态和写状态都释放了
                }
            }
    

    锁降级

    锁降级算是获取读锁的特例,如在A线程已经获取写锁的情况下,再调取读锁加锁函数则可以直接获取读锁,但此时其他线程仍然无法获取读锁或写锁,在A线程释放写锁后,如果有节点等待则会唤醒后续节点,后续节点可见的状态为目前有A线程获取了读锁。

    AQS实战-实现三元共享锁

    下面的例子里,利用AQS实现了三元共享锁,也就是当前锁只能被三个线程获取。

    public class TripleLock implements Lock  {
    
        //为3表示允许两个线程同时获得锁
        private final Sync sync = new Sync(3);
    
        private static final class Sync extends AbstractQueuedSynchronizer {
    
            Sync(int count) {
                if (count <= 0) {
                    throw new IllegalArgumentException("count must large than zero.");
                }
                setState(count);
            }
    
            public int tryAcquireShared(int reduceCount) {
                for (;;) {
                    int current = getState();
                    int newCount = current - reduceCount;
                    if (newCount < 0 || compareAndSetState(current, newCount)) {
                        return newCount;
                    }
                }
            }
    
            public boolean tryReleaseShared(int returnCount) {
                for (;;) {
                    int current = getState();
                    int newCount = current + returnCount;
                    if (compareAndSetState(current, newCount)) {
                        return true;
                    }
                }
            }
    
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
        }
    
        @Override
        public void lock() {
            sync.acquireShared(1);
        }
    
        @Override
        public void unlock() {
            sync.releaseShared(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        @Override
        public boolean tryLock() {
            return sync.tryAcquireShared(1) >= 0;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
        }
    
        @Override
        public Condition newCondition() {
            return sync.newCondition();
        }
    }
    

    测试程序中,主线程每隔一秒钟打印换行,工作线程直接打印当前的线程名,从结果可以看到,每一个时刻只有三个工作线程在同时运行。

    public class testTripleLock {
        public void test() {
            final Lock lock = new TripleLock();
            
            class Worker extends Thread {
                public void run() {
                        lock.lock();
                        try {
                            System.out.println(Thread.currentThread().getName());
                            SleepTools.second(2);
                        } finally {
                            lock.unlock();
                        }
                        SleepTools.second(2);
                }
            }
            // 启动10个子线程
            for (int i = 0; i < 10; i++) {
                Worker w = new Worker();
                w.setDaemon(true);
                w.start();
            }
            // 主线程每隔1秒换行
            for (int i = 0; i < 10; i++) {
            	SleepTools.second(1);
                System.out.println();
            }
        }
    
        public static void main(String[] args) {
            testTripleLock testMyLock = new testTripleLock();
            testMyLock.test();
        }
    }
    

    本文由『后端精进之路』原创,首发于博客 http://teckee.github.io/ , 转载请注明出处

    搜索『后端精进之路』关注公众号,立刻获取最新文章和价值2000元的BATJ精品面试课程

    后端精进之路.png

  • 相关阅读:
    nyoj 409——郁闷的C小加(三)——————【中缀式化前缀后缀并求值】
    中缀表达式转后缀表达式和前缀表达式
    Zoj 3870——Team Formation——————【技巧,规律】
    BNU4286——Adjacent Bit Counts——————【dp】
    BNU7538——Clickomania——————【区间dp】
    BNU4299——God Save the i-th Queen——————【皇后攻击,找到对应关系压缩空间】
    HDU 2795——Billboard——————【单点更新、求最小位置】
    HDU 4027—— Can you answer these queries?——————【线段树区间开方,区间求和】
    BNU34067——Pair——————【找规律】
    telnet 命令使用方法详解
  • 原文地址:https://www.cnblogs.com/way2backend/p/12036725.html
Copyright © 2011-2022 走看看