zoukankan      html  css  js  c++  java
  • Java并发(6)- CountDownLatch、Semaphore与AQS

    引言

    上一篇文章中详细分析了基于AQS的ReentrantLock原理,ReentrantLock通过AQS中的state变量0和1之间的转换代表了独占锁。那么可以思考一下,当state变量大于1时代表了什么?J.U.C中是否有基于AQS的这种实现呢?如果有,那他们都是怎么实现的呢?这些疑问通过详细分析J.U.C中的Semaphore与CountDownLatch类后,将会得到解答。

    1. Semaphore与CountDownLatch的共享逻辑
    2. Semaphore与CountDownLatch的使用示例
    • 2.1 Semaphore的使用
    • 2.2 CountDownLatch的使用
    1. 源码分析
    • 3.1 AQS中共享锁的实现
    • 3.2 Semaphore源码分析
    • 3.3 CountDownLatch源码分析
    1. 总结

    1. Semaphore与CountDownLatch的共享方式

    独占锁意味着只能有一个线程获取锁,其他的线程在锁被占用的情况下都必须等待锁释放后才能进行下一步操作。由此类推,共享锁是否意味着可以由多个线程同时使用这个锁,不需要等待呢?如果是这样,那锁的意义也就不存在了。在J.U.C中共享意味着有多个线程可以同时获取锁,但这个多个是有限制的,并不是无限个,J.U.C中通过Semaphore与CountDownLatch来分别实现了两种有限共享锁。

    Semaphore又叫信号量,他通过一个共享的’信号包‘来给每个使用他的线程来分配信号,当信号包中的信号足够时,线程可以获取锁,反之,信号包中信号不够了,则不能获取到锁,需要等待足够的信号被释放,才能获取。

    CountDownLatch又叫计数器,他通过一个共享的计数总量来控制线程锁的获取,当计数器总量大于0时,线程将被阻塞,不能够获取锁,只有当计数器总量为0时,所有被阻塞的线程同时被释放。

    可以看到Semaphore与CountDownLatch都有一个共享总量,这个共享总量就是通过state来实现的。

    2. Semaphore与CountDownLatch的使用示例

    在详细分析Semaphore与CountDownLatch的原理之前,先来看看他们是怎么使用的,这样方便后续我们理解他们的原理。先知道他是什么?然后再问为什么?下面通过两个示例来详细说明Semaphore与CountDownLatch的使用。

    2.1 Semaphore的使用

    //初始化10个信号量在信号包中,让ABCD4个线程分别去获取
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(10);
    	SemaphoreTest(semaphore);
    }
    
    private static void SemaphoreTest(final Semaphore semaphore) throws InterruptedException {
        //线程A初始获取了4个信号量,然后分3次释放了这4个信号量
        Thread threadA = new Thread(new Runnable() {
    
            @Override
            public void run() {
                try {
                    semaphore.acquire(4);
                    System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
                    semaphore.release(1);
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " release 1 semaphore");
                    semaphore.release(1);
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
                    semaphore.release(2);
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        });
        threadA.setName("threadA");
    
        //线程B初始获取了5个信号量,然后分2次释放了这5个信号量
        Thread threadB = new Thread(new Runnable() {
    
            @Override
            public void run() {
                try {
                    semaphore.acquire(5);
                    System.out.println(Thread.currentThread().getName() + " get 5 semaphore");
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " release 2 semaphore");
                    semaphore.release(2);
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " release 3 semaphore");
                    semaphore.release(3);
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        });
        threadB.setName("threadB");
    
        //线程C初始获取了4个信号量,然后分1次释放了这4个信号量
        Thread threadC = new Thread(new Runnable() {
    
            @Override
            public void run() {
                try {
                    semaphore.acquire(4);
                    System.out.println(Thread.currentThread().getName() + " get 4 semaphore");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " release 4 semaphore");
                    semaphore.release(4);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        });
        threadC.setName("threadC");
        
        //线程D初始获取了10个信号量,然后分1次释放了这10个信号量
        Thread threadD = new Thread(new Runnable() {
    
            @Override
            public void run() {
                try {
                    semaphore.acquire(10);
                    System.out.println(Thread.currentThread().getName() + " get 10 semaphore");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " release 10 semaphore");
                    semaphore.release(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        });
        threadD.setName("threadD");
        
        //线程A和线程B首先分别获取了4个和5个信号量,总信号量变为了1个
        threadA.start();
        threadB.start();
        Thread.sleep(1);
        //线程C尝试获取4个发现不够则等待
        threadC.start();
        Thread.sleep(1);
        //线程D尝试获取10个发现不够则等待
        threadD.start();
    }
    

    执行结果如下:

    threadB get 5 semaphore
    threadA get 4 semaphore
    threadA release 1 semaphore
    threadB release 2 semaphore
    threadC get 4 semaphore
    threadA release 1 semaphore
    threadC release 4 semaphore
    threadB release 3 semaphore
    threadA release 2 semaphore
    threadD get 10 semaphore
    threadD release 10 semaphore
    

    可以看到threadA和threadB在获取了9个信号量之后threadC和threadD之后等待信号量足够时才能继续往下执行。而threadA和threadB在信号量足够时是可以同时执行的。

    其中有一个问题,当threadD排队在threadC之前时,信号量如果被释放了4个,threadC会先于threadD执行吗?还是需要排队等待呢?这个疑问在详细分析了Semaphore的源码之后再来给大家答案。

    2.2 CountDownLatch的使用

    //初始化计数器总量为2
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        CountDownLatchTest(countDownLatch);
    }
    
    private static void CountDownLatchTest(final CountDownLatch countDownLatch) throws InterruptedException {
        //threadA尝试执行,计数器为2被阻塞
        Thread threadA = new Thread(new Runnable() {
    
            @Override
            public void run() {
                try {
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + " await");
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        });
        threadA.setName("threadA");
    
        //threadB尝试执行,计数器为2被阻塞
        Thread threadB = new Thread(new Runnable() {
    
            @Override
            public void run() {
                try {
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + " await");
                    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        });
        threadB.setName("threadB");
        
        //threadC在1秒后将计数器数量减1
        Thread threadC = new Thread(new Runnable() {
    
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    countDownLatch.countDown();
                    
                    System.out.println(Thread.currentThread().getName() + " countDown");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        });
        threadC.setName("threadC");
        
        //threadD在5秒后将计数器数量减1
        Thread threadD = new Thread(new Runnable() {
    
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                    countDownLatch.countDown();
                    
                    System.out.println(Thread.currentThread().getName() + " countDown");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        });
        threadD.setName("threadD");
        
        threadA.start();
        threadB.start();
        threadC.start();
        threadD.start();
    }
    

    执行结果如下:

    threadC countDown
    threadD countDown
    threadA await
    threadB await
    

    threadA和threadB在尝试执行时由于计数器总量为2被阻塞,当threadC和threadD将计数器总量减为0后,threadA和threadB同时开始执行。

    总结一下:Semaphore就像旋转寿司店,共有10个座位,当座位有空余时,等待的人就可以坐上去。如果有只有2个空位,来的是一家3口,那就只有等待。如果来的是一对情侣,就可以直接坐上去吃。当然如果同时空出5个空位,那一家3口和一对情侣可以同时上去吃。CountDownLatch就像大型商场里面的临时游乐场,每一场游乐的时间过后等待的人同时进场玩,而一场中间会有不爱玩了的人随时出来,但不能进入,一旦所有进入的人都出来了,新一批人就可以同时进场。

    3. 源码分析

    明白了Semaphore与CountDownLatch是做什么的,怎么使用的。接下来就来看看Semaphore与CountDownLatch底层时怎么实现这些功能的。

    3.1 AQS中共享锁的实现

    上篇文章通过对ReentrantLock的分析,得倒了AQS中实现独占锁的几个关键方法:

    //状态量,独占锁在0和1之间切换
    private volatile int state;
    
    //调用tryAcquire获取锁,获取失败后加入队列中挂起等操作,AQS中实现
    public final void acquire(int arg);
    
    //独占模式下尝试获取锁,ReentrantLock中实现
    protected boolean tryAcquire(int arg);
    
    //调用tryRelease释放锁以及恢复线程等操作,AQS中实现
    public final boolean release(int arg);
    
    //独占模式下尝试释放锁,ReentrantLock中实现
    protected boolean tryRelease(int arg);
    

    其中具体的获取和释放独占锁的逻辑都放在ReentrantLock中自己实现,AQS中负责管理获取或释放独占锁成功失败后需要具体处理的逻辑。那么共享锁的实现是否也是遵循这个规律呢?由此我们在AQS中发现了以下几个类似的方法:

    //调用tryAcquireShared获取锁,获取失败后加入队列中挂起等操作,AQS中实现
    public final void acquireShared(int arg);
    
    //共享模式下尝试获取锁
    protected int tryAcquireShared(int arg);
    
    //调用tryReleaseShared释放锁以及恢复线程等操作,AQS中实现
    public final boolean releaseShared(int arg);
    
    //共享模式下尝试释放锁
    protected boolean tryReleaseShared(int arg);
    

    共享锁和核心就在上面4个关键方法中,先来看看Semaphore是怎么调用上述方法来实现共享锁的。

    3.2 Semaphore源码分析

    首先是Semaphore的构造方法,同ReentrantLock一样,他有两个构造方法,这样也是为了实现公平共享锁和非公平共享锁,大家可能有疑问,既然是共享锁,为什么还分公平和非公平的呢?这就回到了上面那个例子后面的疑问,前面有等待的线程时,后来的线程是否可以直接获取信号量,还是一定要排队。等待当然是公平的,插队就是非公平的。

    还是用旋转寿司的例子来说:现在只有2个空位,已经有一家3口在等待,这时来了一对情侣,公平共享锁的实现就是这对情侣必须等待,只到一家3口上桌之后才轮到他们,而非公平共享锁的实现是可以让这对情况直接去吃,因为刚好有2个空位,让一家3口继续等待(好像是很不公平......),这种情况下非公平共享锁的好处就是可以最大化寿司店的利润(好像同时也得罪了等待的顾客......),也是Semaphore默认的实现方式。

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    

    Semaphore的例子中使用了两个核心方法acquire和release,分别调用了AQS中的acquireSharedInterruptibly和releaseShared方法:

    //获取permits个信号量
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }
    
    //释放permits个信号量
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
    
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //尝试获取arg个信号量
            doAcquireSharedInterruptibly(arg); //获取信号量失败时排队挂起
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { //尝试释放arg个信号量
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    Semaphore在获取和释放信号量的流程都是通过AQS来实现,具体怎么算获取成功或释放成功则由Semaphore本身实现。

    //公平共享锁尝试获取acquires个信号量
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors()) //前面是否有排队,有则返回获取失败
                return -1;
            int available = getState(); //剩余的信号量(旋转寿司店剩余的座位)
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining)) // 剩余信号量不够,够的情况下尝试获取(旋转寿司店座位不够,或者同时来两对情况抢座位)
                return remaining;
        }
    }
    //非公平共享锁尝试获取acquires个信号量
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState(); //剩余的信号量(旋转寿司店剩余的座位)
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining)) // 剩余信号量不够,够的情况下尝试获取(旋转寿司店座位不够,或者同时来两对情侣抢座位)
                return remaining;
        }
    }
    

    可以看到公平共享锁和非公平共享锁的区别就在是否需要判断队列中是否有已经等待的线程。公平共享锁需要先判断,非公平共享锁直接插队,尽管前面已经有线程在等待。

    为了验证这个结论,稍微修改下上面的示例:

    threadA.start();
    threadB.start();
    Thread.sleep(1);
    threadD.start(); //threadD已经在排队
    Thread.sleep(3500);
    threadC.start(); //3500毫秒后threadC来插队
    

    结果输出:

    threadB get 5 semaphore
    threadA get 4 semaphore
    threadB release 2 semaphore
    threadA release 1 semaphore
    threadC get 4 semaphore //threadC先与threadD获取到信号量
    threadA release 1 semaphore
    threadB release 3 semaphore
    threadC release 4 semaphore
    threadA release 2 semaphore
    threadD get 10 semaphore
    threadD release 10 semaphore
    

    这个示例很好的说明了当为非公平锁时会先尝试获取共享锁,然后才排队。

    当获取信号量失败之后会去排队,排队这个操作通过AQS中的doAcquireSharedInterruptibly方法来实现:

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); //加入等待队列
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor(); //获取当前节点的前置节点
                if (p == head) {
                    int r = tryAcquireShared(arg); //前置节点是头节点时,说明当前节点是第一个挂起的线程节点,再次尝试获取共享锁
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); //与ReentrantLock不同的地方:获取共享锁成功设置头节点,同时通知下一个节点
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && //非头节点或者获取锁失败,检查节点状态,查看是否需要挂起线程
                    parkAndCheckInterrupt()) //挂起线程,当前线程阻塞在这里!
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    这一段代码和ReentrantLock中的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法基本一样,说下两个不同的地方。一是加入等待队列时这里加入的是Node.SHARED类型的节点。二是获取锁成功后会通知下一个节点,也就是唤醒下一个线程。以旋转寿司店的例子为例,前面同时走了5个客人,空余5个座位,一家3口坐进去之后会告诉后面的一对情侣,让他们也坐进去,这样就达到了共享的目的。shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法在上一篇文章中都有详细说明,这里就做解释了。

    再来看看releaseShared方法时怎么释放信号量的,首先调用tryReleaseShared来尝试释放信号量,释放成功后调用doReleaseShared来判断是否需要唤醒后继线程:

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow //释放信号量过多
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next)) //cas操作设置新的信号量
                return true;
        }
    }
    
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { //SIGNAL状态下唤醒后继节点
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); //唤醒后继节点
                }
                else if (ws == 0 &&
                            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    释放的逻辑很好理解,相比ReentrantLock只是在state的数量上有点差别。

    3.3 CountDownLatch源码分析

    CountDownLatch相比Semaphore在实现逻辑上要简单的多,同时他也没有公平和非公平的区别,因为当计数器达到0的时候,所有等待的线程都会释放,不为0的时候,所有等待的线程都会阻塞。直接来看看CountDownLatch的两个核心方法await和countDown。

    public void await() throws InterruptedException {
        //和Semaphore的不同在于参数为1,其实这个参数对CountDownLatch来说没什么意义,因为后面CountDownLatch的tryAcquireShared实现是通过getState() == 0来判断的
        sync.acquireSharedInterruptibly(1); 
    }
    
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        //这里加入了一个等待超时控制,超过时间后直接返回false执行后面的代码,不会长时间阻塞
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); 
    }
    
    public void countDown() {
        sync.releaseShared(1); //每次释放1个计数
    }
    
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //尝试获取arg个信号量
            doAcquireSharedInterruptibly(arg); //获取信号量失败时排队挂起
    }
    
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1; //奠定了同时获取锁的基础,无论State初始为多少,只能计数等于0时触发
    }
    

    和Semaphore区别有两个,一是State每次只减少1,同时只有为0时才释放所有等待线程。二是提供了一个超时等待方法。acquireSharedInterruptibly方法跟Semaphore一样,就不细说了,这里重点说下tryAcquireSharedNanos方法。

    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }
    
    //最小自旋时间
    static final long spinForTimeoutThreshold = 1000L;
    
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout; //计算了一个deadline
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime(); 
                if (nanosTimeout <= 0L) //超时后直接返回false,继续执行
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold) //大于最小cas操作时间则挂起线程
                    LockSupport.parkNanos(this, nanosTimeout); //挂起线程也有超时限制
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    重点看标了注释的几行代码,首先计算了一个超时时间,当超时后直接退出等待,继续执行。如果未超时并且大于最小的cas操作时间,这里定义的是1000ns,则挂起,同时挂起操作也有超时限制。这样就实现了超时等待。

    4.总结

    至此关于AQS的共享锁的两个实现Semaphore与CountDownLatch就分析完了,他们与非共享最大的区别就在于是否能多个线程同时获取锁。看完后希望大家能对Semaphore与CountDownLatch有深刻的理解,不明白的时候想想旋转寿司店和游乐场的例子,如果对大家有帮助,觉得写的好的话,可以点个赞,当然更希望大家能积极指出文中的错误和提出积极的改进意见。

  • 相关阅读:
    Idea安装Scala插件(转)
    serialVersionUID的作用(转)
    [转]学习win10的bash使用ssh连接远程服务器
    [转]使用 Travis CI 部署你的 Hexo 博客
    【转】H5
    【转】Virtual DOM
    【转】hexo博客图片问题
    【转】V8 之旅: 垃圾回收器
    关于react-redux中Provider、connect的解析
    【转】webpack4
  • 原文地址:https://www.cnblogs.com/konck/p/9473591.html
Copyright © 2011-2022 走看看