zoukankan      html  css  js  c++  java
  • CyclicBarrier 的使用与源码解析

    使用

    CyclicBarrier 类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作。

    CyclicBarrier 也可以实现类似 CountDownLatch 的功能,而且比 CountDownLatch 更强大,因为 CyclicBarrier 可以重复被使用。

    代码示例:

    @Test
    public void test() throws InterruptedException {
        int parties = 3;
    
        // 定义一个线程池
        // CyclicBarrier 中线程执行完成会阻塞等待其它线程到达屏障点,所以可用线程至少需要 parties 个(这里有 6 个线程要执行)
        ExecutorService executor = Executors.newFixedThreadPool(2 * parties);
        CountDownLatch countDownLatch = new CountDownLatch(2 * parties);
    
        CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, () -> {
            System.out.println("任务都到达了屏障点,主线程继续执行任务");
        });
    
        IntStream.range(0, parties).forEach(i -> {
            executor.submit(() -> {
                System.out.println("条件" + (i + 1) + "正在执行");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("条件" + (i + 1) + "执行完成,到达屏障点");
    
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        });
    
        // parties 个线程到达屏障点后,接着执行构造方法传递的 barrierAction 任务
        // 然后会调用 nextGeneration() 方法,唤醒所有到达屏障点的线程,再重置 count 的值为 parties
        // 所以可以接着执行下面的条件4 ~ 条件6的线程
        // 但是没有控制线程执行的顺序,所以并不是条件1~条件3这三个线程先执行,条件4 ~ 条件6这三个线程后执行的
    
        IntStream.range(parties, parties + parties).forEach(i -> {
            executor.submit(() -> {
                System.out.println("条件" + (i + 1) + "正在执行");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("条件" + (i + 1) + "执行完成,到达屏障点");
    
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        });
    
        countDownLatch.await(20, TimeUnit.SECONDS);
        executor.shutdown();
    
        /**
             * 可能的执行结果:
             *
             * 条件1正在执行
             * 条件2正在执行
             * 条件3正在执行
             * 条件4正在执行
             * 条件6正在执行
             * 条件5正在执行
             * 条件6执行完成,到达屏障点
             * 条件5执行完成,到达屏障点
             * 条件3执行完成,到达屏障点
             * 条件4执行完成,到达屏障点
             * 任务都到达了屏障点,主线程继续执行任务
             * 条件1执行完成,到达屏障点
             * 条件2执行完成,到达屏障点
             * 任务都到达了屏障点,主线程继续执行任务
             */
    }
    

    reset() 方法代码示例:

    @Test
    public void test2() throws InterruptedException {
        int parties = 3;
    
        // 定义一个线程池
        // CyclicBarrier 中线程执行完成会阻塞等待其它线程到达屏障点,所以可用线程至少需要 parties 个(这里有 6 个线程要执行)
        ExecutorService executor = Executors.newFixedThreadPool(2 * parties);
        CountDownLatch countDownLatch = new CountDownLatch(2 * parties);
    
        CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, () -> {
            System.out.println("任务都到达了屏障点,主线程继续执行任务");
        });
    
        IntStream.range(0, parties).forEach(i -> {
            executor.submit(() -> {
                System.out.println("条件" + (i + 1) + "正在执行");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("条件" + (i + 1) + "执行完成,到达屏障点");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        });
    
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
        // 调用 reset 方法会将当前代终止,如果主线程调用 reset 方法时,有线程阻塞在 dowait 方法中
        // 那么已经被阻塞的线程被唤醒时,会抛出 BrokenBarrierException 异常,未阻塞的线程不会跑出异常
    
        // 测试 reset() 方法
        System.out.println("执行 reset 方法");
        cyclicBarrier.reset();
    
        IntStream.range(parties, parties + parties).forEach(i -> {
            executor.submit(() -> {
                System.out.println("条件" + (i + 1) + "正在执行");
                try {
                    Thread.sleep(i * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("条件" + (i + 1) + "执行完成,到达屏障点");
    
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        });
    
        countDownLatch.await(20, TimeUnit.SECONDS);
        executor.shutdown();
    
        /**
             * 可能的执行结果 1:
             *
             * 条件1正在执行
             * 条件3正在执行
             * 条件2正在执行
             * 执行 reset 方法
             * 条件3执行完成,到达屏障点
             * 条件1执行完成,到达屏障点
             * java.util.concurrent.BrokenBarrierException
             * 	at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
             * 	at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:362)
             * 	at xxx.xxx.xxx.CyclicBarrierTests.lambda$null$6(CyclicBarrierTests.java:116)
             * 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
             * 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
             * 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
             * 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
             * 	at java.lang.Thread.run(Thread.java:748)
             * 条件5正在执行
             * 条件6正在执行
             * 条件4正在执行
             * 条件2执行完成,到达屏障点
             * 条件4执行完成,到达屏障点
             * 任务都到达了屏障点,主线程继续执行任务
             * 条件5执行完成,到达屏障点
             * 条件6执行完成,到达屏障点
             *
             *
             * 可能的执行结果2:
             *
             * 条件1正在执行
             * 条件3正在执行
             * 条件2正在执行
             * 执行 reset 方法
             * 条件4正在执行
             * 条件5正在执行
             * 条件2执行完成,到达屏障点
             * 条件3执行完成,到达屏障点
             * 条件1执行完成,到达屏障点
             * 任务都到达了屏障点,主线程继续执行任务
             * 条件6正在执行
             * 条件4执行完成,到达屏障点
             * 条件5执行完成,到达屏障点
             * 条件6执行完成,到达屏障点
             * 任务都到达了屏障点,主线程继续执行任务
             */
    }
    

    源码

    属性

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    
    /** The number of parties */
    // 任务线程数(这些线程全部执行完成后(即到达屏障点),才能继续执行主线程任务)
    private final int parties;
    
    /* The command to run when tripped */
    // 每当 parties 个任务线程都完成后(即 parties 个线程都到达了屏障点),会执行一次 barrierCommand 任务
    // 构造函数如果不传 barrierCommand,则不执行
    private final Runnable barrierCommand;
    
    /** The current generation */
    // 当前代
    private Generation generation = new Generation();
    
    /**
         * Number of parties still waiting. Counts down from parties to 0
         * on each generation.  It is reset to parties on each new
         * generation or when broken.
         */
    // parties 线程中,当前正在执行的任务线程数(即当前还有多少线程未到达屏障点)
    // 每执行完一个任务后,count 减一
    private int count;
    

    Generation

    private static class Generation {
        boolean broken = false;
    }
    
    // 创建下一代
    private void nextGeneration() {
        // 唤醒上一代所有阻塞在 trip 条件的线程
        trip.signalAll();
        // 重置 count 为 parties
        count = parties;
        // 重置当前代
        generation = new Generation();
    }
    

    构造函数

    // parties 表示有 parties 个线程任务全部执行完后(即到达屏障点),主线程才能继续执行
    // barrierAction 表示所有 parties 线程都到达屏障点后,需要执行的一个任务
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    

    breakBarrier

    // 终止屏障当前代
    private void breakBarrier() {
        // 设置当前代的终止标记为 true
        generation.broken = true;
        // 重置 count
        count = parties;
        // 唤醒所有阻塞在 trip 条件的线程
        trip.signalAll();
    }
    

    await

    // 阻塞当前线程(不带超时时间)
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    // 阻塞当前线程(带超时时间)
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
    BrokenBarrierException,
    TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    
    // 真正阻塞线程的逻辑
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
    TimeoutException {
        final ReentrantLock lock = this.lock;
        // 先获取锁
        lock.lock();
        try {
            // 获取当前代
            final Generation g = generation;
    
            // 当前代已被终止
            if (g.broken)
                // 则抛出 BrokenBarrierException 异常
                throw new BrokenBarrierException();
    
            // 线程被中断
            if (Thread.interrupted()) {
                // 则终止屏障
                breakBarrier();
                // 并抛出 InterruptedException 异常
                throw new InterruptedException();
            }
    
            // count 自减 1
            int index = --count;
            // index 等于 0 表示 parties 个线程都已完成了(都到达了屏障点)
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    // 获取构造方法传的 barrierCommand
                    final Runnable command = barrierCommand;
                    // 如果 barrierCommand 不等于 null
                    if (command != null)
                        // 则执行 barrierCommand 的 run() 方法(最后一个到达屏障点的线程执行的此方法)
                        command.run();
                    ranAction = true;
                    // 开始下一代
                    nextGeneration();
                    return 0;
                } finally {
                    // 如果执行 barrierCommand 出现异常
                    if (!ranAction)
                        // 则也终止屏障
                        breakBarrier();
                }
            }
    
            // loop until tripped, broken, interrupted, or timed out
            // 自旋等待,除非所有线程都到达了屏障点、屏障被终止、线程被中断或者等待超时
            for (;;) {
                try {
                    // 没有设置超时时间
                    if (!timed)
                        // 则阻塞等待
                        trip.await();
                    else if (nanos > 0L)
                        // 设置了超时时间,超时等待
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
    
                // 判断当前代是否已被终止
                // 如果调用了 breakBarrier() 方法或 reset() 方法(reset 方法里也是调用了 breakBarrier 方法)会唤醒当前阻塞的线程,然后走到这一步,此时 g.broken 是 true
                if (g.broken)
                    // 抛出 BrokenBarrierException 异常
                    throw new BrokenBarrierException();
    
                // g != generation,表示已经创建了下一代
                // 到达屏障点,会唤醒所有阻塞的线程,并调用 nextGeneration() 方法,然后走到这一步,此时 g != generation
                if (g != generation)
                    return index;
    
                // 等待已超时
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    

    代码中有好几处都抛出了异常,总结一下:

    • CyclicBarrier 被终止了,在进入 dowait 方法前和后都会判断是否终止,终止会抛出 BrokenBarrierException 异常。终止可以通过调用 breakBarrier 方法、 reset 方法(底层也是调用了 breakBarrier 方法 )或者 dowait 方法中会线程等待超时调用 breakBarrier 方法。
    • 线程被中断。在进入 dowait 方法会判断线程是否被中断,中断会抛出 BrokenBarrierException 异常。
    • 线程都到达屏障点时,会执行 barrierCommand 任务,如果 barrierCommand 抛出了异常,会捕获异常并调用 breakBarrier 方法。

    isBroken

    // 判断屏障当前代是否已被终止
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
    

    reset

    public void reset() {
        final ReentrantLock lock = this.lock;
        // 先获取锁
        lock.lock();
        try {
            // 终止屏障当前代
            breakBarrier();   // break the current generation
            // 开始下一代
            nextGeneration(); // start a new generation
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
    
  • 相关阅读:
    第二次安卓作业
    第十一次作业
    第十一次上机练习
    第十次作业
    第十次上机练习
    第九次作业
    第九次上机练习
    添加用户 Android 6
    Android 5
    activity带数据跳转
  • 原文地址:https://www.cnblogs.com/wu726/p/15646176.html
Copyright © 2011-2022 走看看