zoukankan      html  css  js  c++  java
  • java中的JUC组件(Semaphore、CountDownLatch、CyclicBarrier)

    1、简介

    Semaphore、CountDownLatch、CyclicBarrier 这三个工具类都是用于并发控制的操作,底层都是基于AQS去实现的;

    • Semaphore(信号量): 提供一个竞争资源处理的工具,当系统内有足够的信号量事,线程可以去获取信号量执行操作,当信号量资源被使用完后,需要等待资源释放后后续线程(资源竞争)才能够执行;
    • CountDownLatch(闭锁):可以理解为一个开关,闭锁创建的时候,可以指定一个闭锁打开依赖条件个数,只有满足条件的资源就绪时,闭锁打开,对应的线程可以执行后续任务;
    • CyclicBarrier(栅栏):当所有依赖栅栏的线程执行到关键节点的时候,都需要等待其他线程到达栅栏点,否则无法继续执行,所有线程到达栅栏的关键点,这些线程即可继续执行后续任务。

    2、Semaphore

    semaphore在代码中使用的执行流程简图:

    SemaPhore_Flow.png

    其本质就是操作系统的P-V操作,当资源足够的时候线程获得资源并执行,资源不足时线程等待或者退出,当资源被释放时线程又可以获取竞争资源继续执行;

    2.1、源码实现:

    Semaphore_structure.png

    Semaphore类的代码结构如上图,可以看到其内部实现了公平锁和非公平锁,所以我们可以使用这两种不同的模式来构建Semaphore对象实例;

    不论我们使用公平与非公平锁,其初始化最终都会调用到sync的构造方法;
    下面可以看看内部的获取资源以及释放资源的具体实现:

    
    public class Semaphore implements java.io.Serializable {
        abstract static class Sync extends AbstractQueuedSynchronizer{
            //初始化
            Sync(int permits) {
                setState(permits);
            }
        }
        public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }
        public void release() {
            sync.releaseShared(1);
        }
    }
    
    

    sync.acquireSharedInterruptibly(permits) 最终会调用的AQS的实现,对于AQS中该实现简单说明:如果这里能够获取到资源,内部就返回success,这里会扣减permits的值,任务继续执行,否则,将当前线程加入等待队列中(具体参考AbstractQueuedSynchronizer.acquireSharedInterruptibly());

    3、CountDownLatch

    CountDownLatch在代码中使用的执行流程简图:

    CountDownLatch_Flow.png

    3.1、源码实现

    CountDownLatch_structure.png

    public class CountDownLatch {
    
        private static final class Sync extends AbstractQueuedSynchronizer {
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
        }
        
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
        
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        
        public void countDown() {
            sync.releaseShared(1);
        }
    }
    

    当某个线程调用await()方法时由于CountDownLatch.Sync中的实现tryAcquireShared会判断state是否等于0,如果不等于,就会进入等待队列,直到countDown调用sync.releaseShared(1)使得sync的状态到0,await的线程才会继续执行;

    4、CyclicBarrier

    CyclicBarrier在代码中使用的执行流程简图:

    CyclicBarrier_Flow.png

    4.1、源码实现

    CyclicBarrier_structure.png

    public class CyclicBarrier {
    
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
        private static class Generation {
            boolean broken = false;
        }
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //保存当前generation 副本
                final Generation g = generation;
                //如果当前副本已推出,抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
                //如果当前线程已中断抛出异常
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    
                //计算当前是否所有线程到达“栅栏”
                int index = --count;
                if (index == 0) {  // tripped
                    //所有线程到达栅栏
                    boolean ranAction = false;
                    try {
                        //如果初始化了自定方法,触发
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        //唤醒所有到达“栅栏”的线程,并跳到下一个新的generation
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                //如果还有线程没有到达栅栏
                for (;;) {
                    try {
                        //根据设置的等待时间进行等待
                        //里面会调用LockSupport.park(this);
                        //将当前线程放入等待队列中
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        //异常处理流程
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
                    
                    if (g.broken)
                        throw new BrokenBarrierException();
                    //当前线程geration 不等于实际 geration 正常返回
                    if (g != generation)
                        return index;
                    //超过等待时间所有线程还未到达“栅栏”,抛出异常
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
        //跳到下一个geration操作
        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }
    }
    

    CyclicBarrier 维护了一个计数器,和一个 generation 每次调用await都会有将计数器减一,并且产生一个新的 generation ,只要计数器不为零,所有前置线程都会触发 ((Condition)trip).await(); 内部会调用 LockSupport.park(this); 方法将线程加入等待队列,知道所有线程就绪,会调用 trip.signalAll(); 唤醒所有线程,同时执行一个用户自定义的 Runnable 策略

  • 相关阅读:
    查看端口被占用
    Eclipse导入包
    Eclipse中构造方法自动生成
    Eclipse中get/set方法自动生成
    Eclipse改字体大小
    设计六原则
    类的关系
    JAVA实现多线程下载
    try...catch的前世今生
    447. 回旋镖的数量
  • 原文地址:https://www.cnblogs.com/wykCN/p/12153785.html
Copyright © 2011-2022 走看看