zoukankan      html  css  js  c++  java
  • CountDownLatch&&CyclicBarrier

    CountDownLatch

    CountDownLatch是什么

    CountDownLatch是JDK提供的一个同步工具,它可以让一个或多个线程等待,一直等到其他线程中执行完成一组操作。

    常用方法
    • CountDownLatch(int):设置线程数量,即设置计数器的值
    • countDown():计数器减1
    • await():如果计数器大于0时,线程会被阻塞,一直到计数器被countDown()方法减到0时,线程才会继续执行。
    实践案例

    现在小明、小红、小蓝、小芳约好一起到小娜的餐厅吃饭。然后大家都比较熟,所以就决定等到人来齐了的时候在上菜。但是大家也不是无限的等,等一定时间以后,实在等不到就先开始上菜。

    public class Customer implements Runnable{
        private CountDownLatch latch;
        private String name;
        public Customer(CountDownLatch latch, String name) {
            this.latch = latch;
            this.name = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(name + "出发了");
                int time = new Random().nextInt(1000);
                Thread.sleep(time);
                System.out.println(name + "到达了餐厅");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public class Waitress implements Runnable {
        private CountDownLatch latch;
        private String name;
        public Waitress(CountDownLatch latch, String name) {
            this.latch = latch;
            this.name = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(name + "小姐姐在等待顾客来齐");
                latch.await(2, TimeUnit.SECONDS);//等一段时候后就不等了
                System.out.println("顾客来齐了,开始上菜" + latch.getCount());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public class Main {
        public static void main(String[] args) {
            CountDownLatch latch = new CountDownLatch(3);
            List<Thread> list = new ArrayList<>();
            Customer customer1 = new Customer(latch,"小明");
            Customer customer2 = new Customer(latch,"小红");
            Customer customer3 = new Customer(latch,"小蓝");
            Customer customer4 = new Customer(latch,"小芳");
            list.add(new Thread(customer1));
            list.add(new Thread(customer2));
            list.add(new Thread(customer3));
            list.add(new Thread(customer4));
            Waitress waitress = new Waitress(latch,"小娜");
            new Thread(waitress).start();
            for (Thread t:list) {
                t.start();
            }
        }
    }
    

    运行结果

    实现原理

    CountDownLatch有一个内部类叫Sync,它继承了AbstractQueuedSynchronizer类,其中维护了一个整数state,并保证了修改state的可见性和原子性。

    CountDownLatch(int)

    创建CountDownLatch实例时,也会创建一个Sync的实例,同时把计数器的值传给Sync实例,具体是这样的:

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    private static final class Sync extends AbstractQueuedSynchronizer {//基于AQS实现的
        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    
    countDown

    countDown方法中,只调用了Sync实例的ReleaseShared方法,具体是这样的:

    public void countDown() {
    	sync.releaseShared(1);//计数器减1
    }
    

    其中releaseShared方法具体是这样的

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//state减一
            //当state为0时唤醒阻塞的线程
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    protected boolean tryReleaseShared(int releases) {
        //以自旋的方式将state减1并返回锁是否完全释放
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
    
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    await
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    //实际调用的是这个
    public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//将当前线程分装为SHARED的Node
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&//将其上一个合法结点设置为SIGNAL状态
                    parkAndCheckInterrupt())//使当前线程park
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    CyclicBarrier

    CyclicBarrier是什么

    一种同步帮助,它允许一组线程全部互相等待以到达一个公共的障碍点。 CyclicBarriers在涉及固定大小的线程方的程序中很有用,该线程方有时必须互相等待。 该屏障称为循环屏障,因为它可以在释放等待线程之后重新使用。

    常用方法
    • CyclicBarrier(int parties, Runnable barrierAction):创建一个新的CyclicBarrier ,当给定数量的参与者(线程)正在等待它时,它将跳闸;当屏障被跳开时,它将由进入屏障的最后一个线程执行,从而执行给定的屏障动作。
    • int await():等到所有各方都在此障碍上调用await 。
    • int await(long timeout, TimeUnit unit):等到所有各方都在此障碍上调用await 。设置超时限制,超时后不再等待。
    实践案例

    现在小明、小红、小蓝、小芳约好一起去团建。具体流程如下:大家早上先到地点A处集合等大家都到了之后,再各自打车到团建目的地B入口等人齐了之后一起进去,结束之后大家约定一起从C出口等人齐了之后在各自回家。

    public class Person implements Runnable{
        private CyclicBarrier cyclicBarrier;
        private String name;
    
        public Person(CyclicBarrier cyclicBarrier, String name) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
        }
    
        @Override
        public void run() {
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(1000));
                System.out.println(name + "到达第一个集合点A");
                cyclicBarrier.await();
                Thread.sleep(random.nextInt(1000));
                System.out.println(name + "到达第二个集合点B");
                cyclicBarrier.await();
                Thread.sleep(random.nextInt(1000));
                System.out.println(name + "到达第三个集合点C");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class Main {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(4,()->{
                System.out.println("人来齐了可以出发了 GO GO GO ...");
            });
            List<Thread> list = new ArrayList<>();
            Person person1 = new Person(cyclicBarrier,"小明");
            Person person2 = new Person(cyclicBarrier,"小红");
            Person person3 = new Person(cyclicBarrier,"小蓝");
            Person person4 = new Person(cyclicBarrier,"小芳");
            list.add(new Thread(person1));
            list.add(new Thread(person2));
            list.add(new Thread(person3));
            list.add(new Thread(person4));
    
            for (Thread t:list) {
                t.start();
            }
            System.out.println("main end");
        }
    }
    

    运行结果

    实现原理
    成员变量
    /** 同步操作锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 线程拦截器 Condition维护了一个阻塞队列*/
    private final Condition trip = lock.newCondition();
    /** 每次拦截的线程数 */
    private final int parties;
    /* 换代前执行的任务 */
    private final Runnable barrierCommand;
    /** 表示栅栏的当前代 类似代表本局游戏*/
    private Generation generation = new Generation();
    /** 计数器 */
    private int count;
    /** 静态内部类Generation  */
    private static class Generation {
        boolean broken = false;
    }
    
    构造方法
    /** 
    创建一个新的CyclicBarrier,它将在给定数量的参与方(线程)等待时触发,并在触发屏障时执行给定的屏障操作,由最后一个进入屏障的线程执行 */   
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    
    /** 创建一个新的CyclicBarrier,当给定数量的参与方(线程)在等待它时,它将跳闸,并且在屏障跳闸时不执行预定义的操作 */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    核心方法
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    //定时等待
    public int await(long timeout, TimeUnit unit)throws InterruptedException,
    BrokenBarrierException,
    TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    

    这个两个方法实质上都是走的dowait方法,只不过传递的参数不同

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
    TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();//加锁操作
        try {
            final Generation g = generation;
            //检查当前栅栏是否被打翻
            if (g.broken)
                throw new BrokenBarrierException();
            //检查当前线程是否被中断
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //每次都将计数器的值-1
            int index = --count;
            //计数器的值减为0,则需要唤醒所有线程并转换到下一代
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //唤醒所有线程前先执行指定的任务
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //唤醒所有线程并转换到下一代
                    nextGeneration();
                    return 0;
                } finally {
                    //确保在任务未成功执行时能将所有线程唤醒
                    if (!ranAction)
                        breakBarrier();
                }
            }
            //如果计数器不为0 则执行此循环
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    //根据传入的参数来觉得是定时等待还是非定时等待
                    if (!timed)
                        //如果没有时间限制,则直接等待,直到被唤醒
                        trip.await();
                    else if (nanos > 0L)
                        //如果有时间限制,则等待指定时间
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //若当前线程在等待期间被中断则打翻栅栏唤醒其它线程
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // 若在捕获中断异常前已经完成在栅栏上的等待,则直接调用中断操作
                        Thread.currentThread().interrupt();
                    }
                }
                //如果线程因为打翻栅栏操作而被唤醒则抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
                //如果线程因为换代操作而被唤醒则返回计数器的值
                if (g != generation)
                    return index;
                //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();//最终解锁
        }
    }
    //当计数器的值减为0,则需要唤醒所有线程并转换到下一代
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }
    //若当前线程在等待期间被中断则打翻栅栏唤醒其它线程
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    

    CyclicBarrier 与 CountDownLatch 的区别

    相同点:

    • 都可以实现一组线程达到某个条件之前等待
    • 内部都有一个计数器,当计数器的值减为 0 时所有阻塞的线程都会被唤醒
    • 都是基于AQS实现的

    不同点:

    • CyclicBarrier的计数器是由自己控制的,CountDownLatch 的计数器由调用者控制
    • 在CyclicBarrier中调用await不仅会使自己阻塞,还会是计数器减一。CountDownLatch调用await只会使自己阻塞,调用countDown使才会使计数器减一。
    • CyclicBarrier可以循环使用,CountDownLatch只能使用一次

    参考链接:https://mp.weixin.qq.com/s/jfCLg7OjTsZ5sIJhGXAM2A

  • 相关阅读:
    HTML5 Application Cache
    一个页面多个bootstrip轮播以及一个页面多个swiper轮播 冲突问题
    jquery中attr和prop的区别
    eval函数的工作原理
    JSON.parse 函数
    JS知识体系
    闭包
    io输入输出与反射机制2
    IO输入输出与反射机制1
    项目-超市会员管理系统
  • 原文地址:https://www.cnblogs.com/shaoyu/p/14773680.html
Copyright © 2011-2022 走看看